Avoid extra GetObjectInfo call in DeleteObject API (#17599)

Optimize DeleteObject API to avoid extra 
GetObjectInfo call on the replicating side.

For receiving side, it is just a regular
DeleteObject call.

Bonus: Fix a corner case where version purged is 
absent on target (either due to replication not yet
complete or target version already deleted in a
one-way replication or when replication was disabled). 

In such cases, mark version purge complete.
This commit is contained in:
Poorna 2023-07-10 10:57:56 -04:00 committed by GitHub
parent dfd7cca0d2
commit e8c98c3246
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 124 additions and 81 deletions

View file

@ -580,8 +580,8 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
}
}
if object.VersionID != "" && hasLockEnabled {
if apiErrCode := enforceRetentionBypassForDelete(ctx, r, bucket, object, goi, gerr); apiErrCode != ErrNone {
apiErr := errorCodes.ToAPIErr(apiErrCode)
if err := enforceRetentionBypassForDelete(ctx, r, bucket, object, goi, gerr); err != nil {
apiErr := toAPIError(ctx, err)
deleteResults[index].errInfo = DeleteError{
Code: apiErr.Code,
Message: apiErr.Description,

View file

@ -82,24 +82,24 @@ func enforceRetentionForDeletion(ctx context.Context, objInfo ObjectInfo) (locke
// For objects in "Governance" mode, overwrite is allowed if a) object retention date is past OR
// governance bypass headers are set and user has governance bypass permissions.
// Objects in "Compliance" mode can be overwritten only if retention date is past.
func enforceRetentionBypassForDelete(ctx context.Context, r *http.Request, bucket string, object ObjectToDelete, oi ObjectInfo, gerr error) APIErrorCode {
func enforceRetentionBypassForDelete(ctx context.Context, r *http.Request, bucket string, object ObjectToDelete, oi ObjectInfo, gerr error) error {
if gerr != nil { // error from GetObjectInfo
if _, ok := gerr.(MethodNotAllowed); ok {
// This happens usually for a delete marker
if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() {
// Delete marker should be present and valid.
return ErrNone
return nil
}
}
if isErrObjectNotFound(gerr) || isErrVersionNotFound(gerr) {
return ErrNone
return nil
}
return toAPIErrorCode(ctx, gerr)
return gerr
}
lhold := objectlock.GetObjectLegalHoldMeta(oi.UserDefined)
if lhold.Status.Valid() && lhold.Status == objectlock.LegalHoldOn {
return ErrObjectLocked
return ObjectLocked{}
}
ret := objectlock.GetObjectRetentionMeta(oi.UserDefined)
@ -115,13 +115,13 @@ func enforceRetentionBypassForDelete(ctx context.Context, r *http.Request, bucke
t, err := objectlock.UTCNowNTP()
if err != nil {
logger.LogIf(ctx, err)
return ErrObjectLocked
return ObjectLocked{}
}
if !ret.RetainUntilDate.Before(t) {
return ErrObjectLocked
return ObjectLocked{}
}
return ErrNone
return nil
case objectlock.RetGovernance:
// In governance mode, users can't overwrite or delete an object
// version or alter its lock settings unless they have special
@ -141,22 +141,22 @@ func enforceRetentionBypassForDelete(ctx context.Context, r *http.Request, bucke
t, err := objectlock.UTCNowNTP()
if err != nil {
logger.LogIf(ctx, err)
return ErrObjectLocked
return ObjectLocked{}
}
if !ret.RetainUntilDate.Before(t) {
return ErrObjectLocked
return ObjectLocked{}
}
return ErrNone
return nil
}
// https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lock-overview.html#object-lock-retention-modes
// If you try to delete objects protected by governance mode and have s3:BypassGovernanceRetention, the operation will succeed.
if checkRequestAuthType(ctx, r, policy.BypassGovernanceRetentionAction, bucket, object.ObjectName) != ErrNone {
return ErrAccessDenied
return errAuthentication
}
}
}
return ErrNone
return nil
}
// enforceRetentionBypassForPut enforces whether an existing object under governance can be overwritten

View file

@ -536,12 +536,18 @@ func getHealReplicateObjectInfo(objInfo ObjectInfo, rcfg replicationConfig) Repl
}
}
func (ri *ReplicateObjectInfo) getReplicationState() ReplicationState {
rs := ri.ObjectInfo.getReplicationState()
rs.ReplicateDecisionStr = ri.Dsc.String()
return rs
}
// vID here represents the versionID client specified in request - need to distinguish between delete marker and delete marker deletion
func (o *ObjectInfo) getReplicationState(dsc string, vID string, heal bool) ReplicationState {
func (o *ObjectInfo) getReplicationState() ReplicationState {
rs := ReplicationState{
ReplicationStatusInternal: o.ReplicationStatusInternal,
VersionPurgeStatusInternal: o.VersionPurgeStatusInternal,
ReplicateDecisionStr: dsc,
ReplicateDecisionStr: o.replicationDecision,
Targets: make(map[string]replication.StatusType),
PurgeTargets: make(map[string]VersionPurgeStatusType),
ResetStatusesMap: make(map[string]string),

View file

@ -634,13 +634,20 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI
IsReplicationReadyForDeleteMarker: true,
},
})
if isErrMethodNotAllowed(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)) {
if dobj.VersionID == "" {
switch {
case isErrMethodNotAllowed(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)):
// delete marker already replicated
if dobj.VersionID == "" && rinfo.VersionPurgeStatus.Empty() {
rinfo.ReplicationStatus = replication.Completed
return
}
}
if !isErrObjectNotFound(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)) {
case isErrObjectNotFound(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)):
// version being purged is already not found on target.
if !rinfo.VersionPurgeStatus.Empty() {
rinfo.VersionPurgeStatus = Complete
return
}
default:
// mark delete marker replication as failed if target cluster not ready to receive
// this request yet (object version not replicated yet)
if err != nil && !toi.ReplicationReady {
@ -649,7 +656,6 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI
}
}
}
rmErr := tgt.RemoveObject(ctx, tgt.Bucket, dobj.ObjectName, minio.RemoveObjectOptions{
VersionID: versionID,
Internal: minio.AdvancedRemoveOptions{
@ -1048,7 +1054,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
popts := ObjectOptions{
MTime: objInfo.ModTime,
VersionID: objInfo.VersionID,
EvalMetadataFn: func(oi *ObjectInfo) error {
EvalMetadataFn: func(oi *ObjectInfo, gerr error) (dsc ReplicateDecision, err error) {
oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = newReplStatusInternal
oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
oi.UserDefined[xhttp.AmzBucketReplicationStatus] = string(rinfos.ReplicationStatus())
@ -1060,7 +1066,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
if objInfo.UserTags != "" {
oi.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
}
return nil
return dsc, nil
},
}
@ -2478,7 +2484,7 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
ObjectName: roi.Name,
DeleteMarkerVersionID: dmVersionID,
VersionID: versionID,
ReplicationState: roi.getReplicationState(roi.Dsc.String(), versionID, true),
ReplicationState: roi.getReplicationState(),
DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime},
DeleteMarker: roi.DeleteMarker,
},
@ -2895,7 +2901,7 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf
ObjectName: roi.Name,
DeleteMarkerVersionID: dmVersionID,
VersionID: versionID,
ReplicationState: roi.getReplicationState(roi.Dsc.String(), versionID, true),
ReplicationState: roi.getReplicationState(),
DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime},
DeleteMarker: roi.DeleteMarker,
},

View file

@ -1657,6 +1657,22 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
return objInfo, gerr
}
}
if opts.EvalMetadataFn != nil {
dsc, err := opts.EvalMetadataFn(&goi, err)
if err != nil {
return ObjectInfo{}, err
}
if dsc.ReplicateAny() {
opts.SetDeleteReplicationState(dsc, opts.VersionID)
goi.replicationDecision = opts.DeleteReplication.ReplicateDecisionStr
}
}
if opts.EvalRetentionBypassFn != nil {
if err := opts.EvalRetentionBypassFn(goi, gerr); err != nil {
return ObjectInfo{}, err
}
}
if opts.Expiration.Expire {
if gerr == nil {
@ -1765,7 +1781,9 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
if err = er.deleteObjectVersion(ctx, bucket, object, fi, opts.DeleteMarker); err != nil {
return objInfo, toObjectErr(err, bucket, object)
}
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
oi := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
oi.replicationDecision = goi.replicationDecision
return oi, nil
}
// Delete the object version on all disks.
@ -1855,7 +1873,7 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s
objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
if opts.EvalMetadataFn != nil {
if err := opts.EvalMetadataFn(&objInfo); err != nil {
if _, err := opts.EvalMetadataFn(&objInfo, err); err != nil {
return ObjectInfo{}, err
}
}

View file

@ -185,6 +185,7 @@ type ObjectInfo struct {
VersionPurgeStatusInternal string
VersionPurgeStatus VersionPurgeStatusType
replicationDecision string // internal representation of replication decision for use by DeleteObject handler
// The total count of all versions of this object
NumVersions int
// The modtime of the successor object version if any

View file

@ -35,8 +35,11 @@ import (
// CheckPreconditionFn returns true if precondition check failed.
type CheckPreconditionFn func(o ObjectInfo) bool
// EvalMetadataFn validates input objInfo and returns an updated metadata
type EvalMetadataFn func(o *ObjectInfo) error
// EvalMetadataFn validates input objInfo and GetObjectInfo error and returns an updated metadata and replication decision if any
type EvalMetadataFn func(o *ObjectInfo, gerr error) (ReplicateDecision, error)
// EvalRetentionBypassFn validates input objInfo and GetObjectInfo error and returns an error if retention bypass is not allowed.
type EvalRetentionBypassFn func(o ObjectInfo, gerr error) error
// GetObjectInfoFn is the signature of GetObjectInfo function.
type GetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error)
@ -100,7 +103,8 @@ type ObjectOptions struct {
InclFreeVersions bool
MetadataChg bool // is true if it is a metadata update operation.
MetadataChg bool // is true if it is a metadata update operation.
EvalRetentionBypassFn EvalRetentionBypassFn // only set for enforcing retention bypass on DeleteObject.
}
// ExpirationOptions represents object options for object expiration at objectLayer.
@ -182,6 +186,16 @@ func (o *ObjectOptions) PutReplicationState() (r ReplicationState) {
return
}
// SetEvalMetadataFn sets the metadata evaluation function
func (o *ObjectOptions) SetEvalMetadataFn(f EvalMetadataFn) {
o.EvalMetadataFn = f
}
// SetEvalRetentionBypassFn sets the retention bypass function
func (o *ObjectOptions) SetEvalRetentionBypassFn(f EvalRetentionBypassFn) {
o.EvalRetentionBypassFn = f
}
// ObjectLayer implements primitives for object API layer.
type ObjectLayer interface {
// Locking operations on object.

View file

@ -2326,33 +2326,29 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
return
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
os := newObjSweeper(bucket, object).WithVersion(opts.VersionID).WithVersioning(opts.Versioned, opts.VersionSuspended)
// Mutations of objects on versioning suspended buckets
// affect its null version. Through opts below we select
// the null version's remote object to delete if
// transitioned.
goiOpts := os.GetOpts()
goi, gerr := getObjectInfo(ctx, bucket, object, goiOpts)
if gerr == nil {
os.SetTransitionState(goi.TransitionedObject)
}
dsc := checkReplicateDelete(ctx, bucket, ObjectToDelete{
ObjectV: ObjectV{
ObjectName: object,
VersionID: opts.VersionID,
},
}, goi, opts, gerr)
opts.SetEvalMetadataFn(func(oi *ObjectInfo, gerr error) (dsc ReplicateDecision, err error) {
if replica { // no need to check replication on receiver
return dsc, nil
}
dsc = checkReplicateDelete(ctx, bucket, ObjectToDelete{
ObjectV: ObjectV{
ObjectName: object,
VersionID: opts.VersionID,
},
}, *oi, opts, gerr)
// Mutations of objects on versioning suspended buckets
// affect its null version. Through opts below we select
// the null version's remote object to delete if
// transitioned.
if gerr == nil {
os.SetTransitionState(oi.TransitionedObject)
}
return dsc, nil
})
vID := opts.VersionID
if dsc.ReplicateAny() {
opts.SetDeleteReplicationState(dsc, opts.VersionID)
}
if replica {
opts.SetReplicaStatus(replication.Replica)
if opts.VersionPurgeStatus().Empty() {
@ -2360,25 +2356,21 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
vID = ""
}
}
apiErr := ErrNone
if vID != "" {
apiErr = enforceRetentionBypassForDelete(ctx, r, bucket, ObjectToDelete{
ObjectV: ObjectV{
ObjectName: object,
VersionID: vID,
},
}, goi, gerr)
if apiErr != ErrNone && apiErr != ErrNoSuchKey {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(apiErr), r.URL)
return
opts.SetEvalRetentionBypassFn(func(goi ObjectInfo, gerr error) (err error) {
err = nil
if vID != "" {
err := enforceRetentionBypassForDelete(ctx, r, bucket, ObjectToDelete{
ObjectV: ObjectV{
ObjectName: object,
VersionID: vID,
},
}, goi, gerr)
if err != nil && !isErrObjectNotFound(err) {
return err
}
}
}
if apiErr == ErrNoSuchKey {
writeSuccessNoContent(w)
return
}
})
deleteObject := objectAPI.DeleteObject
if api.CacheAPI() != nil {
@ -2393,6 +2385,12 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if isErrObjectNotFound(err) {
writeSuccessNoContent(w)
return
}
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if objInfo.Name == "" {
@ -2419,7 +2417,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
Host: handlers.GetSourceIP(r),
})
if dsc.ReplicateAny() {
if objInfo.ReplicationStatus == replication.Pending || objInfo.VersionPurgeStatus == Pending {
dmVersionID := ""
versionID := ""
if objInfo.DeleteMarker {
@ -2434,7 +2432,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
DeleteMarkerVersionID: dmVersionID,
DeleteMarkerMTime: DeleteMarkerMTime{objInfo.ModTime},
DeleteMarker: objInfo.DeleteMarker,
ReplicationState: objInfo.getReplicationState(dsc.String(), opts.VersionID, false),
ReplicationState: objInfo.getReplicationState(),
},
Bucket: bucket,
EventType: ReplicateIncomingDelete,
@ -2505,7 +2503,7 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r
popts := ObjectOptions{
MTime: opts.MTime,
VersionID: opts.VersionID,
EvalMetadataFn: func(oi *ObjectInfo) error {
EvalMetadataFn: func(oi *ObjectInfo, gerr error) (ReplicateDecision, error) {
oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = strings.ToUpper(string(legalHold.Status))
oi.UserDefined[ReservedMetadataPrefixLower+ObjectLockLegalHoldTimestamp] = UTCNow().Format(time.RFC3339Nano)
@ -2514,7 +2512,7 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r
oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
}
return nil
return dsc, nil
},
}
@ -2666,9 +2664,9 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r
popts := ObjectOptions{
MTime: opts.MTime,
VersionID: opts.VersionID,
EvalMetadataFn: func(oi *ObjectInfo) error {
EvalMetadataFn: func(oi *ObjectInfo, gerr error) (dsc ReplicateDecision, err error) {
if err := enforceRetentionBypassForPut(ctx, r, *oi, objRetention, cred, owner); err != nil {
return err
return dsc, err
}
if objRetention.Mode.Valid() {
oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = string(objRetention.Mode)
@ -2678,12 +2676,12 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r
oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = ""
}
oi.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp] = UTCNow().Format(time.RFC3339Nano)
dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(*oi, replication.MetadataReplicationType, opts))
dsc = mustReplicate(ctx, bucket, object, getMustReplicateOptions(*oi, replication.MetadataReplicationType, opts))
if dsc.ReplicateAny() {
oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
}
return nil
return dsc, nil
},
}

View file

@ -519,10 +519,10 @@ func updateObjectMetadataWithZipInfo(ctx context.Context, objectAPI ObjectLayer,
popts := ObjectOptions{
MTime: srcInfo.ModTime,
VersionID: srcInfo.VersionID,
EvalMetadataFn: func(oi *ObjectInfo) error {
EvalMetadataFn: func(oi *ObjectInfo, gerr error) (dsc ReplicateDecision, err error) {
oi.UserDefined[archiveTypeMetadataKey] = at
oi.UserDefined[archiveInfoMetadataKey] = zipInfoStr
return nil
return dsc, nil
},
}