erasure: MakeVol, DeleteVol and StatVol should hold locks. (#1597)

Since there is a good amount of overlap, each code has to lock
properly for the operation they are going to perform.

- MakeVol create vols in a routine on all disks, hold locks.
- DeleteVol delete vols in a routine on all disks, hold locks.
- StatVol stat vols in a routine on all disks, hold locks.

Fixes #1588
This commit is contained in:
Harshavardhana 2016-05-11 12:54:21 -07:00
parent 72748d2073
commit 86e5d71519
3 changed files with 104 additions and 25 deletions

View file

@ -83,15 +83,20 @@ func parseDirents(dirPath string, buf []byte) (entries []string, err error) {
// On Linux XFS does not implement d_type for on disk
// format << v5. Fall back to Stat().
var fi os.FileInfo
if fi, err = os.Stat(path.Join(dirPath, name)); err == nil {
if fi.IsDir() {
entries = append(entries, fi.Name()+slashSeparator)
} else if fi.Mode().IsRegular() {
entries = append(entries, fi.Name())
fi, err = os.Stat(path.Join(dirPath, name))
if err != nil {
// If file does not exist, we continue and skip it.
// Could happen if it was deleted in the middle while
// this list was being performed.
if os.IsNotExist(err) {
continue
}
} else {
// This is unexpected.
return
return nil, err
}
if fi.IsDir() {
entries = append(entries, fi.Name()+slashSeparator)
} else if fi.Mode().IsRegular() {
entries = append(entries, fi.Name())
}
default:
// Skip entries which are not file or directory.

View file

@ -143,7 +143,7 @@ func getAllUniqueVols(dirPath string) ([]VolInfo, error) {
log.WithFields(logrus.Fields{
"dirPath": dirPath,
}).Debugf("readDir failed with error %s", err)
return nil, err
return nil, errDiskNotFound
}
var volsInfo []VolInfo
for _, entry := range entries {
@ -151,11 +151,16 @@ func getAllUniqueVols(dirPath string) ([]VolInfo, error) {
// Skip if entry is neither a directory not a valid volume name.
continue
}
fi, err := os.Stat(pathJoin(dirPath, entry))
var fi os.FileInfo
fi, err = os.Stat(pathJoin(dirPath, entry))
if err != nil {
log.WithFields(logrus.Fields{
"path": pathJoin(dirPath, entry),
}).Debugf("Stat failed with error %s", err)
// If the file does not exist, skip the entry.
if os.IsNotExist(err) {
continue
}
return nil, err
}
volsInfo = append(volsInfo, VolInfo{
@ -186,7 +191,7 @@ func (s fsStorage) getVolumeDir(volume string) (string, error) {
volsInfo, err = getAllUniqueVols(s.diskPath)
if err != nil {
// For any errors, treat it as disk not found.
return volumeDir, errDiskNotFound
return volumeDir, err
}
for _, vol := range volsInfo {
// Verify if lowercase version of the volume is equal to

View file

@ -23,6 +23,7 @@ import (
"strings"
"path"
"sync"
"github.com/Sirupsen/logrus"
"github.com/klauspost/reedsolomon"
@ -115,8 +116,11 @@ func (xl XL) MakeVol(volume string) error {
return errInvalidArgument
}
// Hold read lock.
nsMutex.RLock(volume, "")
// Verify if the volume already exists.
_, errs := xl.getAllVolumeInfo(volume)
nsMutex.RUnlock(volume, "")
// Count errors other than errVolumeNotFound, bigger than the allowed
// readQuorum, if yes throw an error.
@ -133,11 +137,36 @@ func (xl XL) MakeVol(volume string) error {
}
}
createVolErr := 0
volumeExistsErrCnt := 0
// Hold a write lock before creating a volume.
nsMutex.Lock(volume, "")
defer nsMutex.Unlock(volume, "")
// Err counters.
createVolErr := 0 // Count generic create vol errs.
volumeExistsErrCnt := 0 // Count all errVolumeExists errs.
// Initialize sync waitgroup.
var wg = &sync.WaitGroup{}
// Initialize list of errors.
var dErrs = make([]error, len(xl.storageDisks))
// Make a volume entry on all underlying storage disks.
for _, disk := range xl.storageDisks {
if err := disk.MakeVol(volume); err != nil {
for index, disk := range xl.storageDisks {
wg.Add(1)
// Make a volume inside a go-routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
dErrs[index] = disk.MakeVol(volume)
}(index, disk)
}
// Wait for all make vol to finish.
wg.Wait()
// Loop through all the concocted errors.
for _, err := range dErrs {
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
}).Errorf("MakeVol failed with %s", err)
@ -150,11 +179,15 @@ func (xl XL) MakeVol(volume string) error {
}
continue
}
// Update error counter separately.
createVolErr++
if createVolErr <= len(xl.storageDisks)-xl.writeQuorum {
continue
}
// Return errWriteQuorum if errors were more than
// allowed write quorum.
return errWriteQuorum
}
}
@ -167,12 +200,32 @@ func (xl XL) DeleteVol(volume string) error {
return errInvalidArgument
}
// Hold a write lock for Delete volume.
nsMutex.Lock(volume, "")
defer nsMutex.Unlock(volume, "")
// Collect if all disks report volume not found.
var volumeNotFoundErrCnt int
var wg = &sync.WaitGroup{}
var dErrs = make([]error, len(xl.storageDisks))
// Remove a volume entry on all underlying storage disks.
for _, disk := range xl.storageDisks {
if err := disk.DeleteVol(volume); err != nil {
for index, disk := range xl.storageDisks {
wg.Add(1)
// Delete volume inside a go-routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
dErrs[index] = disk.DeleteVol(volume)
}(index, disk)
}
// Wait for all the delete vols to finish.
wg.Wait()
// Loop through concocted errors and return anything unusual.
for _, err := range dErrs {
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
}).Errorf("DeleteVol failed with %s", err)
@ -245,16 +298,30 @@ func (xl XL) ListVols() (volsInfo []VolInfo, err error) {
// getAllVolumeInfo - get bucket volume info from all disks.
// Returns error slice indicating the failed volume stat operations.
func (xl XL) getAllVolumeInfo(volume string) (volsInfo []VolInfo, errs []error) {
// Create errs and volInfo slices of storageDisks size.
errs = make([]error, len(xl.storageDisks))
volsInfo = make([]VolInfo, len(xl.storageDisks))
// Allocate a new waitgroup.
var wg = &sync.WaitGroup{}
for index, disk := range xl.storageDisks {
volInfo, err := disk.StatVol(volume)
if err != nil {
errs[index] = err
continue
}
volsInfo[index] = volInfo
wg.Add(1)
// Stat volume on all the disks in a routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
volInfo, err := disk.StatVol(volume)
if err != nil {
errs[index] = err
return
}
volsInfo[index] = volInfo
}(index, disk)
}
// Wait for all the Stat operations to finish.
wg.Wait()
// Return the concocted values.
return volsInfo, errs
}
@ -349,6 +416,8 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) {
if !isValidVolname(volume) {
return VolInfo{}, errInvalidArgument
}
// Acquire a read lock before reading.
nsMutex.RLock(volume, "")
volsInfo, heal, err := xl.listAllVolumeInfo(volume)
nsMutex.RUnlock(volume, "")
@ -361,10 +430,10 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) {
if heal {
go func() {
if herr := xl.healVolume(volume); herr != nil {
if hErr := xl.healVolume(volume); hErr != nil {
log.WithFields(logrus.Fields{
"volume": volume,
}).Errorf("healVolume failed with %s", herr)
}).Errorf("healVolume failed with %s", hErr)
return
}
}()