diff --git a/object-api_test.go b/object-api_test.go index 32678529b..23ae3116a 100644 --- a/object-api_test.go +++ b/object-api_test.go @@ -29,6 +29,10 @@ var _ = Suite(&MySuite{}) func (s *MySuite) TestFSAPISuite(c *C) { var storageList []string + + // Initialize name space lock. + initNSLock() + create := func() ObjectLayer { path, err := ioutil.TempDir(os.TempDir(), "minio-") c.Check(err, IsNil) diff --git a/object-common-multipart.go b/object-common-multipart.go index 824e461e4..3d4ed51ba 100644 --- a/object-common-multipart.go +++ b/object-common-multipart.go @@ -84,7 +84,9 @@ func newMultipartUploadCommon(storage StorageAPI, bucket string, object string) if !IsValidObjectName(object) { return "", ObjectNameInvalid{Bucket: bucket, Object: object} } - + // This lock needs to be held for any changes to the directory contents of ".minio/multipart/object/" + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) // Loops through until successfully generates a new unique upload id. for { uuid, err := uuid.New() @@ -146,6 +148,13 @@ func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploa if !isUploadIDExists(storage, bucket, object, uploadID) { return "", InvalidUploadID{UploadID: uploadID} } + // Hold read lock on the uploadID so that no one aborts it. + nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + defer nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + + // Hold write lock on the part so that there is no parallel upload on the part. + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID))) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID))) partSuffix := fmt.Sprintf("%s.%.5d", uploadID, partID) partSuffixPath := path.Join(tmpMetaPrefix, bucket, object, partSuffix) @@ -246,6 +255,10 @@ func abortMultipartUploadCommon(storage StorageAPI, bucket, object, uploadID str return InvalidUploadID{UploadID: uploadID} } + // Hold lock so that there is no competing complete-multipart-upload or put-object-part. + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + if err := cleanupUploadedParts(storage, bucket, object, uploadID); err != nil { return err } @@ -509,6 +522,9 @@ func listObjectPartsCommon(storage StorageAPI, bucket, object, uploadID string, if !isUploadIDExists(storage, bucket, object, uploadID) { return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID} } + // Hold lock so that there is no competing abort-multipart-upload or complete-multipart-upload. + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) result := ListPartsInfo{} entries, err := storage.ListDir(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID)) if err != nil { diff --git a/xl-erasure-v1-createfile.go b/xl-erasure-v1-createfile.go index 2ee6881f6..aa669f5d6 100644 --- a/xl-erasure-v1-createfile.go +++ b/xl-erasure-v1-createfile.go @@ -53,10 +53,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w // Release the block writer upon function return. defer wcloser.release() - // Lock right before reading from disk. - nsMutex.RLock(volume, path) partsMetadata, errs := xl.getPartsMetadata(volume, path) - nsMutex.RUnlock(volume, path) // Convert errs into meaningful err to be sent upwards if possible // based on total number of errors and read quorum. @@ -240,10 +237,6 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w } } - // Lock right before commit to disk. - nsMutex.Lock(volume, path) - defer nsMutex.Unlock(volume, path) - // Close all writers and metadata writers in routines. for index, writer := range writers { if writer == nil { diff --git a/xl-erasure-v1-healfile.go b/xl-erasure-v1-healfile.go index ca86718e8..0def508cc 100644 --- a/xl-erasure-v1-healfile.go +++ b/xl-erasure-v1-healfile.go @@ -30,10 +30,6 @@ func (xl XL) healFile(volume string, path string) error { var readers = make([]io.Reader, totalBlocks) var writers = make([]io.WriteCloser, totalBlocks) - // Acquire a read lock. - nsMutex.RLock(volume, path) - defer nsMutex.RUnlock(volume, path) - // List all online disks to verify if we need to heal. onlineDisks, metadata, heal, err := xl.listOnlineDisks(volume, path) if err != nil { diff --git a/xl-erasure-v1-readfile.go b/xl-erasure-v1-readfile.go index 795d8eff3..987cd6143 100644 --- a/xl-erasure-v1-readfile.go +++ b/xl-erasure-v1-readfile.go @@ -34,10 +34,7 @@ func (xl XL) ReadFile(volume, path string, startOffset int64) (io.ReadCloser, er return nil, errInvalidArgument } - // Acquire a read lock. - nsMutex.RLock(volume, path) onlineDisks, metadata, heal, err := xl.listOnlineDisks(volume, path) - nsMutex.RUnlock(volume, path) if err != nil { return nil, err } @@ -51,8 +48,6 @@ func (xl XL) ReadFile(volume, path string, startOffset int64) (io.ReadCloser, er }() } - // Acquire read lock again. - nsMutex.RLock(volume, path) readers := make([]io.ReadCloser, len(xl.storageDisks)) for index, disk := range onlineDisks { if disk == nil { @@ -67,7 +62,6 @@ func (xl XL) ReadFile(volume, path string, startOffset int64) (io.ReadCloser, er readers[index] = reader } } - nsMutex.RUnlock(volume, path) // Initialize pipe. pipeReader, pipeWriter := io.Pipe() diff --git a/xl-erasure-v1.go b/xl-erasure-v1.go index b024836b7..88b4c5b08 100644 --- a/xl-erasure-v1.go +++ b/xl-erasure-v1.go @@ -127,10 +127,6 @@ func (xl XL) MakeVol(volume string) error { return errInvalidArgument } - // Hold a write lock before creating a volume. - nsMutex.Lock(volume, "") - defer nsMutex.Unlock(volume, "") - // Err counters. createVolErr := 0 // Count generic create vol errs. volumeExistsErrCnt := 0 // Count all errVolumeExists errs. @@ -188,10 +184,6 @@ func (xl XL) DeleteVol(volume string) error { return errInvalidArgument } - // Hold a write lock for Delete volume. - nsMutex.Lock(volume, "") - defer nsMutex.Unlock(volume, "") - // Collect if all disks report volume not found. var volumeNotFoundErrCnt int @@ -369,10 +361,6 @@ func (xl XL) listAllVolInfo(volume string) ([]VolInfo, bool, error) { // healVolume - heals any missing volumes. func (xl XL) healVolume(volume string) error { - // Acquire a read lock. - nsMutex.RLock(volume, "") - defer nsMutex.RUnlock(volume, "") - // Lists volume info for all online disks. volsInfo, heal, err := xl.listAllVolInfo(volume) if err != nil { @@ -420,10 +408,7 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) { return VolInfo{}, errInvalidArgument } - // Acquire a read lock before reading. - nsMutex.RLock(volume, "") volsInfo, heal, err := xl.listAllVolInfo(volume) - nsMutex.RUnlock(volume, "") if err != nil { return VolInfo{}, err } @@ -500,10 +485,7 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) { return FileInfo{}, errInvalidArgument } - // Acquire read lock. - nsMutex.RLock(volume, path) _, metadata, heal, err := xl.listOnlineDisks(volume, path) - nsMutex.RUnlock(volume, path) if err != nil { return FileInfo{}, err } @@ -535,9 +517,6 @@ func (xl XL) DeleteFile(volume, path string) error { return errInvalidArgument } - nsMutex.Lock(volume, path) - defer nsMutex.Unlock(volume, path) - errCount := 0 // Update meta data file and remove part file for index, disk := range xl.storageDisks { @@ -590,14 +569,6 @@ func (xl XL) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error { return errInvalidArgument } - // Hold read lock at source before rename. - nsMutex.RLock(srcVolume, srcPath) - defer nsMutex.RUnlock(srcVolume, srcPath) - - // Hold write lock at destination before rename. - nsMutex.Lock(dstVolume, dstPath) - defer nsMutex.Unlock(dstVolume, dstPath) - errCount := 0 for _, disk := range xl.storageDisks { // Append "/" as srcPath and dstPath are either leaf-dirs or non-leaf-dris. diff --git a/xl-objects-multipart.go b/xl-objects-multipart.go index 608771769..12ce5c884 100644 --- a/xl-objects-multipart.go +++ b/xl-objects-multipart.go @@ -133,6 +133,11 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload if !isUploadIDExists(xl.storage, bucket, object, uploadID) { return "", InvalidUploadID{UploadID: uploadID} } + // Hold lock so that + // 1) no one aborts this multipart upload + // 2) no one does a parallel complete-multipart-upload on this multipart upload + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) // Calculate s3 compatible md5sum for complete multipart. s3MD5, err := completeMultipartMD5(parts...) @@ -245,6 +250,10 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload return "", toObjectErr(err, minioMetaBucket, uploadIDIncompletePath) } + // Hold write lock on the destination before rename + nsMutex.Lock(bucket, object) + defer nsMutex.Unlock(bucket, object) + // Delete if an object already exists. // FIXME: rename it to tmp file and delete only after // the newly uploaded file is renamed from tmp location to @@ -258,6 +267,12 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload if err = xl.storage.RenameFile(minioMetaBucket, uploadIDPath, bucket, object); err != nil { return "", toObjectErr(err, bucket, object) } + + // Hold the lock so that two parallel complete-multipart-uploads do no + // leave a stale uploads.json behind. + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) + // Validate if there are other incomplete upload-id's present for // the object, if yes do not attempt to delete 'uploads.json'. var entries []string diff --git a/xl-objects.go b/xl-objects.go index 7b8539d24..342210fcd 100644 --- a/xl-objects.go +++ b/xl-objects.go @@ -121,11 +121,15 @@ func newXLObjects(exportPaths ...string) (ObjectLayer, error) { // MakeBucket - make a bucket. func (xl xlObjects) MakeBucket(bucket string) error { + nsMutex.Lock(bucket, "") + defer nsMutex.Unlock(bucket, "") return makeBucket(xl.storage, bucket) } // GetBucketInfo - get bucket info. func (xl xlObjects) GetBucketInfo(bucket string) (BucketInfo, error) { + nsMutex.RLock(bucket, "") + defer nsMutex.RUnlock(bucket, "") return getBucketInfo(xl.storage, bucket) } @@ -136,6 +140,8 @@ func (xl xlObjects) ListBuckets() ([]BucketInfo, error) { // DeleteBucket - delete a bucket. func (xl xlObjects) DeleteBucket(bucket string) error { + nsMutex.Lock(bucket, "") + nsMutex.Unlock(bucket, "") return deleteBucket(xl.storage, bucket) } @@ -151,6 +157,8 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read if !IsValidObjectName(object) { return nil, ObjectNameInvalid{Bucket: bucket, Object: object} } + nsMutex.RLock(bucket, object) + defer nsMutex.RUnlock(bucket, object) if ok, err := isMultipartObject(xl.storage, bucket, object); err != nil { return nil, toObjectErr(err, bucket, object) } else if !ok { @@ -173,7 +181,13 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read if err != nil { return nil, toObjectErr(err, bucket, object) } + + // Hold a read lock once more which can be released after the following go-routine ends. + // We hold RLock once more because the current function would return before the go routine below + // executes and hence releasing the read lock (because of defer'ed nsMutex.RUnlock() call). + nsMutex.RLock(bucket, object) go func() { + defer nsMutex.RUnlock(bucket, object) for ; partIndex < len(info.Parts); partIndex++ { part := info.Parts[partIndex] r, err := xl.storage.ReadFile(bucket, pathJoin(object, partNumToPartFileName(part.PartNumber)), offset) @@ -263,6 +277,8 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { if !IsValidObjectName(object) { return ObjectInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object} } + nsMutex.RLock(bucket, object) + defer nsMutex.RUnlock(bucket, object) info, err := xl.getObjectInfo(bucket, object) if err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) @@ -286,6 +302,8 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. Object: object, } } + nsMutex.Lock(bucket, object) + defer nsMutex.Unlock(bucket, object) tempObj := path.Join(tmpMetaPrefix, bucket, object) fileWriter, err := xl.storage.CreateFile(minioMetaBucket, tempObj) @@ -468,6 +486,8 @@ func (xl xlObjects) DeleteObject(bucket, object string) error { if !IsValidObjectName(object) { return ObjectNameInvalid{Bucket: bucket, Object: object} } + nsMutex.Lock(bucket, object) + defer nsMutex.Unlock(bucket, object) if err := xl.deleteObject(bucket, object); err != nil { return toObjectErr(err, bucket, object) }