From 33cee9f38a3e662ad68973ab48f595f0435d423f Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Wed, 16 Jun 2021 13:21:36 -0700 Subject: [PATCH] Improve multipart upload (#12514) Each multipart upload is holding a read lock for the entire upload duration of each part. This makes it impossible for other parts to complete until all currently uploading parts have released their locks. It will also make it impossible for new parts to start as long as the write lock is still being requested, essentially deadlocking uploads until all that may have been granted a read lock has been completed. Refactor to only hold the upload id lock while reading and writing the metadata, but hold a part id lock while the part is being uploaded. --- cmd/erasure-multipart.go | 45 ++++++++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 75d79a028..9db82f8ff 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -397,16 +397,27 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec // // Implements S3 compatible Upload Part API. func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) { - uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) - rlkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout) + // Write lock for this part ID. + // Held throughout the operation. + partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID))) + plkctx, err := partIDLock.GetLock(ctx, globalOperationTimeout) + if err != nil { + return PartInfo{}, err + } + pctx := plkctx.Context() + defer partIDLock.Unlock(plkctx.Cancel) + + // Read lock for upload id. + // Only held while reading the upload metadata. + uploadIDRLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) + rlkctx, err := uploadIDRLock.GetRLock(ctx, globalOperationTimeout) if err != nil { return PartInfo{}, err } rctx := rlkctx.Context() - readLocked := true defer func() { - if readLocked { - uploadIDLock.RUnlock(rlkctx.Cancel) + if uploadIDRLock != nil { + uploadIDRLock.RUnlock(rlkctx.Cancel) } }() @@ -432,13 +443,17 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo partsMetadata, errs = readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false) + // Unlock upload id locks before, so others can get it. + uploadIDRLock.RUnlock(rlkctx.Cancel) + uploadIDRLock = nil + // get Quorum for this object - _, writeQuorum, err := objectQuorumFromMeta(rctx, partsMetadata, errs, er.defaultParityCount) + _, writeQuorum, err := objectQuorumFromMeta(pctx, partsMetadata, errs, er.defaultParityCount) if err != nil { return pi, toObjectErr(err, bucket, object) } - reducedErr := reduceWriteQuorumErrs(rctx, errs, objectOpIgnoredErrs, writeQuorum) + reducedErr := reduceWriteQuorumErrs(pctx, errs, objectOpIgnoredErrs, writeQuorum) if reducedErr == errErasureWriteQuorum { return pi, toObjectErr(reducedErr, bucket, object) } @@ -447,7 +462,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs) // Pick one from the first valid metadata. - fi, err := pickValidFileInfo(rctx, partsMetadata, modTime, dataDir, writeQuorum) + fi, err := pickValidFileInfo(pctx, partsMetadata, modTime, dataDir, writeQuorum) if err != nil { return pi, err } @@ -469,7 +484,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo } }() - erasure, err := NewErasure(rctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize) + erasure, err := NewErasure(pctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize) if err != nil { return pi, toObjectErr(err, bucket, object) } @@ -505,7 +520,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize()) } - n, err := erasure.Encode(rctx, data, writers, buffer, writeQuorum) + n, err := erasure.Encode(pctx, data, writers, buffer, writeQuorum) closeBitrotWriters(writers) if err != nil { return pi, toObjectErr(err, bucket, object) @@ -523,16 +538,14 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo } } - // Unlock here before acquiring write locks all concurrent - // PutObjectParts would serialize here updating `xl.meta` - uploadIDLock.RUnlock(rlkctx.Cancel) - readLocked = false - wlkctx, err := uploadIDLock.GetLock(ctx, globalOperationTimeout) + // Acquire write lock to update metadata. + uploadIDWLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) + wlkctx, err := uploadIDWLock.GetLock(pctx, globalOperationTimeout) if err != nil { return PartInfo{}, err } wctx := wlkctx.Context() - defer uploadIDLock.Unlock(wlkctx.Cancel) + defer uploadIDWLock.Unlock(wlkctx.Cancel) // Validates if upload ID exists. if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil {