add more bootstrap messages to provide latency (#17650)

- simplify refreshing bucket metadata: make wait() depend
  on how fast the bucket metadata can load.

- simplify resync to start in a single pass.
Harshavardhana 2023-07-14 04:00:29 -07:00 committed by GitHub
parent bdddf597f6
commit 005a4a275a
3 changed files with 21 additions and 18 deletions

cmd/bucket-metadata-sys.go

@@ -21,7 +21,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"runtime"
 	"sync"
 	"time"
@@ -489,6 +488,8 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck
 func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context) {
 	const bucketMetadataRefresh = 15 * time.Minute

+	sleeper := newDynamicSleeper(2, 150*time.Millisecond, false)
+
 	t := time.NewTimer(bucketMetadataRefresh)
 	defer t.Stop()
 	for {
@@ -512,13 +513,16 @@ func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context) {
 		sys.RemoveStaleBuckets(diskBuckets)

 		for _, bucket := range buckets {
+			wait := sleeper.Timer(ctx)
+
 			err := sys.loadBucketMetadata(ctx, bucket)
 			if err != nil {
 				logger.LogIf(ctx, err)
+				wait() // wait to proceed to next entry.
 				continue
 			}

-			// Check if there is a spare procs, wait 100ms instead
-			waitForLowIO(runtime.GOMAXPROCS(0), 100*time.Millisecond, currentHTTPIO)
+			wait() // wait to proceed to next entry.
 		}

 		t.Reset(bucketMetadataRefresh)
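
Note: wait() above comes from MinIO's internal dynamicSleeper, which scales the pause to how long the preceding load took. Below is a minimal, self-contained sketch of that pacing idea; the dynamicSleeper type and the factor/minSleep names here are illustrative stand-ins under that assumption, not MinIO's actual implementation:

package main

import (
	"context"
	"fmt"
	"time"
)

// dynamicSleeper paces a loop: each wait() sleeps in proportion to how
// long the preceding operation took (factor*elapsed, floored at minSleep),
// so slower metadata loads automatically throttle the loop harder.
type dynamicSleeper struct {
	factor   float64
	minSleep time.Duration
}

// Timer records the start time of an operation; the returned func sleeps
// for factor*elapsed (at least minSleep), honoring context cancellation.
func (d dynamicSleeper) Timer(ctx context.Context) func() {
	start := time.Now()
	return func() {
		wait := time.Duration(float64(time.Since(start)) * d.factor)
		if wait < d.minSleep {
			wait = d.minSleep
		}
		t := time.NewTimer(wait)
		defer t.Stop()
		select {
		case <-ctx.Done():
		case <-t.C:
		}
	}
}

func main() {
	sleeper := dynamicSleeper{factor: 2, minSleep: 150 * time.Millisecond}
	for _, bucket := range []string{"a", "b", "c"} {
		wait := sleeper.Timer(context.Background())
		// Simulated metadata load; the real loop calls
		// sys.loadBucketMetadata(ctx, bucket) here.
		time.Sleep(20 * time.Millisecond)
		fmt.Println("loaded bucket", bucket)
		wait() // pace before the next entry, proportional to load time.
	}
}

Under these assumptions a fast load pauses only the 150ms floor, while a slow load backs off proportionally, so the refresh loop yields when the system is busy.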

cmd/bucket-replication.go

@@ -2669,7 +2669,9 @@ func (p *ReplicationPool) loadResync(ctx context.Context, buckets []BucketInfo,
 	defer cancel()

 	for index := range buckets {
-		meta, err := loadBucketResyncMetadata(ctx, buckets[index].Name, objAPI)
+		bucket := buckets[index].Name
+
+		meta, err := loadBucketResyncMetadata(ctx, bucket, objAPI)
 		if err != nil {
 			if !errors.Is(err, errVolumeNotFound) {
 				logger.LogIf(ctx, err)
@@ -2678,18 +2680,10 @@ func (p *ReplicationPool) loadResync(ctx context.Context, buckets []BucketInfo,
 		}

 		p.resyncer.Lock()
-		p.resyncer.statusMap[buckets[index].Name] = meta
+		p.resyncer.statusMap[bucket] = meta
 		p.resyncer.Unlock()
-	}
-	for index := range buckets {
-		bucket := buckets[index].Name
-		var tgts map[string]TargetReplicationResyncStatus
-		p.resyncer.RLock()
-		m, ok := p.resyncer.statusMap[bucket]
-		if ok {
-			tgts = m.cloneTgtStats()
-		}
-		p.resyncer.RUnlock()
+
+		tgts := meta.cloneTgtStats()
 		for arn, st := range tgts {
 			switch st.ResyncStatus {
 			case ResyncFailed, ResyncStarted, ResyncPending:
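
The refactor above collapses two passes into one: previously the first loop published meta into statusMap, and a second loop re-read it under RLock to find targets to resume. Since meta is still in scope, its stats can be cloned directly and the read-lock round trip disappears. A minimal sketch of the single-pass shape, using hypothetical stand-in types (resyncMeta, resyncStatus) rather than MinIO's real ones:

package main

import (
	"fmt"
	"sync"
)

// Hypothetical stand-ins for the resync types, for illustration only.
type resyncStatus struct{ state string }

type resyncMeta struct {
	targets map[string]resyncStatus
}

// cloneTgtStats copies target stats so callers can iterate without
// holding any lock that guards the shared map.
func (m resyncMeta) cloneTgtStats() map[string]resyncStatus {
	out := make(map[string]resyncStatus, len(m.targets))
	for k, v := range m.targets {
		out[k] = v
	}
	return out
}

type resyncer struct {
	mu        sync.RWMutex
	statusMap map[string]resyncMeta
}

func main() {
	r := &resyncer{statusMap: make(map[string]resyncMeta)}
	loaded := map[string]resyncMeta{
		"bucket-1": {targets: map[string]resyncStatus{"arn:1": {state: "Pending"}}},
	}

	// Single pass: publish the freshly loaded meta under the lock, then
	// use the local copy directly; no second loop re-reading statusMap.
	for bucket, meta := range loaded {
		r.mu.Lock()
		r.statusMap[bucket] = meta
		r.mu.Unlock()

		for arn, st := range meta.cloneTgtStats() {
			fmt.Printf("resume resync for %s target %s (state %s)\n", bucket, arn, st.state)
		}
	}
}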

cmd/server-main.go

@@ -658,6 +658,7 @@ func serverMain(ctx *cli.Context) {
 	if err != nil {
 		logFatalErrs(err, Endpoint{}, true)
 	}
+	bootstrapTrace("newObjectLayer (initialized)")

 	xhttp.SetDeploymentID(globalDeploymentID)
 	xhttp.SetMinIOVersion(Version)
@@ -719,6 +720,7 @@ func serverMain(ctx *cli.Context) {
 	go func() {
 		bootstrapTrace("globalIAMSys.Init")
 		globalIAMSys.Init(GlobalContext, newObject, globalEtcdClient, globalRefreshIAMInterval)
+		bootstrapTrace("globalIAMSys.Initialized")

 		// Initialize Console UI
 		if globalBrowserEnabled {
@@ -816,23 +818,26 @@ func serverMain(ctx *cli.Context) {
 		// Initialize bucket metadata sub-system.
 		bootstrapTrace("globalBucketMetadataSys.Init")
 		globalBucketMetadataSys.Init(GlobalContext, buckets, newObject)
+		bootstrapTrace("globalBucketMetadataSys.Initialized")

 		// initialize replication resync state.
-		bootstrapTrace("go initResync")
-		go globalReplicationPool.initResync(GlobalContext, buckets, newObject)
+		bootstrapTrace("initResync")
+		globalReplicationPool.initResync(GlobalContext, buckets, newObject)

 		// Initialize site replication manager after bucket metadata
 		bootstrapTrace("globalSiteReplicationSys.Init")
 		globalSiteReplicationSys.Init(GlobalContext, newObject)
+		bootstrapTrace("globalSiteReplicationSys.Initialized")

 		// Initialize quota manager.
 		bootstrapTrace("globalBucketQuotaSys.Init")
 		globalBucketQuotaSys.Init(newObject)
+		bootstrapTrace("globalBucketQuotaSys.Initialized")

 		// Populate existing buckets to the etcd backend
 		if globalDNSConfig != nil {
 			// Background this operation.
-			bootstrapTrace("initFederatorBackend")
+			bootstrapTrace("go initFederatorBackend")
 			go initFederatorBackend(buckets, newObject)
 		}
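
The pattern in this file is paired messages: an "X.Init" trace before each step and an "X.Initialized" trace after it, so the bootstrap trace stream exposes each step's latency as the gap between the pair (hence the commit title). A toy sketch of how paired messages yield per-step latencies; bootstrapTracer here is a hypothetical stand-in, not MinIO's actual bootstrapTrace:

package main

import (
	"fmt"
	"time"
)

// bootstrapTracer timestamps each message relative to the previous one,
// so emitting a message before and after a step shows that step's latency.
type bootstrapTracer struct{ last time.Time }

func (b *bootstrapTracer) Trace(msg string) {
	now := time.Now()
	if !b.last.IsZero() {
		fmt.Printf("%-45s +%v\n", msg, now.Sub(b.last).Round(time.Millisecond))
	} else {
		fmt.Printf("%s\n", msg)
	}
	b.last = now
}

func main() {
	t := &bootstrapTracer{}
	t.Trace("globalBucketMetadataSys.Init")
	time.Sleep(120 * time.Millisecond) // stands in for the real Init work
	t.Trace("globalBucketMetadataSys.Initialized") // delta approximates Init latency
}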