From 8099396ff0db764baa7102e86868c21f510657ca Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Sat, 14 May 2016 00:22:36 +0530 Subject: [PATCH] xl/putObject: Should take care of the situation if an object already exists at the location. (#1606) Fixes #1598 #1594 #1595 --- fs-objects.go | 70 ++++++++++++++- object-common.go | 83 ------------------ posix.go | 25 +++++- xl-erasure-v1.go | 52 ++---------- xl-objects-multipart.go | 63 +++++++++++--- xl-objects.go | 184 ++++++++++++++++++++++++++++++++-------- 6 files changed, 301 insertions(+), 176 deletions(-) diff --git a/fs-objects.go b/fs-objects.go index 1bf28bb3f..f82d573f4 100644 --- a/fs-objects.go +++ b/fs-objects.go @@ -17,6 +17,8 @@ package main import ( + "crypto/md5" + "encoding/hex" "io" "path/filepath" "strings" @@ -143,7 +145,73 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { // PutObject - create an object. func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) { - return putObjectCommon(fs.storage, bucket, object, size, data, metadata) + // Verify if bucket is valid. + if !IsValidBucketName(bucket) { + return "", BucketNameInvalid{Bucket: bucket} + } + // Check whether the bucket exists. + if !isBucketExist(fs.storage, bucket) { + return "", BucketNotFound{Bucket: bucket} + } + if !IsValidObjectName(object) { + return "", ObjectNameInvalid{ + Bucket: bucket, + Object: object, + } + } + + fileWriter, err := fs.storage.CreateFile(bucket, object) + if err != nil { + return "", toObjectErr(err, bucket, object) + } + + // Initialize md5 writer. + md5Writer := md5.New() + + // Instantiate a new multi writer. + multiWriter := io.MultiWriter(md5Writer, fileWriter) + + // Instantiate checksum hashers and create a multiwriter. + if size > 0 { + if _, err = io.CopyN(multiWriter, data, size); err != nil { + if clErr := safeCloseAndRemove(fileWriter); clErr != nil { + return "", clErr + } + return "", toObjectErr(err, bucket, object) + } + } else { + if _, err = io.Copy(multiWriter, data); err != nil { + if clErr := safeCloseAndRemove(fileWriter); clErr != nil { + return "", clErr + } + return "", toObjectErr(err, bucket, object) + } + } + + newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) + // md5Hex representation. + var md5Hex string + if len(metadata) != 0 { + md5Hex = metadata["md5Sum"] + } + if md5Hex != "" { + if newMD5Hex != md5Hex { + if err = safeCloseAndRemove(fileWriter); err != nil { + return "", err + } + return "", BadDigest{md5Hex, newMD5Hex} + } + } + err = fileWriter.Close() + if err != nil { + if clErr := safeCloseAndRemove(fileWriter); clErr != nil { + return "", clErr + } + return "", err + } + + // Return md5sum, successfully wrote object. + return newMD5Hex, nil } func (fs fsObjects) DeleteObject(bucket, object string) error { diff --git a/object-common.go b/object-common.go index d8ab93239..a705c8266 100644 --- a/object-common.go +++ b/object-common.go @@ -17,10 +17,6 @@ package main import ( - "crypto/md5" - "encoding/hex" - "io" - "path" "sort" "strings" @@ -142,85 +138,6 @@ func deleteBucket(storage StorageAPI, bucket string) error { return nil } -// putObjectCommon - create an object, is a common function for both object layers. -func putObjectCommon(storage StorageAPI, bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) { - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return "", BucketNameInvalid{Bucket: bucket} - } - // Check whether the bucket exists. - if !isBucketExist(storage, bucket) { - return "", BucketNotFound{Bucket: bucket} - } - if !IsValidObjectName(object) { - return "", ObjectNameInvalid{ - Bucket: bucket, - Object: object, - } - } - - tempObj := path.Join(tmpMetaPrefix, bucket, object) - fileWriter, err := storage.CreateFile(minioMetaBucket, tempObj) - if err != nil { - return "", toObjectErr(err, bucket, object) - } - - // Initialize md5 writer. - md5Writer := md5.New() - - // Instantiate a new multi writer. - multiWriter := io.MultiWriter(md5Writer, fileWriter) - - // Instantiate checksum hashers and create a multiwriter. - if size > 0 { - if _, err = io.CopyN(multiWriter, data, size); err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", clErr - } - return "", toObjectErr(err, bucket, object) - } - } else { - if _, err = io.Copy(multiWriter, data); err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", clErr - } - return "", toObjectErr(err, bucket, object) - } - } - - newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) - // md5Hex representation. - var md5Hex string - if len(metadata) != 0 { - md5Hex = metadata["md5Sum"] - } - if md5Hex != "" { - if newMD5Hex != md5Hex { - if err = safeCloseAndRemove(fileWriter); err != nil { - return "", err - } - return "", BadDigest{md5Hex, newMD5Hex} - } - } - err = fileWriter.Close() - if err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", clErr - } - return "", err - } - err = storage.RenameFile(minioMetaBucket, tempObj, bucket, object) - if err != nil { - if derr := storage.DeleteFile(minioMetaBucket, tempObj); derr != nil { - return "", derr - } - return "", toObjectErr(err, bucket, object) - } - - // Return md5sum, successfully wrote object. - return newMD5Hex, nil -} - func listObjectsCommon(layer ObjectLayer, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { var storage StorageAPI switch l := layer.(type) { diff --git a/posix.go b/posix.go index ac06b3dde..fc250852f 100644 --- a/posix.go +++ b/posix.go @@ -650,7 +650,30 @@ func (s fsStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) err }).Errorf("getVolumeDir failed with %s", err) return err } - if err = os.MkdirAll(slashpath.Join(dstVolumeDir, slashpath.Dir(dstPath)), 0755); err != nil { + srcIsDir := strings.HasSuffix(srcPath, slashSeparator) + dstIsDir := strings.HasSuffix(dstPath, slashSeparator) + // for XL src and dst are always directories. + // for FS src and dst are always files. + if !(srcIsDir && dstIsDir || !srcIsDir && !dstIsDir) { + // Either src and dst have to be directories or files, else return error. + log.Errorf("source and destination are not of same file type. source=%s, destination=%s", srcPath, dstPath) + return errFileAccessDenied + } + if srcIsDir { + // If source is a directory we expect the destination to be non-existent always. + _, err = os.Stat(slashpath.Join(dstVolumeDir, dstPath)) + if err == nil { + log.Errorf("Source is a directory and destination exists. source=%s, destination=%s", srcPath, dstPath) + return errFileAccessDenied + } + if !os.IsNotExist(err) { + // Return error for any error other than ENOENT. + log.Errorf("Stat failed with %s", err) + return err + } + // Destination does not exist, hence proceed with the rename. + } + if err = os.MkdirAll(slashpath.Dir(slashpath.Join(dstVolumeDir, dstPath)), 0755); err != nil { // File path cannot be verified since one of the parents is a file. if strings.Contains(err.Error(), "not a directory") { return errFileAccessDenied diff --git a/xl-erasure-v1.go b/xl-erasure-v1.go index 56d8544ce..feb2cb37f 100644 --- a/xl-erasure-v1.go +++ b/xl-erasure-v1.go @@ -628,54 +628,18 @@ func (xl XL) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error { defer nsMutex.Unlock(dstVolume, dstPath) errCount := 0 - for index, disk := range xl.storageDisks { - // Make sure to rename all the files only, not directories. - srcErasurePartPath := slashpath.Join(srcPath, fmt.Sprintf("file.%d", index)) - dstErasurePartPath := slashpath.Join(dstPath, fmt.Sprintf("file.%d", index)) - err := disk.RenameFile(srcVolume, srcErasurePartPath, dstVolume, dstErasurePartPath) - if err != nil { - log.WithFields(logrus.Fields{ - "srcVolume": srcVolume, - "srcPath": srcErasurePartPath, - "dstVolume": dstVolume, - "dstPath": dstErasurePartPath, - }).Errorf("RenameFile failed with %s", err) - - errCount++ - // We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum - // otherwise return failure. - if errCount <= len(xl.storageDisks)-xl.writeQuorum { - continue - } - - return err - } - srcXLMetaPath := slashpath.Join(srcPath, xlMetaV1File) - dstXLMetaPath := slashpath.Join(dstPath, xlMetaV1File) - err = disk.RenameFile(srcVolume, srcXLMetaPath, dstVolume, dstXLMetaPath) - if err != nil { - log.WithFields(logrus.Fields{ - "srcVolume": srcVolume, - "srcPath": srcXLMetaPath, - "dstVolume": dstVolume, - "dstPath": dstXLMetaPath, - }).Errorf("RenameFile failed with %s", err) - - errCount++ - // We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum - // otherwise return failure. - if errCount <= len(xl.storageDisks)-xl.writeQuorum { - continue - } - - return err - } - err = disk.DeleteFile(srcVolume, srcPath) + for _, disk := range xl.storageDisks { + // Append "/" as srcPath and dstPath are either leaf-dirs or non-leaf-dris. + // If srcPath is an object instead of prefix we just rename the leaf-dir and + // not rename the part and metadata files separately. + err := disk.RenameFile(srcVolume, retainSlash(srcPath), dstVolume, retainSlash(dstPath)) if err != nil { log.WithFields(logrus.Fields{ "srcVolume": srcVolume, "srcPath": srcPath, - }).Errorf("DeleteFile failed with %s", err) + "dstVolume": dstVolume, + "dstPath": dstPath, + }).Errorf("RenameFile failed with %s", err) errCount++ // We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum diff --git a/xl-objects-multipart.go b/xl-objects-multipart.go index d6b4da0e4..2e42f8ca5 100644 --- a/xl-objects-multipart.go +++ b/xl-objects-multipart.go @@ -92,6 +92,29 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM return listObjectPartsCommon(xl.storage, bucket, object, uploadID, partNumberMarker, maxParts) } +// This function does the following check, suppose +// object is "a/b/c/d", stat makes sure that objects ""a/b/c"" +// "a/b" and "a" do not exist. +func (xl xlObjects) parentDirIsObject(bucket, parent string) error { + var stat func(string) error + stat = func(p string) error { + if p == "." { + return nil + } + _, err := xl.getObjectInfo(bucket, p) + if err == nil { + // If there is already a file at prefix "p" return error. + return errFileAccessDenied + } + if err == errFileNotFound { + // Check if there is a file as one of the parent paths. + return stat(path.Dir(p)) + } + return err + } + return stat(parent) +} + func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { @@ -149,13 +172,18 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload metadata.Size += fi.Size } + // check if an object is present as one of the parent dir. + if err = xl.parentDirIsObject(bucket, path.Dir(object)); err != nil { + return "", toObjectErr(err, bucket, object) + } + // Save successfully calculated md5sum. metadata.MD5Sum = s3MD5 // Save modTime as well as the current time. metadata.ModTime = time.Now().UTC() // Create temporary multipart meta file to write and then rename. - tempMultipartMetaFile := path.Join(tmpMetaPrefix, bucket, object, multipartMetaFile) + tempMultipartMetaFile := path.Join(tmpMetaPrefix, bucket, object, uploadID, multipartMetaFile) w, err := xl.storage.CreateFile(minioMetaBucket, tempMultipartMetaFile) if err != nil { return "", toObjectErr(err, bucket, object) @@ -164,21 +192,21 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload err = encoder.Encode(&metadata) if err != nil { if err = safeCloseAndRemove(w); err != nil { - return "", err + return "", toObjectErr(err, bucket, object) } - return "", err + return "", toObjectErr(err, bucket, object) } // Close the writer. if err = w.Close(); err != nil { if err = safeCloseAndRemove(w); err != nil { - return "", err + return "", toObjectErr(err, bucket, object) } - return "", err + return "", toObjectErr(err, bucket, object) } // Attempt a Rename of multipart meta file to final namespace. - multipartObjFile := path.Join(object, multipartMetaFile) - err = xl.storage.RenameFile(minioMetaBucket, tempMultipartMetaFile, bucket, multipartObjFile) + multipartObjFile := path.Join(mpartMetaPrefix, bucket, object, uploadID, multipartMetaFile) + err = xl.storage.RenameFile(minioMetaBucket, tempMultipartMetaFile, minioMetaBucket, multipartObjFile) if err != nil { if derr := xl.storage.DeleteFile(minioMetaBucket, tempMultipartMetaFile); derr != nil { return "", toObjectErr(err, minioMetaBucket, tempMultipartMetaFile) @@ -192,11 +220,11 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload go func(index int, part completePart) { defer wg.Done() partSuffix := fmt.Sprintf("%.5d.%s", part.PartNumber, part.ETag) - multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix) - multipartObjSuffix := path.Join(object, partNumToPartFileName(part.PartNumber)) - errs[index] = xl.storage.RenameFile(minioMetaBucket, multipartPartFile, bucket, multipartObjSuffix) + src := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix) + dst := path.Join(mpartMetaPrefix, bucket, object, uploadID, partNumToPartFileName(part.PartNumber)) + errs[index] = xl.storage.RenameFile(minioMetaBucket, src, minioMetaBucket, dst) if errs[index] != nil { - log.Errorf("Unable to rename file %s to %s, failed with %s", multipartPartFile, multipartObjSuffix, errs[index]) + log.Errorf("Unable to rename file %s to %s, failed with %s", src, dst, errs[index]) } }(index, part) } @@ -218,6 +246,18 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload return "", toObjectErr(err, minioMetaBucket, uploadIDPath) } + // Delete if an object already exists. + // FIXME: rename it to tmp file and delete only after + // the newly uploaded file is renamed from tmp location to + // the original location. + err = xl.deleteObject(bucket, object) + if err != nil && err != errFileNotFound { + return "", toObjectErr(err, bucket, object) + } + + if err = xl.storage.RenameFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID), bucket, object); err != nil { + return "", toObjectErr(err, bucket, object) + } // Validate if there are other incomplete upload-id's present for // the object, if yes do not attempt to delete 'uploads.json'. var entries []string @@ -226,6 +266,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload return s3MD5, nil } } + uploadsJSONPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) err = xl.storage.DeleteFile(minioMetaBucket, uploadsJSONPath) if err != nil { diff --git a/xl-objects.go b/xl-objects.go index 763061406..3b3d173da 100644 --- a/xl-objects.go +++ b/xl-objects.go @@ -17,10 +17,13 @@ package main import ( + "crypto/md5" + "encoding/hex" "encoding/json" "errors" "fmt" "io" + "path" "path/filepath" "strings" "sync" @@ -223,29 +226,19 @@ func getMultipartObjectInfo(storage StorageAPI, bucket, object string) (info Mul return info, nil } -// GetObjectInfo - get object info. -func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return ObjectInfo{}, BucketNameInvalid{Bucket: bucket} - } - // Check whether the bucket exists. - if !isBucketExist(xl.storage, bucket) { - return ObjectInfo{}, BucketNotFound{Bucket: bucket} - } - // Verify if object is valid. - if !IsValidObjectName(object) { - return ObjectInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object} - } +// Return ObjectInfo. +func (xl xlObjects) getObjectInfo(bucket, object string) (ObjectInfo, error) { + // First see if the object was a simple-PUT upload. fi, err := xl.storage.StatFile(bucket, object) if err != nil { if err != errFileNotFound { - return ObjectInfo{}, toObjectErr(err, bucket, object) + return ObjectInfo{}, err } var info MultipartObjectInfo + // Check if the object was multipart upload. info, err = getMultipartObjectInfo(xl.storage, bucket, object) if err != nil { - return ObjectInfo{}, toObjectErr(err, bucket, object) + return ObjectInfo{}, err } fi.Size = info.Size fi.ModTime = info.ModTime @@ -269,9 +262,118 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { }, nil } +// GetObjectInfo - get object info. +func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { + // Verify if bucket is valid. + if !IsValidBucketName(bucket) { + return ObjectInfo{}, BucketNameInvalid{Bucket: bucket} + } + // Check whether the bucket exists. + if !isBucketExist(xl.storage, bucket) { + return ObjectInfo{}, BucketNotFound{Bucket: bucket} + } + // Verify if object is valid. + if !IsValidObjectName(object) { + return ObjectInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object} + } + info, err := xl.getObjectInfo(bucket, object) + if err != nil { + return ObjectInfo{}, toObjectErr(err, bucket, object) + } + return info, nil +} + // PutObject - create an object. func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) { - return putObjectCommon(xl.storage, bucket, object, size, data, metadata) + // Verify if bucket is valid. + if !IsValidBucketName(bucket) { + return "", BucketNameInvalid{Bucket: bucket} + } + // Check whether the bucket exists. + if !isBucketExist(xl.storage, bucket) { + return "", BucketNotFound{Bucket: bucket} + } + if !IsValidObjectName(object) { + return "", ObjectNameInvalid{ + Bucket: bucket, + Object: object, + } + } + + tempObj := path.Join(tmpMetaPrefix, bucket, object) + fileWriter, err := xl.storage.CreateFile(minioMetaBucket, tempObj) + if err != nil { + return "", toObjectErr(err, bucket, object) + } + + // Initialize md5 writer. + md5Writer := md5.New() + + // Instantiate a new multi writer. + multiWriter := io.MultiWriter(md5Writer, fileWriter) + + // Instantiate checksum hashers and create a multiwriter. + if size > 0 { + if _, err = io.CopyN(multiWriter, data, size); err != nil { + if clErr := safeCloseAndRemove(fileWriter); clErr != nil { + return "", toObjectErr(clErr, bucket, object) + } + return "", toObjectErr(err, bucket, object) + } + } else { + if _, err = io.Copy(multiWriter, data); err != nil { + if clErr := safeCloseAndRemove(fileWriter); clErr != nil { + return "", toObjectErr(clErr, bucket, object) + } + return "", toObjectErr(err, bucket, object) + } + } + + newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) + // md5Hex representation. + var md5Hex string + if len(metadata) != 0 { + md5Hex = metadata["md5Sum"] + } + if md5Hex != "" { + if newMD5Hex != md5Hex { + if err = safeCloseAndRemove(fileWriter); err != nil { + return "", toObjectErr(err, bucket, object) + } + return "", BadDigest{md5Hex, newMD5Hex} + } + } + err = fileWriter.Close() + if err != nil { + if clErr := safeCloseAndRemove(fileWriter); clErr != nil { + return "", toObjectErr(clErr, bucket, object) + } + return "", toObjectErr(err, bucket, object) + } + + // check if an object is present as one of the parent dir. + if err = xl.parentDirIsObject(bucket, path.Dir(object)); err != nil { + return "", toObjectErr(err, bucket, object) + } + + // Delete if an object already exists. + // FIXME: rename it to tmp file and delete only after + // the newly uploaded file is renamed from tmp location to + // the original location. + err = xl.deleteObject(bucket, object) + if err != nil && err != errFileNotFound { + return "", toObjectErr(err, bucket, object) + } + err = xl.storage.RenameFile(minioMetaBucket, tempObj, bucket, object) + if err != nil { + if derr := xl.storage.DeleteFile(minioMetaBucket, tempObj); derr != nil { + return "", toObjectErr(derr, bucket, object) + } + return "", toObjectErr(err, bucket, object) + } + + // Return md5sum, successfully wrote object. + return newMD5Hex, nil } // isMultipartObject - verifies if an object is special multipart file. @@ -286,30 +388,21 @@ func isMultipartObject(storage StorageAPI, bucket, object string) (bool, error) return true, nil } -func (xl xlObjects) DeleteObject(bucket, object string) error { - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return BucketNameInvalid{Bucket: bucket} - } - if !isBucketExist(xl.storage, bucket) { - return BucketNotFound{Bucket: bucket} - } - if !IsValidObjectName(object) { - return ObjectNameInvalid{Bucket: bucket, Object: object} - } +// Deletes and object. +func (xl xlObjects) deleteObject(bucket, object string) error { // Verify if the object is a multipart object. if ok, err := isMultipartObject(xl.storage, bucket, object); err != nil { - return toObjectErr(err, bucket, object) + return err } else if !ok { if err = xl.storage.DeleteFile(bucket, object); err != nil { - return toObjectErr(err, bucket, object) + return err } return nil } // Get parts info. info, err := getMultipartObjectInfo(xl.storage, bucket, object) if err != nil { - return toObjectErr(err, bucket, object) + return err } // Range through all files and delete it. var wg = &sync.WaitGroup{} @@ -325,16 +418,35 @@ func (xl xlObjects) DeleteObject(bucket, object string) error { } // Wait for all the deletes to finish. wg.Wait() - // Loop through and validate if any errors, return back the first - // error occurred. - for index, err := range errs { + // Loop through and validate if any errors, if we are unable to remove any part return + // "unexpected" error as returning any other error might be misleading. For ex. + // if DeleteFile() had returned errFileNotFound and we return it, then client would see + // ObjectNotFound which is misleading. + for _, err := range errs { if err != nil { - partFileName := partNumToPartFileName(info.Parts[index].PartNumber) - return toObjectErr(err, bucket, pathJoin(object, partFileName)) + return errUnexpected } } err = xl.storage.DeleteFile(bucket, pathJoin(object, multipartMetaFile)) if err != nil { + return err + } + return nil +} + +// DeleteObject - delete the object. +func (xl xlObjects) DeleteObject(bucket, object string) error { + // Verify if bucket is valid. + if !IsValidBucketName(bucket) { + return BucketNameInvalid{Bucket: bucket} + } + if !isBucketExist(xl.storage, bucket) { + return BucketNotFound{Bucket: bucket} + } + if !IsValidObjectName(object) { + return ObjectNameInvalid{Bucket: bucket, Object: object} + } + if err := xl.deleteObject(bucket, object); err != nil { return toObjectErr(err, bucket, object) } return nil