diff --git a/cmd/bucket-replication-stats.go b/cmd/bucket-replication-stats.go index a424a7f5f..8959fd520 100644 --- a/cmd/bucket-replication-stats.go +++ b/cmd/bucket-replication-stats.go @@ -51,6 +51,9 @@ func (r *ReplicationStats) Delete(bucket string) { r.Lock() defer r.Unlock() delete(r.Cache, bucket) + + r.ulock.Lock() + defer r.ulock.Unlock() delete(r.UsageCache, bucket) } @@ -82,10 +85,12 @@ func (r *ReplicationStats) Update(bucket string, arn string, n int64, duration t bs, ok := r.Cache[bucket] if !ok { bs = &BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)} + r.Cache[bucket] = bs } b, ok := bs.Stats[arn] if !ok { b = &BucketReplicationStat{} + bs.Stats[arn] = b } switch status { case replication.Completed: @@ -115,9 +120,6 @@ func (r *ReplicationStats) Update(bucket string, arn string, n int64, duration t b.ReplicaSize += n } } - - bs.Stats[arn] = b - r.Cache[bucket] = bs } // GetInitialUsage get replication metrics available at the time of cluster initialization @@ -128,10 +130,10 @@ func (r *ReplicationStats) GetInitialUsage(bucket string) BucketReplicationStats r.ulock.RLock() defer r.ulock.RUnlock() st, ok := r.UsageCache[bucket] - if ok { - return st.Clone() + if !ok { + return BucketReplicationStats{} } - return BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)} + return st.Clone() } // Get replication metrics for a bucket from this node since this node came up. @@ -145,7 +147,7 @@ func (r *ReplicationStats) Get(bucket string) BucketReplicationStats { st, ok := r.Cache[bucket] if !ok { - return BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)} + return BucketReplicationStats{} } return st.Clone() } @@ -162,41 +164,46 @@ func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *Replicatio func (r *ReplicationStats) loadInitialReplicationMetrics(ctx context.Context) { rTimer := time.NewTimer(time.Minute * 1) defer rTimer.Stop() + var ( + dui DataUsageInfo + err error + ) +outer: for { select { case <-ctx.Done(): return case <-rTimer.C: - dui, err := loadDataUsageFromBackend(GlobalContext, newObjectLayerFn()) + dui, err = loadDataUsageFromBackend(GlobalContext, newObjectLayerFn()) if err != nil { continue } - // data usage has not captured any data yet. - if dui.LastUpdate.IsZero() { - continue + // If LastUpdate is set, data usage is available. + if !dui.LastUpdate.IsZero() { + break outer } - m := make(map[string]*BucketReplicationStats) - for bucket, usage := range dui.BucketsUsage { - b := &BucketReplicationStats{ - Stats: make(map[string]*BucketReplicationStat, len(usage.ReplicationInfo)), - } - for arn, uinfo := range usage.ReplicationInfo { - b.Stats[arn] = &BucketReplicationStat{ - FailedSize: int64(uinfo.ReplicationFailedSize), - ReplicatedSize: int64(uinfo.ReplicatedSize), - ReplicaSize: int64(uinfo.ReplicaSize), - FailedCount: int64(uinfo.ReplicationFailedCount), - } - } - b.ReplicaSize += int64(usage.ReplicaSize) - if b.hasReplicationUsage() { - m[bucket] = b - } - } - r.ulock.Lock() - defer r.ulock.Unlock() - r.UsageCache = m - return } } + + m := make(map[string]*BucketReplicationStats) + for bucket, usage := range dui.BucketsUsage { + b := &BucketReplicationStats{ + Stats: make(map[string]*BucketReplicationStat, len(usage.ReplicationInfo)), + } + for arn, uinfo := range usage.ReplicationInfo { + b.Stats[arn] = &BucketReplicationStat{ + FailedSize: int64(uinfo.ReplicationFailedSize), + ReplicatedSize: int64(uinfo.ReplicatedSize), + ReplicaSize: int64(uinfo.ReplicaSize), + FailedCount: int64(uinfo.ReplicationFailedCount), + } + } + b.ReplicaSize += int64(usage.ReplicaSize) + if b.hasReplicationUsage() { + m[bucket] = b + } + } + r.ulock.Lock() + defer r.ulock.Unlock() + r.UsageCache = m } diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index b3e012146..26e08c46e 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -1808,22 +1808,15 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic // add initial usage stat to cluster stats usageStat := globalReplicationStats.GetInitialUsage(bucket) totReplicaSize += usageStat.ReplicaSize - if usageStat.Stats != nil { - for arn, stat := range usageStat.Stats { - st := stats[arn] - if st == nil { - st = &BucketReplicationStat{ - ReplicatedSize: stat.ReplicatedSize, - FailedSize: stat.FailedSize, - FailedCount: stat.FailedCount, - } - } else { - st.ReplicatedSize += stat.ReplicatedSize - st.FailedSize += stat.FailedSize - st.FailedCount += stat.FailedCount - } + for arn, stat := range usageStat.Stats { + st, ok := stats[arn] + if !ok { + st = &BucketReplicationStat{} stats[arn] = st } + st.ReplicatedSize += stat.ReplicatedSize + st.FailedSize += stat.FailedSize + st.FailedCount += stat.FailedCount } s = BucketReplicationStats{ Stats: make(map[string]*BucketReplicationStat, len(stats)), diff --git a/cmd/bucket-stats.go b/cmd/bucket-stats.go index 9405d3cbe..da94be00b 100644 --- a/cmd/bucket-stats.go +++ b/cmd/bucket-stats.go @@ -18,7 +18,6 @@ package cmd import ( - "sync/atomic" "time" ) @@ -52,13 +51,6 @@ func (rl *ReplicationLatency) update(size int64, duration time.Duration) { rl.UploadHistogram.Add(size, duration) } -// Clone replication latency -func (rl ReplicationLatency) clone() ReplicationLatency { - return ReplicationLatency{ - UploadHistogram: rl.UploadHistogram.Clone(), - } -} - // BucketStats bucket statistics type BucketStats struct { ReplicationStats BucketReplicationStats @@ -88,29 +80,18 @@ func (brs *BucketReplicationStats) Empty() bool { } // Clone creates a new BucketReplicationStats copy -func (brs BucketReplicationStats) Clone() BucketReplicationStats { - c := BucketReplicationStats{ - Stats: make(map[string]*BucketReplicationStat, len(brs.Stats)), - } - // This is called only by replicationStats cache and already holds a read lock before calling Clone() +func (brs BucketReplicationStats) Clone() (c BucketReplicationStats) { + // This is called only by replicationStats cache and already holds a + // read lock before calling Clone() + + c = brs + // We need to copy the map, so we do not reference the one in `brs`. + c.Stats = make(map[string]*BucketReplicationStat, len(brs.Stats)) for arn, st := range brs.Stats { - c.Stats[arn] = &BucketReplicationStat{ - FailedSize: atomic.LoadInt64(&st.FailedSize), - ReplicatedSize: atomic.LoadInt64(&st.ReplicatedSize), - ReplicaSize: atomic.LoadInt64(&st.ReplicaSize), - FailedCount: atomic.LoadInt64(&st.FailedCount), - PendingSize: atomic.LoadInt64(&st.PendingSize), - PendingCount: atomic.LoadInt64(&st.PendingCount), - Latency: st.Latency.clone(), - } + // make a copy of `*st` + s := *st + c.Stats[arn] = &s } - // update total counts across targets - c.FailedSize = atomic.LoadInt64(&brs.FailedSize) - c.FailedCount = atomic.LoadInt64(&brs.FailedCount) - c.PendingCount = atomic.LoadInt64(&brs.PendingCount) - c.PendingSize = atomic.LoadInt64(&brs.PendingSize) - c.ReplicaSize = atomic.LoadInt64(&brs.ReplicaSize) - c.ReplicatedSize = atomic.LoadInt64(&brs.ReplicatedSize) return c } diff --git a/cmd/last-minute.go b/cmd/last-minute.go index c6d5cc672..d87c589fa 100644 --- a/cmd/last-minute.go +++ b/cmd/last-minute.go @@ -102,39 +102,21 @@ type LastMinuteLatencies struct { LastSec int64 } -// Clone safely returns a copy for a LastMinuteLatencies structure -func (l *LastMinuteLatencies) Clone() LastMinuteLatencies { - n := LastMinuteLatencies{} - n.LastSec = l.LastSec - for i := range l.Totals { - for j := range l.Totals[i] { - n.Totals[i][j] = AccElem{ - Total: l.Totals[i][j].Total, - N: l.Totals[i][j].N, - } - } - } - return n -} - // Merge safely merges two LastMinuteLatencies structures into one func (l LastMinuteLatencies) Merge(o LastMinuteLatencies) (merged LastMinuteLatencies) { - cl := l.Clone() - co := o.Clone() - - if cl.LastSec > co.LastSec { - co.forwardTo(cl.LastSec) - merged.LastSec = cl.LastSec + if l.LastSec > o.LastSec { + o.forwardTo(l.LastSec) + merged.LastSec = l.LastSec } else { - cl.forwardTo(co.LastSec) - merged.LastSec = co.LastSec + l.forwardTo(o.LastSec) + merged.LastSec = o.LastSec } - for i := range cl.Totals { - for j := range cl.Totals[i] { + for i := range merged.Totals { + for j := range merged.Totals[i] { merged.Totals[i][j] = AccElem{ - Total: cl.Totals[i][j].Total + co.Totals[i][j].Total, - N: cl.Totals[i][j].N + co.Totals[i][j].N, + Total: l.Totals[i][j].Total + o.Totals[i][j].Total, + N: l.Totals[i][j].N + o.Totals[i][j].N, } } }