persist replication stats with leader lock (#16282)

This commit is contained in:
Poorna 2022-12-22 14:25:13 -08:00 committed by GitHub
parent 48152a56ac
commit de0b43de32
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 16 additions and 24 deletions

View file

@ -797,9 +797,8 @@ func (ri ReplicateObjectInfo) ToMRFEntry() MRFReplicateEntry {
} }
} }
func getReplicationStatsPath(nodeName string) string { func getReplicationStatsPath() string {
nodeStr := strings.ReplaceAll(nodeName, ":", "_") return bucketMetaPrefix + SlashSeparator + replicationDir + SlashSeparator + "replication.stats"
return bucketMetaPrefix + SlashSeparator + replicationDir + SlashSeparator + nodeStr + ".stats"
} }
const ( const (

View file

@ -1526,6 +1526,7 @@ type ReplicationPool struct {
mrfWorkerKillCh chan struct{} mrfWorkerKillCh chan struct{}
mrfReplicaCh chan ReplicationWorkerOperation mrfReplicaCh chan ReplicationWorkerOperation
mrfSaveCh chan MRFReplicateEntry mrfSaveCh chan MRFReplicateEntry
mrfStopCh chan struct{}
mrfWorkerSize int mrfWorkerSize int
saveStateCh chan struct{} saveStateCh chan struct{}
mrfWorkerWg sync.WaitGroup mrfWorkerWg sync.WaitGroup
@ -1582,6 +1583,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
mrfWorkerKillCh: make(chan struct{}, failedWorkers), mrfWorkerKillCh: make(chan struct{}, failedWorkers),
resyncer: newresyncer(), resyncer: newresyncer(),
mrfSaveCh: make(chan MRFReplicateEntry, 100000), mrfSaveCh: make(chan MRFReplicateEntry, 100000),
mrfStopCh: make(chan struct{}, 1),
saveStateCh: make(chan struct{}, 1), saveStateCh: make(chan struct{}, 1),
ctx: ctx, ctx: ctx,
objLayer: o, objLayer: o,
@ -2831,6 +2833,7 @@ func (p *ReplicationPool) persistMRF() {
saveMRFToDisk(false) saveMRFToDisk(false)
mTimer.Reset(mrfTimeInterval) mTimer.Reset(mrfTimeInterval)
case <-p.ctx.Done(): case <-p.ctx.Done():
p.mrfStopCh <- struct{}{}
close(p.mrfSaveCh) close(p.mrfSaveCh)
saveMRFToDisk(true) saveMRFToDisk(true)
return return
@ -2860,8 +2863,13 @@ func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) {
select { select {
case <-GlobalContext.Done(): case <-GlobalContext.Done():
return return
case p.mrfSaveCh <- entry: case <-p.mrfStopCh:
return
default: default:
select {
case p.mrfSaveCh <- entry:
default:
}
} }
} }
@ -3005,7 +3013,7 @@ func (p *ReplicationPool) loadStatsFromDisk() (rs map[string]BucketReplicationSt
return map[string]BucketReplicationStats{}, nil return map[string]BucketReplicationStats{}, nil
} }
data, err := readConfig(p.ctx, p.objLayer, getReplicationStatsPath(globalLocalNodeName)) data, err := readConfig(p.ctx, p.objLayer, getReplicationStatsPath())
if err != nil { if err != nil {
if !errors.Is(err, errConfigNotFound) { if !errors.Is(err, errConfigNotFound) {
return rs, nil return rs, nil
@ -3048,6 +3056,8 @@ func (p *ReplicationPool) saveStatsToDisk() {
if !p.initialized() { if !p.initialized() {
return return
} }
ctx, cancel := globalLeaderLock.GetLock(p.ctx)
defer cancel()
sTimer := time.NewTimer(replStatsSaveInterval) sTimer := time.NewTimer(replStatsSaveInterval)
defer sTimer.Stop() defer sTimer.Stop()
for { for {
@ -3059,7 +3069,7 @@ func (p *ReplicationPool) saveStatsToDisk() {
} }
p.saveStats(p.ctx) p.saveStats(p.ctx)
sTimer.Reset(replStatsSaveInterval) sTimer.Reset(replStatsSaveInterval)
case <-p.ctx.Done(): case <-ctx.Done():
return return
} }
} }
@ -3075,20 +3085,5 @@ func (p *ReplicationPool) saveStats(ctx context.Context) error {
if data == nil { if data == nil {
return err return err
} }
return saveConfig(ctx, p.objLayer, getReplicationStatsPath(globalLocalNodeName), data) return saveConfig(ctx, p.objLayer, getReplicationStatsPath(), data)
}
// SaveState saves replication stats and mrf data before server restart
func (p *ReplicationPool) SaveState(ctx context.Context) error {
if !p.initialized() {
return nil
}
go func() {
select {
case p.saveStateCh <- struct{}{}:
case <-p.ctx.Done():
return
}
}()
return p.saveStats(ctx)
} }

View file

@ -79,11 +79,9 @@ func handleSignals() {
logger.LogIf(context.Background(), err) logger.LogIf(context.Background(), err)
exit(stopProcess()) exit(stopProcess())
case osSignal := <-globalOSSignalCh: case osSignal := <-globalOSSignalCh:
globalReplicationPool.SaveState(context.Background())
logger.Info("Exiting on signal: %s", strings.ToUpper(osSignal.String())) logger.Info("Exiting on signal: %s", strings.ToUpper(osSignal.String()))
exit(stopProcess()) exit(stopProcess())
case signal := <-globalServiceSignalCh: case signal := <-globalServiceSignalCh:
globalReplicationPool.SaveState(context.Background())
switch signal { switch signal {
case serviceRestart: case serviceRestart:
logger.Info("Restarting on service signal") logger.Info("Restarting on service signal")