Cleanup remote targets automatically on replication config removal. (#16221)

This commit is contained in:
Poorna 2022-12-14 03:24:06 -08:00 committed by GitHub
parent c73ea27ed7
commit d37e514733
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 62 additions and 29 deletions

View file

@ -201,7 +201,18 @@ func (a adminAPIHandlers) SetRemoteTargetHandler(w http.ResponseWriter, r *http.
if update { if update {
ops = madmin.GetTargetUpdateOps(r.Form) ops = madmin.GetTargetUpdateOps(r.Form)
} else { } else {
target.Arn = globalBucketTargetSys.getRemoteARN(bucket, &target) var exists bool // true if arn exists
target.Arn, exists = globalBucketTargetSys.getRemoteARN(bucket, &target)
if exists && target.Arn != "" { // return pre-existing ARN
data, err := json.Marshal(target.Arn)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
// Write success response.
writeSuccessResponseJSON(w, data)
return
}
} }
if target.Arn == "" { if target.Arn == "" {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrAdminConfigBadJSON, err), r.URL) writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrAdminConfigBadJSON, err), r.URL)

View file

@ -169,6 +169,21 @@ func (api objectAPIHandlers) DeleteBucketReplicationConfigHandler(w http.Respons
return return
} }
targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
for _, tgt := range targets.Targets {
if err := globalBucketTargetSys.RemoveTarget(ctx, bucket, tgt.Arn); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
}
if _, err := globalBucketMetadataSys.Delete(ctx, bucket, bucketTargetsFile); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// Write success response. // Write success response.
writeSuccessResponseHeadersOnly(w) writeSuccessResponseHeadersOnly(w)
} }

View file

@ -217,10 +217,13 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m
// validate if target credentials are ok // validate if target credentials are ok
exists, err := clnt.BucketExists(ctx, tgt.TargetBucket) exists, err := clnt.BucketExists(ctx, tgt.TargetBucket)
if err != nil { if err != nil {
if minio.ToErrorResponse(err).Code == "NoSuchBucket" { switch minio.ToErrorResponse(err).Code {
case "NoSuchBucket":
return BucketRemoteTargetNotFound{Bucket: tgt.TargetBucket} return BucketRemoteTargetNotFound{Bucket: tgt.TargetBucket}
case "AccessDenied":
return RemoteTargetConnectionErr{Bucket: tgt.TargetBucket, AccessKey: tgt.Credentials.AccessKey, Err: err}
} }
return RemoteTargetConnectionErr{Bucket: tgt.TargetBucket, Err: err} return RemoteTargetConnectionErr{Bucket: tgt.TargetBucket, AccessKey: tgt.Credentials.AccessKey, Err: err}
} }
if !exists { if !exists {
return BucketRemoteTargetNotFound{Bucket: tgt.TargetBucket} return BucketRemoteTargetNotFound{Bucket: tgt.TargetBucket}
@ -231,7 +234,7 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m
} }
vcfg, err := clnt.GetBucketVersioning(ctx, tgt.TargetBucket) vcfg, err := clnt.GetBucketVersioning(ctx, tgt.TargetBucket)
if err != nil { if err != nil {
return RemoteTargetConnectionErr{Bucket: tgt.TargetBucket, Err: err} return RemoteTargetConnectionErr{Bucket: tgt.TargetBucket, Err: err, AccessKey: tgt.Credentials.AccessKey}
} }
if !vcfg.Enabled() { if !vcfg.Enabled() {
return BucketRemoteTargetNotVersioned{Bucket: tgt.TargetBucket} return BucketRemoteTargetNotVersioned{Bucket: tgt.TargetBucket}
@ -470,20 +473,20 @@ func (sys *BucketTargetSys) getRemoteTargetClient(tcfg *madmin.BucketTarget) (*T
} }
// getRemoteARN gets existing ARN for an endpoint or generates a new one. // getRemoteARN gets existing ARN for an endpoint or generates a new one.
func (sys *BucketTargetSys) getRemoteARN(bucket string, target *madmin.BucketTarget) string { func (sys *BucketTargetSys) getRemoteARN(bucket string, target *madmin.BucketTarget) (arn string, exists bool) {
if target == nil { if target == nil {
return "" return
} }
tgts := sys.targetsMap[bucket] tgts := sys.targetsMap[bucket]
for _, tgt := range tgts { for _, tgt := range tgts {
if tgt.Type == target.Type && tgt.TargetBucket == target.TargetBucket && target.URL().String() == tgt.URL().String() { if tgt.Type == target.Type && tgt.TargetBucket == target.TargetBucket && target.URL().String() == tgt.URL().String() && tgt.Credentials.AccessKey == target.Credentials.AccessKey {
return tgt.Arn return tgt.Arn, true
} }
} }
if !target.Type.IsValid() { if !target.Type.IsValid() {
return "" return
} }
return generateARN(target) return generateARN(target), false
} }
// getRemoteARNForPeer returns the remote target for a peer site in site replication // getRemoteARNForPeer returns the remote target for a peer site in site replication

View file

@ -424,14 +424,15 @@ func (e BucketRemoteTargetNotFound) Error() string {
// RemoteTargetConnectionErr remote target connection failure. // RemoteTargetConnectionErr remote target connection failure.
type RemoteTargetConnectionErr struct { type RemoteTargetConnectionErr struct {
Err error Err error
Bucket string Bucket string
Endpoint string Endpoint string
AccessKey string
} }
func (e RemoteTargetConnectionErr) Error() string { func (e RemoteTargetConnectionErr) Error() string {
if e.Bucket != "" { if e.Bucket != "" {
return fmt.Sprintf("Remote service endpoint offline or target bucket/remote service credentials invalid: %s \n\t%s", e.Bucket, e.Err.Error()) return fmt.Sprintf("Remote service endpoint offline, target bucket: %s or remote service credentials: %s invalid \n\t%s", e.Bucket, e.AccessKey, e.Err.Error())
} }
return fmt.Sprintf("Remote service endpoint %s not available\n\t%s", e.Endpoint, e.Err.Error()) return fmt.Sprintf("Remote service endpoint %s not available\n\t%s", e.Endpoint, e.Err.Error())
} }

View file

@ -852,21 +852,24 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context,
Region: "", Region: "",
ReplicationSync: false, ReplicationSync: false,
} }
bucketTarget.Arn = globalBucketTargetSys.getRemoteARN(bucket, &bucketTarget) var exists bool // true if ARN already exists
err := globalBucketTargetSys.SetTarget(ctx, bucket, &bucketTarget, false) bucketTarget.Arn, exists = globalBucketTargetSys.getRemoteARN(bucket, &bucketTarget)
if err != nil { if !exists { // persist newly generated ARN to targets and metadata on disk
return c.annotatePeerErr(peer.Name, "Bucket target creation error", err) err := globalBucketTargetSys.SetTarget(ctx, bucket, &bucketTarget, false)
} if err != nil {
targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) return c.annotatePeerErr(peer.Name, "Bucket target creation error", err)
if err != nil { }
return err targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
} if err != nil {
tgtBytes, err := json.Marshal(&targets) return err
if err != nil { }
return err tgtBytes, err := json.Marshal(&targets)
} if err != nil {
if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil { return err
return err }
if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil {
return err
}
} }
targetARN = bucketTarget.Arn targetARN = bucketTarget.Arn
} }