From 8a07000e58a0bd842f17ce520b365dae3011d36c Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Tue, 15 Nov 2022 16:59:21 +0100 Subject: [PATCH] fix: refactor getReplicationDiff for safe use (#16051) --- cmd/bucket-replication.go | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 80021ef1b..7cc8869e1 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -2604,31 +2604,38 @@ func saveResyncStatus(ctx context.Context, bucket string, brs BucketReplicationR return saveConfig(ctx, objectAPI, configFile, buf) } -// getReplicationDiff returns unreplicated objects in a channel -func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string, opts madmin.ReplDiffOpts) (diffCh chan madmin.DiffInfo, err error) { - objInfoCh := make(chan ObjectInfo) - if err := objAPI.Walk(ctx, bucket, opts.Prefix, objInfoCh, ObjectOptions{}); err != nil { - logger.LogIf(ctx, err) - return diffCh, err - } +// getReplicationDiff returns un-replicated objects in a channel. +// If a non-nil channel is returned it must be consumed fully or +// the provided context must be canceled. +func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string, opts madmin.ReplDiffOpts) (chan madmin.DiffInfo, error) { cfg, err := getReplicationConfig(ctx, bucket) if err != nil { logger.LogIf(ctx, err) - return diffCh, err + return nil, err } tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) if err != nil { logger.LogIf(ctx, err) - return diffCh, err + return nil, err + } + + objInfoCh := make(chan ObjectInfo, 10) + if err := objAPI.Walk(ctx, bucket, opts.Prefix, objInfoCh, ObjectOptions{}); err != nil { + logger.LogIf(ctx, err) + return nil, err } rcfg := replicationConfig{ Config: cfg, remotes: tgts, } - diffCh = make(chan madmin.DiffInfo, 4000) + diffCh := make(chan madmin.DiffInfo, 4000) go func() { defer close(diffCh) for obj := range objInfoCh { + if contextCanceled(ctx) { + // Just consume input... + continue + } // Ignore object prefixes which are excluded // from versioning via the MinIO bucket versioning extension. if globalBucketVersioningSys.PrefixSuspended(bucket, obj.Name) { @@ -2682,7 +2689,7 @@ func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string, Targets: tgtsMap, }: case <-ctx.Done(): - return + continue } } }