replication: Lock object while replicating (#13014)

Introduce a replication lock that will ensure that only one replication 
operation will run for any given object at any time.

Fixes #13013
This commit is contained in:
Klaus Post 2021-08-23 17:16:18 +02:00 committed by GitHub
parent a75412440f
commit 63f3e5c3fc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -28,7 +28,7 @@ import (
"time"
"github.com/minio/madmin-go"
minio "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7"
miniogo "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/minio/minio-go/v7/pkg/tags"
@ -316,6 +316,28 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
return
}
// Lock the object name before starting replication operation.
// Use separate lock that doesn't collide with regular objects.
lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+dobj.ObjectName)
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", dobj.ObjectName, bucket, rcfg.RoleArn))
sendEvent(eventArgs{
BucketName: bucket,
Object: ObjectInfo{
Bucket: bucket,
Name: dobj.ObjectName,
VersionID: versionID,
DeleteMarker: dobj.DeleteMarker,
},
Host: "Internal: [Replication]",
EventName: event.ObjectReplicationNotTracked,
})
return
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
rmErr := tgt.RemoveObject(ctx, rcfg.GetDestination().Bucket, dobj.ObjectName, miniogo.RemoveObjectOptions{
VersionID: versionID,
Internal: miniogo.AdvancedRemoveOptions{
@ -669,6 +691,24 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
})
return
}
// Lock the object name before starting replication.
// Use separate lock that doesn't collide with regular objects.
lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+object)
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
sendEvent(eventArgs{
EventName: event.ObjectReplicationNotTracked,
BucketName: bucket,
Object: objInfo,
Host: "Internal: [Replication]",
})
logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", object, bucket, cfg.RoleArn))
return
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
var closeOnDefer bool
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
VersionID: objInfo.VersionID,