Fix deadlock in in-place CopyObject decryption/encryption (#5637)

In-place decryption/encryption already holds write
locks on them, attempting to acquire a read lock would
fail.
This commit is contained in:
Harshavardhana 2018-03-12 13:52:38 -07:00 committed by kannappanr
parent 574b667c56
commit 29ef7d29e4
3 changed files with 45 additions and 26 deletions

View file

@ -12,7 +12,7 @@ clone_folder: c:\gopath\src\github.com\minio\minio
# Environment variables
environment:
GOPATH: c:\gopath
GOROOT: c:\go
GOROOT: c:\go19
# scripts that run after cloning repository
install:

View file

@ -388,6 +388,9 @@ func (fs *FSObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject strin
}
if cpSrcDstSame && srcInfo.metadataOnly {
// Close any writer which was initialized.
defer srcInfo.Writer.Close()
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, srcBucket, srcObject, fsMetaJSONFile)
wlk, err := fs.rwPool.Write(fsMetaPath)
if err != nil {
@ -494,7 +497,7 @@ func (fs *FSObjects) getObject(bucket, object string, offset int64, length int64
}
if etag != "" {
objEtag, perr := fs.getObjectETag(bucket, object)
objEtag, perr := fs.getObjectETag(bucket, object, lock)
if perr != nil {
return toObjectErr(errors.Trace(perr), bucket, object)
}
@ -828,43 +831,55 @@ func (fs *FSObjects) listDirFactory(isLeaf isLeafFunc) listDirFunc {
// getObjectETag is a helper function, which returns only the md5sum
// of the file on the disk.
func (fs *FSObjects) getObjectETag(bucket, entry string) (string, error) {
func (fs *FSObjects) getObjectETag(bucket, entry string, lock bool) (string, error) {
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, entry, fsMetaJSONFile)
// Read `fs.json` to perhaps contend with
// parallel Put() operations.
rlk, err := fs.rwPool.Open(fsMetaPath)
// Ignore if `fs.json` is not available, this is true for pre-existing data.
if err != nil && err != errFileNotFound {
return "", toObjectErr(errors.Trace(err), bucket, entry)
}
var reader io.Reader
var fi os.FileInfo
var size int64
if lock {
// Read `fs.json` to perhaps contend with
// parallel Put() operations.
rlk, err := fs.rwPool.Open(fsMetaPath)
// Ignore if `fs.json` is not available, this is true for pre-existing data.
if err != nil && err != errFileNotFound {
return "", toObjectErr(errors.Trace(err), bucket, entry)
}
// If file is not found, we don't need to proceed forward.
if err == errFileNotFound {
return "", nil
}
// If file is not found, we don't need to proceed forward.
if err == errFileNotFound {
return "", nil
}
// Read from fs metadata only if it exists.
defer fs.rwPool.Close(fsMetaPath)
// Read from fs metadata only if it exists.
defer fs.rwPool.Close(fsMetaPath)
// Fetch the size of the underlying file.
fi, err := rlk.LockedFile.Stat()
if err != nil {
return "", toObjectErr(errors.Trace(err), bucket, entry)
// Fetch the size of the underlying file.
fi, err = rlk.LockedFile.Stat()
if err != nil {
return "", toObjectErr(errors.Trace(err), bucket, entry)
}
size = fi.Size()
reader = io.NewSectionReader(rlk.LockedFile, 0, fi.Size())
} else {
var err error
reader, size, err = fsOpenFile(fsMetaPath, 0)
if err != nil {
return "", toObjectErr(errors.Trace(err), bucket, entry)
}
}
// `fs.json` can be empty due to previously failed
// PutObject() transaction, if we arrive at such
// a situation we just ignore and continue.
if fi.Size() == 0 {
if size == 0 {
return "", nil
}
// Wrap the locked file in a ReadAt() backend section reader to
// make sure the underlying offsets don't move.
fsMetaBuf, err := ioutil.ReadAll(io.NewSectionReader(rlk.LockedFile, 0, fi.Size()))
fsMetaBuf, err := ioutil.ReadAll(reader)
if err != nil {
return "", errors.Trace(err)
return "", toObjectErr(errors.Trace(err), bucket, entry)
}
// Check if FS metadata is valid, if not return error.

View file

@ -402,7 +402,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
if sseC {
newKey, err = ParseSSECustomerRequest(r)
if err != nil {
pipeReader.CloseWithError(err)
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
@ -426,6 +426,9 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// Since we are rotating the keys, make sure to update the metadata.
srcInfo.metadataOnly = true
} else {
if sseCopyC {
// Source is encrypted make sure to save the encrypted size.
@ -500,6 +503,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
// object is same then only metadata is updated.
objInfo, err := objectAPI.CopyObject(srcBucket, srcObject, dstBucket, dstObject, srcInfo)
if err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}