diff --git a/cmd/bucket-replication-utils.go b/cmd/bucket-replication-utils.go
index 283862f7a..5b11eea81 100644
--- a/cmd/bucket-replication-utils.go
+++ b/cmd/bucket-replication-utils.go
@@ -797,9 +797,8 @@ func (ri ReplicateObjectInfo) ToMRFEntry() MRFReplicateEntry {
     }
 }
 
-func getReplicationStatsPath(nodeName string) string {
-    nodeStr := strings.ReplaceAll(nodeName, ":", "_")
-    return bucketMetaPrefix + SlashSeparator + replicationDir + SlashSeparator + nodeStr + ".stats"
+func getReplicationStatsPath() string {
+    return bucketMetaPrefix + SlashSeparator + replicationDir + SlashSeparator + "replication.stats"
 }
 
 const (
diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go
index af022c718..ccdcb437a 100644
--- a/cmd/bucket-replication.go
+++ b/cmd/bucket-replication.go
@@ -1526,6 +1526,7 @@ type ReplicationPool struct {
     mrfWorkerKillCh chan struct{}
     mrfReplicaCh    chan ReplicationWorkerOperation
     mrfSaveCh       chan MRFReplicateEntry
+    mrfStopCh       chan struct{}
     mrfWorkerSize   int
     saveStateCh     chan struct{}
     mrfWorkerWg     sync.WaitGroup
@@ -1582,6 +1583,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
         mrfWorkerKillCh: make(chan struct{}, failedWorkers),
         resyncer:        newresyncer(),
         mrfSaveCh:       make(chan MRFReplicateEntry, 100000),
+        mrfStopCh:       make(chan struct{}, 1),
         saveStateCh:     make(chan struct{}, 1),
         ctx:             ctx,
         objLayer:        o,
@@ -2831,6 +2833,7 @@ func (p *ReplicationPool) persistMRF() {
             saveMRFToDisk(false)
             mTimer.Reset(mrfTimeInterval)
         case <-p.ctx.Done():
+            p.mrfStopCh <- struct{}{}
             close(p.mrfSaveCh)
             saveMRFToDisk(true)
             return
@@ -2860,8 +2863,13 @@ func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) {
     select {
     case <-GlobalContext.Done():
         return
-    case p.mrfSaveCh <- entry:
+    case <-p.mrfStopCh:
+        return
     default:
+        select {
+        case p.mrfSaveCh <- entry:
+        default:
+        }
     }
 }
 
@@ -3005,7 +3013,7 @@ func (p *ReplicationPool) loadStatsFromDisk() (rs map[string]BucketReplicationSt
         return map[string]BucketReplicationStats{}, nil
     }
 
-    data, err := readConfig(p.ctx, p.objLayer, getReplicationStatsPath(globalLocalNodeName))
+    data, err := readConfig(p.ctx, p.objLayer, getReplicationStatsPath())
     if err != nil {
         if !errors.Is(err, errConfigNotFound) {
             return rs, nil
@@ -3048,6 +3056,8 @@ func (p *ReplicationPool) saveStatsToDisk() {
     if !p.initialized() {
         return
     }
+    ctx, cancel := globalLeaderLock.GetLock(p.ctx)
+    defer cancel()
     sTimer := time.NewTimer(replStatsSaveInterval)
     defer sTimer.Stop()
     for {
@@ -3059,7 +3069,7 @@ func (p *ReplicationPool) saveStatsToDisk() {
             }
             p.saveStats(p.ctx)
             sTimer.Reset(replStatsSaveInterval)
-        case <-p.ctx.Done():
+        case <-ctx.Done():
             return
         }
     }
@@ -3075,20 +3085,5 @@ func (p *ReplicationPool) saveStats(ctx context.Context) error {
     if data == nil {
         return err
     }
-    return saveConfig(ctx, p.objLayer, getReplicationStatsPath(globalLocalNodeName), data)
-}
-
-// SaveState saves replication stats and mrf data before server restart
-func (p *ReplicationPool) SaveState(ctx context.Context) error {
-    if !p.initialized() {
-        return nil
-    }
-    go func() {
-        select {
-        case p.saveStateCh <- struct{}{}:
-        case <-p.ctx.Done():
-            return
-        }
-    }()
-    return p.saveStats(ctx)
+    return saveConfig(ctx, p.objLayer, getReplicationStatsPath(), data)
 }
diff --git a/cmd/signals.go b/cmd/signals.go
index ae58a9266..a6ba8adad 100644
--- a/cmd/signals.go
+++ b/cmd/signals.go
@@ -79,11 +79,9 @@ func handleSignals() {
             logger.LogIf(context.Background(), err)
             exit(stopProcess())
         case osSignal := <-globalOSSignalCh:
-            globalReplicationPool.SaveState(context.Background())
             logger.Info("Exiting on signal: %s", strings.ToUpper(osSignal.String()))
             exit(stopProcess())
         case signal := <-globalServiceSignalCh:
-            globalReplicationPool.SaveState(context.Background())
             switch signal {
             case serviceRestart:
                 logger.Info("Restarting on service signal")
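
Below is a minimal standalone sketch of the shutdown pattern this patch introduces in persistMRF/queueMRFSave: the consumer signals a stop channel before closing the save queue, and producers check the stop channel first, then fall through to a non-blocking send, so enqueue attempts stop racing the close. All names here (pool, entry, enqueue, drain) are illustrative, not MinIO's, and the sketch broadcasts shutdown by closing stopCh, whereas the patch sends a single token on a buffered channel.

package main

import (
	"context"
	"fmt"
)

type entry struct{ id int }

type pool struct {
	saveCh chan entry
	stopCh chan struct{}
}

// enqueue mirrors the new queueMRFSave shape: bail out once shutdown has
// been signaled, otherwise attempt a non-blocking send and drop the entry
// if the queue is full, so callers never block.
func (p *pool) enqueue(ctx context.Context, e entry) {
	select {
	case <-ctx.Done():
		return
	case <-p.stopCh:
		return
	default:
		select {
		case p.saveCh <- e:
		default: // queue full: drop rather than block
		}
	}
}

// drain mirrors persistMRF's shutdown path: signal producers first, then
// close the queue and flush whatever is still buffered.
func (p *pool) drain(ctx context.Context) {
	<-ctx.Done()
	close(p.stopCh) // broadcast shutdown to every producer
	close(p.saveCh)
	for e := range p.saveCh {
		fmt.Println("flushed", e.id)
	}
}

func main() {
	p := &pool{
		saveCh: make(chan entry, 4),
		stopCh: make(chan struct{}),
	}
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() { p.drain(ctx); close(done) }()
	for i := 0; i < 3; i++ {
		p.enqueue(ctx, entry{id: i})
	}
	cancel()
	<-done
}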