fix; race in bucket replication stats (#13942)

- r.ulock was not locked when r.UsageCache was being modified

Bonus:

- simplify code by removing some unnecessary clone methods - we can 
do this because go arrays are values (not pointers/references) that are 
automatically copied on assignment.

- remove some unnecessary map allocation calls
This commit is contained in:
Aditya Manthramurthy 2021-12-17 15:33:13 -08:00 committed by GitHub
parent 13441ad0f8
commit 997e808088
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 66 additions and 103 deletions

View file

@ -51,6 +51,9 @@ func (r *ReplicationStats) Delete(bucket string) {
r.Lock()
defer r.Unlock()
delete(r.Cache, bucket)
r.ulock.Lock()
defer r.ulock.Unlock()
delete(r.UsageCache, bucket)
}
@ -82,10 +85,12 @@ func (r *ReplicationStats) Update(bucket string, arn string, n int64, duration t
bs, ok := r.Cache[bucket]
if !ok {
bs = &BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)}
r.Cache[bucket] = bs
}
b, ok := bs.Stats[arn]
if !ok {
b = &BucketReplicationStat{}
bs.Stats[arn] = b
}
switch status {
case replication.Completed:
@ -115,9 +120,6 @@ func (r *ReplicationStats) Update(bucket string, arn string, n int64, duration t
b.ReplicaSize += n
}
}
bs.Stats[arn] = b
r.Cache[bucket] = bs
}
// GetInitialUsage get replication metrics available at the time of cluster initialization
@ -128,10 +130,10 @@ func (r *ReplicationStats) GetInitialUsage(bucket string) BucketReplicationStats
r.ulock.RLock()
defer r.ulock.RUnlock()
st, ok := r.UsageCache[bucket]
if ok {
return st.Clone()
if !ok {
return BucketReplicationStats{}
}
return BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)}
return st.Clone()
}
// Get replication metrics for a bucket from this node since this node came up.
@ -145,7 +147,7 @@ func (r *ReplicationStats) Get(bucket string) BucketReplicationStats {
st, ok := r.Cache[bucket]
if !ok {
return BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)}
return BucketReplicationStats{}
}
return st.Clone()
}
@ -162,41 +164,46 @@ func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *Replicatio
func (r *ReplicationStats) loadInitialReplicationMetrics(ctx context.Context) {
rTimer := time.NewTimer(time.Minute * 1)
defer rTimer.Stop()
var (
dui DataUsageInfo
err error
)
outer:
for {
select {
case <-ctx.Done():
return
case <-rTimer.C:
dui, err := loadDataUsageFromBackend(GlobalContext, newObjectLayerFn())
dui, err = loadDataUsageFromBackend(GlobalContext, newObjectLayerFn())
if err != nil {
continue
}
// data usage has not captured any data yet.
if dui.LastUpdate.IsZero() {
continue
// If LastUpdate is set, data usage is available.
if !dui.LastUpdate.IsZero() {
break outer
}
m := make(map[string]*BucketReplicationStats)
for bucket, usage := range dui.BucketsUsage {
b := &BucketReplicationStats{
Stats: make(map[string]*BucketReplicationStat, len(usage.ReplicationInfo)),
}
for arn, uinfo := range usage.ReplicationInfo {
b.Stats[arn] = &BucketReplicationStat{
FailedSize: int64(uinfo.ReplicationFailedSize),
ReplicatedSize: int64(uinfo.ReplicatedSize),
ReplicaSize: int64(uinfo.ReplicaSize),
FailedCount: int64(uinfo.ReplicationFailedCount),
}
}
b.ReplicaSize += int64(usage.ReplicaSize)
if b.hasReplicationUsage() {
m[bucket] = b
}
}
r.ulock.Lock()
defer r.ulock.Unlock()
r.UsageCache = m
return
}
}
m := make(map[string]*BucketReplicationStats)
for bucket, usage := range dui.BucketsUsage {
b := &BucketReplicationStats{
Stats: make(map[string]*BucketReplicationStat, len(usage.ReplicationInfo)),
}
for arn, uinfo := range usage.ReplicationInfo {
b.Stats[arn] = &BucketReplicationStat{
FailedSize: int64(uinfo.ReplicationFailedSize),
ReplicatedSize: int64(uinfo.ReplicatedSize),
ReplicaSize: int64(uinfo.ReplicaSize),
FailedCount: int64(uinfo.ReplicationFailedCount),
}
}
b.ReplicaSize += int64(usage.ReplicaSize)
if b.hasReplicationUsage() {
m[bucket] = b
}
}
r.ulock.Lock()
defer r.ulock.Unlock()
r.UsageCache = m
}

View file

@ -1808,22 +1808,15 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic
// add initial usage stat to cluster stats
usageStat := globalReplicationStats.GetInitialUsage(bucket)
totReplicaSize += usageStat.ReplicaSize
if usageStat.Stats != nil {
for arn, stat := range usageStat.Stats {
st := stats[arn]
if st == nil {
st = &BucketReplicationStat{
ReplicatedSize: stat.ReplicatedSize,
FailedSize: stat.FailedSize,
FailedCount: stat.FailedCount,
}
} else {
st.ReplicatedSize += stat.ReplicatedSize
st.FailedSize += stat.FailedSize
st.FailedCount += stat.FailedCount
}
for arn, stat := range usageStat.Stats {
st, ok := stats[arn]
if !ok {
st = &BucketReplicationStat{}
stats[arn] = st
}
st.ReplicatedSize += stat.ReplicatedSize
st.FailedSize += stat.FailedSize
st.FailedCount += stat.FailedCount
}
s = BucketReplicationStats{
Stats: make(map[string]*BucketReplicationStat, len(stats)),

View file

@ -18,7 +18,6 @@
package cmd
import (
"sync/atomic"
"time"
)
@ -52,13 +51,6 @@ func (rl *ReplicationLatency) update(size int64, duration time.Duration) {
rl.UploadHistogram.Add(size, duration)
}
// Clone replication latency
func (rl ReplicationLatency) clone() ReplicationLatency {
return ReplicationLatency{
UploadHistogram: rl.UploadHistogram.Clone(),
}
}
// BucketStats bucket statistics
type BucketStats struct {
ReplicationStats BucketReplicationStats
@ -88,29 +80,18 @@ func (brs *BucketReplicationStats) Empty() bool {
}
// Clone creates a new BucketReplicationStats copy
func (brs BucketReplicationStats) Clone() BucketReplicationStats {
c := BucketReplicationStats{
Stats: make(map[string]*BucketReplicationStat, len(brs.Stats)),
}
// This is called only by replicationStats cache and already holds a read lock before calling Clone()
func (brs BucketReplicationStats) Clone() (c BucketReplicationStats) {
// This is called only by replicationStats cache and already holds a
// read lock before calling Clone()
c = brs
// We need to copy the map, so we do not reference the one in `brs`.
c.Stats = make(map[string]*BucketReplicationStat, len(brs.Stats))
for arn, st := range brs.Stats {
c.Stats[arn] = &BucketReplicationStat{
FailedSize: atomic.LoadInt64(&st.FailedSize),
ReplicatedSize: atomic.LoadInt64(&st.ReplicatedSize),
ReplicaSize: atomic.LoadInt64(&st.ReplicaSize),
FailedCount: atomic.LoadInt64(&st.FailedCount),
PendingSize: atomic.LoadInt64(&st.PendingSize),
PendingCount: atomic.LoadInt64(&st.PendingCount),
Latency: st.Latency.clone(),
}
// make a copy of `*st`
s := *st
c.Stats[arn] = &s
}
// update total counts across targets
c.FailedSize = atomic.LoadInt64(&brs.FailedSize)
c.FailedCount = atomic.LoadInt64(&brs.FailedCount)
c.PendingCount = atomic.LoadInt64(&brs.PendingCount)
c.PendingSize = atomic.LoadInt64(&brs.PendingSize)
c.ReplicaSize = atomic.LoadInt64(&brs.ReplicaSize)
c.ReplicatedSize = atomic.LoadInt64(&brs.ReplicatedSize)
return c
}

View file

@ -102,39 +102,21 @@ type LastMinuteLatencies struct {
LastSec int64
}
// Clone safely returns a copy for a LastMinuteLatencies structure
func (l *LastMinuteLatencies) Clone() LastMinuteLatencies {
n := LastMinuteLatencies{}
n.LastSec = l.LastSec
for i := range l.Totals {
for j := range l.Totals[i] {
n.Totals[i][j] = AccElem{
Total: l.Totals[i][j].Total,
N: l.Totals[i][j].N,
}
}
}
return n
}
// Merge safely merges two LastMinuteLatencies structures into one
func (l LastMinuteLatencies) Merge(o LastMinuteLatencies) (merged LastMinuteLatencies) {
cl := l.Clone()
co := o.Clone()
if cl.LastSec > co.LastSec {
co.forwardTo(cl.LastSec)
merged.LastSec = cl.LastSec
if l.LastSec > o.LastSec {
o.forwardTo(l.LastSec)
merged.LastSec = l.LastSec
} else {
cl.forwardTo(co.LastSec)
merged.LastSec = co.LastSec
l.forwardTo(o.LastSec)
merged.LastSec = o.LastSec
}
for i := range cl.Totals {
for j := range cl.Totals[i] {
for i := range merged.Totals {
for j := range merged.Totals[i] {
merged.Totals[i][j] = AccElem{
Total: cl.Totals[i][j].Total + co.Totals[i][j].Total,
N: cl.Totals[i][j].N + co.Totals[i][j].N,
Total: l.Totals[i][j].Total + o.Totals[i][j].Total,
N: l.Totals[i][j].N + o.Totals[i][j].N,
}
}
}