diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go index ea55c968f..a27f57d16 100644 --- a/cmd/background-heal-ops.go +++ b/cmd/background-heal-ops.go @@ -22,6 +22,7 @@ import ( "fmt" "runtime" "strconv" + "time" "github.com/minio/madmin-go/v2" "github.com/minio/minio/internal/logger" @@ -60,13 +61,43 @@ func activeListeners() int { return int(globalHTTPListen.Subscribers()) + int(globalTrace.Subscribers()) } -func waitForLowHTTPReq() { - var currentIO func() int - if httpServer := newHTTPServerFn(); httpServer != nil { - currentIO = httpServer.GetRequestCount +func waitForLowIO(maxIO int, maxWait time.Duration, currentIO func() int) { + // No need to wait run at full speed. + if maxIO <= 0 { + return } - globalHealConfig.Wait(currentIO, activeListeners) + const waitTick = 100 * time.Millisecond + + tmpMaxWait := maxWait + + for currentIO() >= maxIO { + if tmpMaxWait > 0 { + if tmpMaxWait < waitTick { + time.Sleep(tmpMaxWait) + } else { + time.Sleep(waitTick) + } + tmpMaxWait -= waitTick + } + if tmpMaxWait <= 0 { + return + } + } +} + +func currentHTTPIO() int { + httpServer := newHTTPServerFn() + if httpServer == nil { + return 0 + } + + return httpServer.GetRequestCount() - activeListeners() +} + +func waitForLowHTTPReq() { + maxIO, maxWait, _ := globalHealConfig.Clone() + waitForLowIO(maxIO, maxWait, currentHTTPIO) } func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) { diff --git a/cmd/bucket-metadata-sys.go b/cmd/bucket-metadata-sys.go index ba83dd811..8da234720 100644 --- a/cmd/bucket-metadata-sys.go +++ b/cmd/bucket-metadata-sys.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "runtime" "sync" "time" @@ -40,6 +41,8 @@ import ( // BucketMetadataSys captures all bucket metadata for a given cluster. type BucketMetadataSys struct { + objAPI ObjectLayer + sync.RWMutex metadataMap map[string]BucketMetadata } @@ -386,39 +389,41 @@ func (sys *BucketMetadataSys) Init(ctx context.Context, buckets []BucketInfo, ob return errServerNotInitialized } + sys.objAPI = objAPI + // Load bucket metadata sys in background - go sys.load(ctx, buckets, objAPI) + go sys.init(ctx, buckets) + return nil +} + +func (sys *BucketMetadataSys) loadBucketMetadata(ctx context.Context, bucket BucketInfo) error { + meta, err := loadBucketMetadata(ctx, sys.objAPI, bucket.Name) + if err != nil { + return err + } + + sys.Lock() + sys.metadataMap[bucket.Name] = meta + sys.Unlock() + + globalEventNotifier.set(bucket, meta) // set notification targets + globalBucketTargetSys.set(bucket, meta) // set remote replication targets + return nil } // concurrently load bucket metadata to speed up loading bucket metadata. -func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) { +func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []BucketInfo) { g := errgroup.WithNErrs(len(buckets)) for index := range buckets { index := index g.Go(func() error { - _, _ = objAPI.HealBucket(ctx, buckets[index].Name, madmin.HealOpts{ + _, _ = sys.objAPI.HealBucket(ctx, buckets[index].Name, madmin.HealOpts{ // Ensure heal opts for bucket metadata be deep healed all the time. ScanMode: madmin.HealDeepScan, Recreate: true, }) - meta, err := loadBucketMetadata(ctx, objAPI, buckets[index].Name) - if err != nil { - if !globalIsErasure && !globalIsDistErasure && errors.Is(err, errVolumeNotFound) { - meta = newBucketMetadata(buckets[index].Name) - } else { - return err - } - } - sys.Lock() - sys.metadataMap[buckets[index].Name] = meta - sys.Unlock() - - globalEventNotifier.set(buckets[index], meta) // set notification targets - - globalBucketTargetSys.set(buckets[index], meta) // set remote replication targets - - return nil + return sys.loadBucketMetadata(ctx, buckets[index]) }, index) } for _, err := range g.Wait() { @@ -428,17 +433,51 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck } } +func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context) { + const bucketMetadataRefresh = 15 * time.Minute + + t := time.NewTimer(bucketMetadataRefresh) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + buckets, err := sys.objAPI.ListBuckets(ctx, BucketOptions{}) + if err != nil { + logger.LogIf(ctx, err) + continue + } + for i := range buckets { + err := sys.loadBucketMetadata(ctx, buckets[i]) + if err != nil { + logger.LogIf(ctx, err) + continue + } + // Check if there is a spare core, wait 100ms instead + waitForLowIO(runtime.NumCPU(), 100*time.Millisecond, currentHTTPIO) + } + + t.Reset(bucketMetadataRefresh) + } + } +} + // Loads bucket metadata for all buckets into BucketMetadataSys. -func (sys *BucketMetadataSys) load(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) { +func (sys *BucketMetadataSys) init(ctx context.Context, buckets []BucketInfo) { count := 100 // load 100 bucket metadata at a time. for { if len(buckets) < count { - sys.concurrentLoad(ctx, buckets, objAPI) - return + sys.concurrentLoad(ctx, buckets) + break } - sys.concurrentLoad(ctx, buckets[:count], objAPI) + sys.concurrentLoad(ctx, buckets[:count]) buckets = buckets[count:] } + + if globalIsDistErasure { + go sys.refreshBucketsMetadataLoop(ctx) + } } // Reset the state of the BucketMetadataSys. diff --git a/internal/config/heal/heal.go b/internal/config/heal/heal.go index 6c9f2ce2f..b13d72400 100644 --- a/internal/config/heal/heal.go +++ b/internal/config/heal/heal.go @@ -70,39 +70,11 @@ func (opts Config) BitrotScanCycle() (d time.Duration) { return opts.cache.bitrotCycle } -// Wait waits for IOCount to go down or max sleep to elapse before returning. -// usually used in healing paths to wait for specified amount of time to -// throttle healing. -func (opts Config) Wait(currentIO func() int, activeListeners func() int) { +// Clone safely the heal configuration +func (opts Config) Clone() (int, time.Duration, string) { configMutex.RLock() - maxIO, maxWait := opts.IOCount, opts.Sleep - configMutex.RUnlock() - - // No need to wait run at full speed. - if maxIO <= 0 { - return - } - - // At max 10 attempts to wait with 100 millisecond interval before proceeding - waitTick := 100 * time.Millisecond - - tmpMaxWait := maxWait - - if currentIO != nil { - for currentIO() >= maxIO+activeListeners() { - if tmpMaxWait > 0 { - if tmpMaxWait < waitTick { - time.Sleep(tmpMaxWait) - } else { - time.Sleep(waitTick) - } - tmpMaxWait -= waitTick - } - if tmpMaxWait <= 0 { - return - } - } - } + defer configMutex.RUnlock() + return opts.IOCount, opts.Sleep, opts.Bitrot } // Update updates opts with nopts