minio/cmd/bucket-replication-stats.go
Klaus Post f1302c40fe
Fix uninitialized replication stats (#20260)
Services are unfrozen before `initBackgroundReplication` is finished. This means that 
the globalReplicationStats write is racy. Switch to an atomic pointer.

Provide the `ReplicationPool` with the stats, so it doesn't have to be grabbed 
from the atomic pointer on every use.

All other loads and checks are nil, and calls return empty values when stats 
still haven't been initialized.
2024-08-15 05:04:40 -07:00

518 lines
14 KiB
Go

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/minio/minio/internal/bucket/replication"
"github.com/rcrowley/go-metrics"
)
func (b *BucketReplicationStats) hasReplicationUsage() bool {
for _, s := range b.Stats {
if s.hasReplicationUsage() {
return true
}
}
return false
}
// ReplicationStats holds the global in-memory replication stats
type ReplicationStats struct {
// map of site deployment ID to site replication status
// for site replication - maintain stats at global level
srStats *SRStats
// active worker stats
workers *ActiveWorkerStat
// queue stats cache
qCache queueCache
pCache proxyStatsCache
// mrf backlog stats
mrfStats ReplicationMRFStats
// for bucket replication, continue to use existing cache
Cache map[string]*BucketReplicationStats
mostRecentStats BucketStatsMap
registry metrics.Registry
sync.RWMutex // mutex for Cache
mostRecentStatsMu sync.Mutex // mutex for mostRecentStats
wlock sync.RWMutex // mutex for active workers
movingAvgTicker *time.Ticker // Ticker for calculating moving averages
wTimer *time.Ticker // ticker for calculating active workers
qTimer *time.Ticker // ticker for calculating queue stats
}
func (r *ReplicationStats) trackEWMA() {
for {
select {
case <-r.movingAvgTicker.C:
r.updateMovingAvg()
case <-GlobalContext.Done():
return
}
}
}
func (r *ReplicationStats) updateMovingAvg() {
r.RLock()
for _, s := range r.Cache {
for _, st := range s.Stats {
st.XferRateLrg.measure.updateExponentialMovingAverage(time.Now())
st.XferRateSml.measure.updateExponentialMovingAverage(time.Now())
}
}
r.RUnlock()
}
// ActiveWorkers returns worker stats
func (r *ReplicationStats) ActiveWorkers() ActiveWorkerStat {
if r == nil {
return ActiveWorkerStat{}
}
r.wlock.RLock()
defer r.wlock.RUnlock()
w := r.workers.get()
return ActiveWorkerStat{
Curr: w.Curr,
Max: w.Max,
Avg: w.Avg,
}
}
func (r *ReplicationStats) collectWorkerMetrics(ctx context.Context) {
if r == nil {
return
}
for {
select {
case <-ctx.Done():
return
case <-r.wTimer.C:
r.wlock.Lock()
r.workers.update()
r.wlock.Unlock()
}
}
}
func (r *ReplicationStats) collectQueueMetrics(ctx context.Context) {
if r == nil {
return
}
for {
select {
case <-ctx.Done():
return
case <-r.qTimer.C:
r.qCache.update()
}
}
}
// Delete deletes in-memory replication statistics for a bucket.
func (r *ReplicationStats) Delete(bucket string) {
if r == nil {
return
}
r.Lock()
defer r.Unlock()
delete(r.Cache, bucket)
}
// UpdateReplicaStat updates in-memory replica statistics with new values.
func (r *ReplicationStats) UpdateReplicaStat(bucket string, n int64) {
if r == nil {
return
}
r.Lock()
defer r.Unlock()
bs, ok := r.Cache[bucket]
if !ok {
bs = newBucketReplicationStats()
}
bs.ReplicaSize += n
bs.ReplicaCount++
r.Cache[bucket] = bs
r.srUpdateReplicaStat(n)
}
func (r *ReplicationStats) srUpdateReplicaStat(sz int64) {
if r == nil {
return
}
atomic.AddInt64(&r.srStats.ReplicaSize, sz)
atomic.AddInt64(&r.srStats.ReplicaCount, 1)
}
func (r *ReplicationStats) srUpdate(sr replStat) {
dID, err := globalSiteReplicationSys.getDeplIDForEndpoint(sr.endpoint())
if err == nil {
r.srStats.update(sr, dID)
}
}
// Update updates in-memory replication statistics with new values.
func (r *ReplicationStats) Update(bucket string, ri replicatedTargetInfo, status, prevStatus replication.StatusType) {
if r == nil {
return
}
var rs replStat
switch status {
case replication.Pending:
if ri.OpType.IsDataReplication() && prevStatus != status {
rs.set(ri.Arn, ri.Size, 0, status, ri.OpType, ri.endpoint, ri.secure, ri.Err)
}
case replication.Completed:
if ri.OpType.IsDataReplication() {
rs.set(ri.Arn, ri.Size, ri.Duration, status, ri.OpType, ri.endpoint, ri.secure, ri.Err)
}
case replication.Failed:
if ri.OpType.IsDataReplication() && prevStatus == replication.Pending {
rs.set(ri.Arn, ri.Size, ri.Duration, status, ri.OpType, ri.endpoint, ri.secure, ri.Err)
}
case replication.Replica:
if ri.OpType == replication.ObjectReplicationType {
rs.set(ri.Arn, ri.Size, 0, status, ri.OpType, "", false, ri.Err)
}
}
// update site-replication in-memory stats
if rs.Completed || rs.Failed {
r.srUpdate(rs)
}
r.Lock()
defer r.Unlock()
// update bucket replication in-memory stats
bs, ok := r.Cache[bucket]
if !ok {
bs = newBucketReplicationStats()
r.Cache[bucket] = bs
}
b, ok := bs.Stats[ri.Arn]
if !ok {
b = &BucketReplicationStat{
XferRateLrg: newXferStats(),
XferRateSml: newXferStats(),
}
bs.Stats[ri.Arn] = b
}
switch {
case rs.Completed:
b.ReplicatedSize += rs.TransferSize
b.ReplicatedCount++
if rs.TransferDuration > 0 {
b.Latency.update(rs.TransferSize, rs.TransferDuration)
b.updateXferRate(rs.TransferSize, rs.TransferDuration)
}
case rs.Failed:
b.FailStats.addsize(rs.TransferSize, rs.Err)
case rs.Pending:
}
}
type replStat struct {
Arn string
Completed bool
Pending bool
Failed bool
opType replication.Type
// transfer size
TransferSize int64
// transfer duration
TransferDuration time.Duration
Endpoint string
Secure bool
Err error
}
func (rs *replStat) endpoint() string {
scheme := "http"
if rs.Secure {
scheme = "https"
}
return scheme + "://" + rs.Endpoint
}
func (rs *replStat) set(arn string, n int64, duration time.Duration, status replication.StatusType, opType replication.Type, endpoint string, secure bool, err error) {
rs.Endpoint = endpoint
rs.Secure = secure
rs.TransferSize = n
rs.Arn = arn
rs.TransferDuration = duration
rs.opType = opType
switch status {
case replication.Completed:
rs.Completed = true
case replication.Pending:
rs.Pending = true
case replication.Failed:
rs.Failed = true
rs.Err = err
}
}
// GetAll returns replication metrics for all buckets at once.
func (r *ReplicationStats) GetAll() map[string]BucketReplicationStats {
if r == nil {
return map[string]BucketReplicationStats{}
}
r.RLock()
bucketReplicationStats := make(map[string]BucketReplicationStats, len(r.Cache))
for k, v := range r.Cache {
bucketReplicationStats[k] = v.Clone()
}
r.RUnlock()
for k, v := range bucketReplicationStats {
v.QStat = r.qCache.getBucketStats(k)
bucketReplicationStats[k] = v
}
return bucketReplicationStats
}
func (r *ReplicationStats) getSRMetricsForNode() SRMetricsSummary {
if r == nil {
return SRMetricsSummary{}
}
m := SRMetricsSummary{
Uptime: UTCNow().Unix() - globalBootTime.Unix(),
Queued: r.qCache.getSiteStats(),
ActiveWorkers: r.ActiveWorkers(),
Metrics: r.srStats.get(),
Proxied: r.pCache.getSiteStats(),
ReplicaSize: atomic.LoadInt64(&r.srStats.ReplicaSize),
ReplicaCount: atomic.LoadInt64(&r.srStats.ReplicaCount),
}
return m
}
// Get replication metrics for a bucket from this node since this node came up.
func (r *ReplicationStats) Get(bucket string) BucketReplicationStats {
if r == nil {
return BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)}
}
r.RLock()
defer r.RUnlock()
st, ok := r.Cache[bucket]
if !ok {
return BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)}
}
return st.Clone()
}
// NewReplicationStats initialize in-memory replication statistics
func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *ReplicationStats {
r := metrics.NewRegistry()
rs := ReplicationStats{
Cache: make(map[string]*BucketReplicationStats),
qCache: newQueueCache(r),
pCache: newProxyStatsCache(),
srStats: newSRStats(),
movingAvgTicker: time.NewTicker(2 * time.Second),
wTimer: time.NewTicker(2 * time.Second),
qTimer: time.NewTicker(2 * time.Second),
workers: newActiveWorkerStat(r),
registry: r,
}
go rs.collectWorkerMetrics(ctx)
go rs.collectQueueMetrics(ctx)
return &rs
}
func (r *ReplicationStats) getAllLatest(bucketsUsage map[string]BucketUsageInfo) (bucketsReplicationStats map[string]BucketStats) {
if r == nil {
return nil
}
peerBucketStatsList := globalNotificationSys.GetClusterAllBucketStats(GlobalContext)
bucketsReplicationStats = make(map[string]BucketStats, len(bucketsUsage))
for bucket := range bucketsUsage {
bucketStats := make([]BucketStats, len(peerBucketStatsList))
for i, peerBucketStats := range peerBucketStatsList {
bucketStat, ok := peerBucketStats.Stats[bucket]
if !ok {
continue
}
bucketStats[i] = bucketStat
}
bucketsReplicationStats[bucket] = r.calculateBucketReplicationStats(bucket, bucketStats)
}
return bucketsReplicationStats
}
func (r *ReplicationStats) calculateBucketReplicationStats(bucket string, bucketStats []BucketStats) (bs BucketStats) {
if r == nil {
bs = BucketStats{
ReplicationStats: BucketReplicationStats{
Stats: make(map[string]*BucketReplicationStat),
},
QueueStats: ReplicationQueueStats{},
ProxyStats: ProxyMetric{},
}
return bs
}
var s BucketReplicationStats
// accumulate cluster bucket stats
stats := make(map[string]*BucketReplicationStat)
var (
totReplicaSize, totReplicatedSize int64
totReplicaCount, totReplicatedCount int64
totFailed RTimedMetrics
tq InQueueMetric
)
for _, bucketStat := range bucketStats {
totReplicaSize += bucketStat.ReplicationStats.ReplicaSize
totReplicaCount += bucketStat.ReplicationStats.ReplicaCount
for _, q := range bucketStat.QueueStats.Nodes {
tq = tq.merge(q.QStats)
}
for arn, stat := range bucketStat.ReplicationStats.Stats {
oldst := stats[arn]
if oldst == nil {
oldst = &BucketReplicationStat{
XferRateLrg: newXferStats(),
XferRateSml: newXferStats(),
}
}
fstats := stat.FailStats.merge(oldst.FailStats)
lrg := oldst.XferRateLrg.merge(*stat.XferRateLrg)
sml := oldst.XferRateSml.merge(*stat.XferRateSml)
stats[arn] = &BucketReplicationStat{
Failed: fstats.toMetric(),
FailStats: fstats,
ReplicatedSize: stat.ReplicatedSize + oldst.ReplicatedSize,
ReplicatedCount: stat.ReplicatedCount + oldst.ReplicatedCount,
Latency: stat.Latency.merge(oldst.Latency),
XferRateLrg: &lrg,
XferRateSml: &sml,
}
totReplicatedSize += stat.ReplicatedSize
totReplicatedCount += stat.ReplicatedCount
totFailed = totFailed.merge(stat.FailStats)
}
}
s = BucketReplicationStats{
Stats: stats,
QStat: tq,
ReplicaSize: totReplicaSize,
ReplicaCount: totReplicaCount,
ReplicatedSize: totReplicatedSize,
ReplicatedCount: totReplicatedCount,
Failed: totFailed.toMetric(),
}
var qs ReplicationQueueStats
for _, bs := range bucketStats {
qs.Nodes = append(qs.Nodes, bs.QueueStats.Nodes...)
}
qs.Uptime = UTCNow().Unix() - globalBootTime.Unix()
var ps ProxyMetric
for _, bs := range bucketStats {
ps.add(bs.ProxyStats)
}
bs = BucketStats{
ReplicationStats: s,
QueueStats: qs,
ProxyStats: ps,
}
r.mostRecentStatsMu.Lock()
if len(r.mostRecentStats.Stats) == 0 {
r.mostRecentStats = BucketStatsMap{Stats: make(map[string]BucketStats, 1), Timestamp: UTCNow()}
}
if len(bs.ReplicationStats.Stats) > 0 {
r.mostRecentStats.Stats[bucket] = bs
}
r.mostRecentStats.Timestamp = UTCNow()
r.mostRecentStatsMu.Unlock()
return bs
}
// get the most current of in-memory replication stats and data usage info from crawler.
func (r *ReplicationStats) getLatestReplicationStats(bucket string) (s BucketStats) {
if r == nil {
return s
}
bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket)
return r.calculateBucketReplicationStats(bucket, bucketStats)
}
func (r *ReplicationStats) incQ(bucket string, sz int64, isDeleteRepl bool, opType replication.Type) {
r.qCache.Lock()
defer r.qCache.Unlock()
v, ok := r.qCache.bucketStats[bucket]
if !ok {
v = newInQueueStats(r.registry, bucket)
}
atomic.AddInt64(&v.nowBytes, sz)
atomic.AddInt64(&v.nowCount, 1)
r.qCache.bucketStats[bucket] = v
atomic.AddInt64(&r.qCache.srQueueStats.nowBytes, sz)
atomic.AddInt64(&r.qCache.srQueueStats.nowCount, 1)
}
func (r *ReplicationStats) decQ(bucket string, sz int64, isDelMarker bool, opType replication.Type) {
r.qCache.Lock()
defer r.qCache.Unlock()
v, ok := r.qCache.bucketStats[bucket]
if !ok {
v = newInQueueStats(r.registry, bucket)
}
atomic.AddInt64(&v.nowBytes, -1*sz)
atomic.AddInt64(&v.nowCount, -1)
r.qCache.bucketStats[bucket] = v
atomic.AddInt64(&r.qCache.srQueueStats.nowBytes, -1*sz)
atomic.AddInt64(&r.qCache.srQueueStats.nowCount, -1)
}
// incProxy increments proxy metrics for proxied calls
func (r *ReplicationStats) incProxy(bucket string, api replProxyAPI, isErr bool) {
if r != nil {
r.pCache.inc(bucket, api, isErr)
}
}
func (r *ReplicationStats) getProxyStats(bucket string) ProxyMetric {
if r == nil {
return ProxyMetric{}
}
return r.pCache.getBucketStats(bucket)
}