From 27d02ea6f76cbc580d7d19bf39adcf50f498a347 Mon Sep 17 00:00:00 2001 From: Poorna Date: Mon, 5 Feb 2024 22:00:45 -0800 Subject: [PATCH] metrics: add replication metrics on proxied requests (#18957) --- cmd/bucket-replication-metrics.go | 142 +++++++++ cmd/bucket-replication-metrics_gen.go | 328 +++++++++++++++++++++ cmd/bucket-replication-metrics_gen_test.go | 113 +++++++ cmd/bucket-replication-stats.go | 21 +- cmd/bucket-replication.go | 3 + cmd/bucket-stats.go | 1 + cmd/bucket-stats_gen.go | 39 ++- cmd/metrics-v2.go | 203 ++++++++++++- cmd/notification.go | 2 + cmd/object-handlers.go | 10 + cmd/peer-rest-server.go | 2 + cmd/site-replication-metrics.go | 2 + cmd/site-replication-metrics_gen.go | 39 ++- cmd/site-replication.go | 1 + docs/metrics/prometheus/list.md | 21 ++ go.mod | 2 + go.sum | 4 +- 17 files changed, 916 insertions(+), 17 deletions(-) diff --git a/cmd/bucket-replication-metrics.go b/cmd/bucket-replication-metrics.go index 7afe2b4c9..3b3b56af6 100644 --- a/cmd/bucket-replication-metrics.go +++ b/cmd/bucket-replication-metrics.go @@ -379,3 +379,145 @@ func (s *SMA) simpleMovingAvg() float64 { const ( defaultWindowSize = 10 ) + +type proxyStatsCache struct { + srProxyStats ProxyMetric + bucketStats map[string]ProxyMetric + sync.RWMutex // mutex for proxy stats +} + +func newProxyStatsCache() proxyStatsCache { + return proxyStatsCache{ + bucketStats: make(map[string]ProxyMetric), + } +} + +func (p *proxyStatsCache) inc(bucket string, api replProxyAPI, isErr bool) { + p.Lock() + defer p.Unlock() + v, ok := p.bucketStats[bucket] + if !ok { + v = ProxyMetric{} + } + switch api { + case putObjectTaggingAPI: + if !isErr { + atomic.AddUint64(&v.PutTagTotal, 1) + atomic.AddUint64(&p.srProxyStats.PutTagTotal, 1) + } else { + atomic.AddUint64(&v.PutTagFailedTotal, 1) + atomic.AddUint64(&p.srProxyStats.PutTagFailedTotal, 1) + } + case getObjectTaggingAPI: + if !isErr { + atomic.AddUint64(&v.GetTagTotal, 1) + atomic.AddUint64(&p.srProxyStats.GetTagTotal, 1) + } else { + atomic.AddUint64(&v.GetTagFailedTotal, 1) + atomic.AddUint64(&p.srProxyStats.GetTagFailedTotal, 1) + } + case removeObjectTaggingAPI: + if !isErr { + atomic.AddUint64(&v.RmvTagTotal, 1) + atomic.AddUint64(&p.srProxyStats.RmvTagTotal, 1) + } else { + atomic.AddUint64(&v.RmvTagFailedTotal, 1) + atomic.AddUint64(&p.srProxyStats.RmvTagFailedTotal, 1) + } + case headObjectAPI: + if !isErr { + atomic.AddUint64(&v.HeadTotal, 1) + atomic.AddUint64(&p.srProxyStats.HeadTotal, 1) + } else { + atomic.AddUint64(&v.HeadFailedTotal, 1) + atomic.AddUint64(&p.srProxyStats.HeadFailedTotal, 1) + } + case getObjectAPI: + if !isErr { + atomic.AddUint64(&v.GetTotal, 1) + atomic.AddUint64(&p.srProxyStats.GetTotal, 1) + } else { + atomic.AddUint64(&v.GetFailedTotal, 1) + atomic.AddUint64(&p.srProxyStats.GetFailedTotal, 1) + } + default: + return + } + p.bucketStats[bucket] = v +} + +func (p *proxyStatsCache) getBucketStats(bucket string) ProxyMetric { + p.RLock() + defer p.RUnlock() + v, ok := p.bucketStats[bucket] + + if !ok { + return ProxyMetric{} + } + return ProxyMetric{ + PutTagTotal: atomic.LoadUint64(&v.PutTagTotal), + GetTagTotal: atomic.LoadUint64(&v.GetTagTotal), + RmvTagTotal: atomic.LoadUint64(&v.RmvTagTotal), + HeadTotal: atomic.LoadUint64(&v.HeadTotal), + GetTotal: atomic.LoadUint64(&v.GetTotal), + + PutTagFailedTotal: atomic.LoadUint64(&v.PutTagFailedTotal), + GetTagFailedTotal: atomic.LoadUint64(&v.GetTagFailedTotal), + RmvTagFailedTotal: atomic.LoadUint64(&v.RmvTagFailedTotal), + HeadFailedTotal: 
atomic.LoadUint64(&v.HeadFailedTotal), + GetFailedTotal: atomic.LoadUint64(&v.GetFailedTotal), + } +} + +func (p *proxyStatsCache) getSiteStats() ProxyMetric { + v := p.srProxyStats + return ProxyMetric{ + PutTagTotal: atomic.LoadUint64(&v.PutTagTotal), + GetTagTotal: atomic.LoadUint64(&v.GetTagTotal), + RmvTagTotal: atomic.LoadUint64(&v.RmvTagTotal), + HeadTotal: atomic.LoadUint64(&v.HeadTotal), + GetTotal: atomic.LoadUint64(&v.GetTotal), + PutTagFailedTotal: atomic.LoadUint64(&v.PutTagFailedTotal), + GetTagFailedTotal: atomic.LoadUint64(&v.GetTagFailedTotal), + RmvTagFailedTotal: atomic.LoadUint64(&v.RmvTagFailedTotal), + HeadFailedTotal: atomic.LoadUint64(&v.HeadFailedTotal), + GetFailedTotal: atomic.LoadUint64(&v.GetFailedTotal), + } +} + +type replProxyAPI string + +const ( + putObjectTaggingAPI replProxyAPI = "PutObjectTagging" + getObjectTaggingAPI replProxyAPI = "GetObjectTagging" + removeObjectTaggingAPI replProxyAPI = "RemoveObjectTagging" + headObjectAPI replProxyAPI = "HeadObject" + getObjectAPI replProxyAPI = "GetObject" +) + +// ProxyMetric holds stats for replication proxying +type ProxyMetric struct { + PutTagTotal uint64 `json:"putTaggingProxyTotal" msg:"ptc"` + GetTagTotal uint64 `json:"getTaggingProxyTotal" msg:"gtc"` + RmvTagTotal uint64 `json:"removeTaggingProxyTotal" msg:"rtc"` + GetTotal uint64 `json:"getProxyTotal" msg:"gc"` + HeadTotal uint64 `json:"headProxyTotal" msg:"hc"` + PutTagFailedTotal uint64 `json:"putTaggingProxyFailed" msg:"ptf"` + GetTagFailedTotal uint64 `json:"getTaggingProxyFailed" msg:"gtf"` + RmvTagFailedTotal uint64 `json:"removeTaggingProxyFailed" msg:"rtf"` + GetFailedTotal uint64 `json:"getProxyFailed" msg:"gf"` + HeadFailedTotal uint64 `json:"headProxyFailed" msg:"hf"` +} + +func (p *ProxyMetric) add(p2 ProxyMetric) { + atomic.AddUint64(&p.GetTotal, p2.GetTotal) + atomic.AddUint64(&p.HeadTotal, p2.HeadTotal) + atomic.AddUint64(&p.GetTagTotal, p2.GetTagTotal) + atomic.AddUint64(&p.PutTagTotal, p2.PutTagTotal) + atomic.AddUint64(&p.RmvTagTotal, p2.RmvTagTotal) + atomic.AddUint64(&p.GetFailedTotal, p2.GetFailedTotal) + atomic.AddUint64(&p.HeadFailedTotal, p2.HeadFailedTotal) + atomic.AddUint64(&p.GetTagFailedTotal, p2.GetTagFailedTotal) + atomic.AddUint64(&p.PutTagFailedTotal, p2.PutTagFailedTotal) + atomic.AddUint64(&p.RmvTagFailedTotal, p2.RmvTagFailedTotal) +} diff --git a/cmd/bucket-replication-metrics_gen.go b/cmd/bucket-replication-metrics_gen.go index c67a3c097..f0443ab8a 100644 --- a/cmd/bucket-replication-metrics_gen.go +++ b/cmd/bucket-replication-metrics_gen.go @@ -635,6 +635,334 @@ func (z InQueueStats) Msgsize() (s int) { return } +// DecodeMsg implements msgp.Decodable +func (z *ProxyMetric) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "ptc": + z.PutTagTotal, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "PutTagTotal") + return + } + case "gtc": + z.GetTagTotal, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "GetTagTotal") + return + } + case "rtc": + z.RmvTagTotal, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "RmvTagTotal") + return + } + case "gc": + z.GetTotal, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "GetTotal") + return + } + case 
"hc": + z.HeadTotal, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "HeadTotal") + return + } + case "ptf": + z.PutTagFailedTotal, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "PutTagFailedTotal") + return + } + case "gtf": + z.GetTagFailedTotal, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "GetTagFailedTotal") + return + } + case "rtf": + z.RmvTagFailedTotal, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "RmvTagFailedTotal") + return + } + case "gf": + z.GetFailedTotal, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "GetFailedTotal") + return + } + case "hf": + z.HeadFailedTotal, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "HeadFailedTotal") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *ProxyMetric) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 10 + // write "ptc" + err = en.Append(0x8a, 0xa3, 0x70, 0x74, 0x63) + if err != nil { + return + } + err = en.WriteUint64(z.PutTagTotal) + if err != nil { + err = msgp.WrapError(err, "PutTagTotal") + return + } + // write "gtc" + err = en.Append(0xa3, 0x67, 0x74, 0x63) + if err != nil { + return + } + err = en.WriteUint64(z.GetTagTotal) + if err != nil { + err = msgp.WrapError(err, "GetTagTotal") + return + } + // write "rtc" + err = en.Append(0xa3, 0x72, 0x74, 0x63) + if err != nil { + return + } + err = en.WriteUint64(z.RmvTagTotal) + if err != nil { + err = msgp.WrapError(err, "RmvTagTotal") + return + } + // write "gc" + err = en.Append(0xa2, 0x67, 0x63) + if err != nil { + return + } + err = en.WriteUint64(z.GetTotal) + if err != nil { + err = msgp.WrapError(err, "GetTotal") + return + } + // write "hc" + err = en.Append(0xa2, 0x68, 0x63) + if err != nil { + return + } + err = en.WriteUint64(z.HeadTotal) + if err != nil { + err = msgp.WrapError(err, "HeadTotal") + return + } + // write "ptf" + err = en.Append(0xa3, 0x70, 0x74, 0x66) + if err != nil { + return + } + err = en.WriteUint64(z.PutTagFailedTotal) + if err != nil { + err = msgp.WrapError(err, "PutTagFailedTotal") + return + } + // write "gtf" + err = en.Append(0xa3, 0x67, 0x74, 0x66) + if err != nil { + return + } + err = en.WriteUint64(z.GetTagFailedTotal) + if err != nil { + err = msgp.WrapError(err, "GetTagFailedTotal") + return + } + // write "rtf" + err = en.Append(0xa3, 0x72, 0x74, 0x66) + if err != nil { + return + } + err = en.WriteUint64(z.RmvTagFailedTotal) + if err != nil { + err = msgp.WrapError(err, "RmvTagFailedTotal") + return + } + // write "gf" + err = en.Append(0xa2, 0x67, 0x66) + if err != nil { + return + } + err = en.WriteUint64(z.GetFailedTotal) + if err != nil { + err = msgp.WrapError(err, "GetFailedTotal") + return + } + // write "hf" + err = en.Append(0xa2, 0x68, 0x66) + if err != nil { + return + } + err = en.WriteUint64(z.HeadFailedTotal) + if err != nil { + err = msgp.WrapError(err, "HeadFailedTotal") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *ProxyMetric) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 10 + // string "ptc" + o = append(o, 0x8a, 0xa3, 0x70, 0x74, 0x63) + o = msgp.AppendUint64(o, z.PutTagTotal) + // string "gtc" + o = append(o, 0xa3, 0x67, 0x74, 0x63) + o = msgp.AppendUint64(o, z.GetTagTotal) + // string "rtc" + o = append(o, 0xa3, 0x72, 0x74, 0x63) + o = msgp.AppendUint64(o, 
z.RmvTagTotal) + // string "gc" + o = append(o, 0xa2, 0x67, 0x63) + o = msgp.AppendUint64(o, z.GetTotal) + // string "hc" + o = append(o, 0xa2, 0x68, 0x63) + o = msgp.AppendUint64(o, z.HeadTotal) + // string "ptf" + o = append(o, 0xa3, 0x70, 0x74, 0x66) + o = msgp.AppendUint64(o, z.PutTagFailedTotal) + // string "gtf" + o = append(o, 0xa3, 0x67, 0x74, 0x66) + o = msgp.AppendUint64(o, z.GetTagFailedTotal) + // string "rtf" + o = append(o, 0xa3, 0x72, 0x74, 0x66) + o = msgp.AppendUint64(o, z.RmvTagFailedTotal) + // string "gf" + o = append(o, 0xa2, 0x67, 0x66) + o = msgp.AppendUint64(o, z.GetFailedTotal) + // string "hf" + o = append(o, 0xa2, 0x68, 0x66) + o = msgp.AppendUint64(o, z.HeadFailedTotal) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *ProxyMetric) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "ptc": + z.PutTagTotal, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "PutTagTotal") + return + } + case "gtc": + z.GetTagTotal, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "GetTagTotal") + return + } + case "rtc": + z.RmvTagTotal, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "RmvTagTotal") + return + } + case "gc": + z.GetTotal, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "GetTotal") + return + } + case "hc": + z.HeadTotal, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "HeadTotal") + return + } + case "ptf": + z.PutTagFailedTotal, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "PutTagFailedTotal") + return + } + case "gtf": + z.GetTagFailedTotal, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "GetTagFailedTotal") + return + } + case "rtf": + z.RmvTagFailedTotal, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "RmvTagFailedTotal") + return + } + case "gf": + z.GetFailedTotal, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "GetFailedTotal") + return + } + case "hf": + z.HeadFailedTotal, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "HeadFailedTotal") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *ProxyMetric) Msgsize() (s int) { + s = 1 + 4 + msgp.Uint64Size + 4 + msgp.Uint64Size + 4 + msgp.Uint64Size + 3 + msgp.Uint64Size + 3 + msgp.Uint64Size + 4 + msgp.Uint64Size + 4 + msgp.Uint64Size + 4 + msgp.Uint64Size + 3 + msgp.Uint64Size + 3 + msgp.Uint64Size + return +} + // DecodeMsg implements msgp.Decodable func (z *QStat) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte diff --git a/cmd/bucket-replication-metrics_gen_test.go b/cmd/bucket-replication-metrics_gen_test.go index 3401d7dba..629649e76 100644 --- a/cmd/bucket-replication-metrics_gen_test.go +++ b/cmd/bucket-replication-metrics_gen_test.go @@ -348,6 +348,119 @@ func BenchmarkDecodeInQueueStats(b *testing.B) 
{ } } +func TestMarshalUnmarshalProxyMetric(t *testing.T) { + v := ProxyMetric{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgProxyMetric(b *testing.B) { + v := ProxyMetric{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgProxyMetric(b *testing.B) { + v := ProxyMetric{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalProxyMetric(b *testing.B) { + v := ProxyMetric{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeProxyMetric(t *testing.T) { + v := ProxyMetric{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeProxyMetric Msgsize() is inaccurate") + } + + vn := ProxyMetric{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeProxyMetric(b *testing.B) { + v := ProxyMetric{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeProxyMetric(b *testing.B) { + v := ProxyMetric{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshalQStat(t *testing.T) { v := QStat{} bts, err := v.MarshalMsg(nil) diff --git a/cmd/bucket-replication-stats.go b/cmd/bucket-replication-stats.go index ca65bb58e..8e40fc28f 100644 --- a/cmd/bucket-replication-stats.go +++ b/cmd/bucket-replication-stats.go @@ -45,6 +45,8 @@ type ReplicationStats struct { workers *ActiveWorkerStat // queue stats cache qCache queueCache + + pCache proxyStatsCache // mrf backlog stats mrfStats ReplicationMRFStats // for bucket replication, continue to use existing cache @@ -305,6 +307,7 @@ func (r *ReplicationStats) getSRMetricsForNode() SRMetricsSummary { Queued: r.qCache.getSiteStats(), ActiveWorkers: r.ActiveWorkers(), Metrics: r.srStats.get(), + Proxied: r.pCache.getSiteStats(), ReplicaSize: atomic.LoadInt64(&r.srStats.ReplicaSize), ReplicaCount: atomic.LoadInt64(&r.srStats.ReplicaCount), } @@ -333,6 +336,7 @@ func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *Replicatio rs := ReplicationStats{ Cache: make(map[string]*BucketReplicationStats), qCache: newQueueCache(r), + pCache: newProxyStatsCache(), srStats: newSRStats(), movingAvgTicker: time.NewTicker(2 * time.Second), wTimer: time.NewTicker(2 * time.Second), @@ -371,6 +375,7 @@ func (r 
*ReplicationStats) calculateBucketReplicationStats(bucket string, bucket Stats: make(map[string]*BucketReplicationStat), }, QueueStats: ReplicationQueueStats{}, + ProxyStats: ProxyMetric{}, } return bs } @@ -430,11 +435,16 @@ func (r *ReplicationStats) calculateBucketReplicationStats(bucket string, bucket for _, bs := range bucketStats { qs.Nodes = append(qs.Nodes, bs.QueueStats.Nodes...) } - qs.Uptime = UTCNow().Unix() - globalBootTime.Unix() + + var ps ProxyMetric + for _, bs := range bucketStats { + ps.add(bs.ProxyStats) + } bs = BucketStats{ ReplicationStats: s, QueueStats: qs, + ProxyStats: ps, } r.mostRecentStatsMu.Lock() if len(r.mostRecentStats.Stats) == 0 { @@ -482,3 +492,12 @@ func (r *ReplicationStats) decQ(bucket string, sz int64, isDelMarker bool, opTyp atomic.AddInt64(&r.qCache.srQueueStats.nowBytes, -1*sz) atomic.AddInt64(&r.qCache.srQueueStats.nowCount, -1) } + +// incProxy increments proxy metrics for proxied calls +func (r *ReplicationStats) incProxy(bucket string, api replProxyAPI, isErr bool) { + r.pCache.inc(bucket, api, isErr) +} + +func (r *ReplicationStats) getProxyStats(bucket string) ProxyMetric { + return r.pCache.getBucketStats(bucket) +} diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 566e42323..fe987a202 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -2235,6 +2235,7 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, rs *HTTPRa if opts.ProxyRequest || (opts.ProxyHeaderSet && !opts.ProxyRequest) { // true only when site B sets MinIOSourceProxyRequest header return nil, oi, proxy } + var perr error for _, t := range proxyTargets.Targets { tgt = globalBucketTargetSys.GetRemoteTargetClient(bucket, t.Arn) if tgt == nil || globalBucketTargetSys.isOffline(tgt.EndpointURL()) { @@ -2264,6 +2265,7 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, rs *HTTPRa objInfo, err := tgt.StatObject(ctx, t.TargetBucket, object, gopts) if err != nil { + perr = err if isErrInvalidRange(ErrorRespToObjectError(err, bucket, object)) { return nil, oi, proxyResult{Err: err} } @@ -2300,6 +2302,7 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, rs *HTTPRa } return tgt, oi, proxyResult{Proxy: true} } + proxy.Err = perr return nil, oi, proxy } diff --git a/cmd/bucket-stats.go b/cmd/bucket-stats.go index 7c4b94e36..d691af93b 100644 --- a/cmd/bucket-stats.go +++ b/cmd/bucket-stats.go @@ -154,6 +154,7 @@ type BucketStats struct { Uptime int64 `json:"uptime"` ReplicationStats BucketReplicationStats `json:"currStats"` // current replication stats since cluster startup QueueStats ReplicationQueueStats `json:"queueStats"` // replication queue stats + ProxyStats ProxyMetric `json:"proxyStats"` } // BucketReplicationStats represents inline replication statistics diff --git a/cmd/bucket-stats_gen.go b/cmd/bucket-stats_gen.go index 85997dcf6..1fca700f1 100644 --- a/cmd/bucket-stats_gen.go +++ b/cmd/bucket-stats_gen.go @@ -1142,6 +1142,12 @@ func (z *BucketStats) DecodeMsg(dc *msgp.Reader) (err error) { } } } + case "ProxyStats": + err = z.ProxyStats.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "ProxyStats") + return + } default: err = dc.Skip() if err != nil { @@ -1155,9 +1161,9 @@ func (z *BucketStats) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *BucketStats) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 3 + // map header, size 4 // write "Uptime" - err = en.Append(0x83, 0xa6, 0x55, 0x70, 0x74, 0x69, 0x6d, 
0x65) + err = en.Append(0x84, 0xa6, 0x55, 0x70, 0x74, 0x69, 0x6d, 0x65) if err != nil { return } @@ -1209,15 +1215,25 @@ func (z *BucketStats) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "QueueStats", "Uptime") return } + // write "ProxyStats" + err = en.Append(0xaa, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x53, 0x74, 0x61, 0x74, 0x73) + if err != nil { + return + } + err = z.ProxyStats.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "ProxyStats") + return + } return } // MarshalMsg implements msgp.Marshaler func (z *BucketStats) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 3 + // map header, size 4 // string "Uptime" - o = append(o, 0x83, 0xa6, 0x55, 0x70, 0x74, 0x69, 0x6d, 0x65) + o = append(o, 0x84, 0xa6, 0x55, 0x70, 0x74, 0x69, 0x6d, 0x65) o = msgp.AppendInt64(o, z.Uptime) // string "ReplicationStats" o = append(o, 0xb0, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x73) @@ -1242,6 +1258,13 @@ func (z *BucketStats) MarshalMsg(b []byte) (o []byte, err error) { // string "Uptime" o = append(o, 0xa6, 0x55, 0x70, 0x74, 0x69, 0x6d, 0x65) o = msgp.AppendInt64(o, z.QueueStats.Uptime) + // string "ProxyStats" + o = append(o, 0xaa, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x53, 0x74, 0x61, 0x74, 0x73) + o, err = z.ProxyStats.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "ProxyStats") + return + } return } @@ -1323,6 +1346,12 @@ func (z *BucketStats) UnmarshalMsg(bts []byte) (o []byte, err error) { } } } + case "ProxyStats": + bts, err = z.ProxyStats.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "ProxyStats") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -1341,7 +1370,7 @@ func (z *BucketStats) Msgsize() (s int) { for za0001 := range z.QueueStats.Nodes { s += z.QueueStats.Nodes[za0001].Msgsize() } - s += 7 + msgp.Int64Size + s += 7 + msgp.Int64Size + 11 + z.ProxyStats.Msgsize() return } diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index ce9418d07..3a1f2f06f 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -225,10 +225,20 @@ const ( linkOfflineDuration MetricName = "link_offline_duration_seconds" linkDowntimeTotalDuration MetricName = "link_downtime_duration_seconds" - avgInQueueCount MetricName = "average_queued_count" - avgInQueueBytes MetricName = "average_queued_bytes" - maxInQueueCount MetricName = "max_queued_count" - maxInQueueBytes MetricName = "max_queued_bytes" + avgInQueueCount MetricName = "average_queued_count" + avgInQueueBytes MetricName = "average_queued_bytes" + maxInQueueCount MetricName = "max_queued_count" + maxInQueueBytes MetricName = "max_queued_bytes" + proxiedGetRequestsTotal MetricName = "proxied_get_requests_total" + proxiedHeadRequestsTotal MetricName = "proxied_head_requests_total" + proxiedPutTaggingRequestsTotal MetricName = "proxied_put_tagging_requests_total" + proxiedGetTaggingRequestsTotal MetricName = "proxied_get_tagging_requests_total" + proxiedDeleteTaggingRequestsTotal MetricName = "proxied_delete_tagging_requests_total" + proxiedGetRequestsFailures MetricName = "proxied_get_requests_failures" + proxiedHeadRequestsFailures MetricName = "proxied_head_requests_failures" + proxiedPutTaggingRequestFailures MetricName = "proxied_put_tagging_requests_failures" + proxiedGetTaggingRequestFailures MetricName = "proxied_get_tagging_requests_failures" + proxiedDeleteTaggingRequestFailures MetricName = "proxied_delete_tagging_requests_failures" freeBytes MetricName = "free_bytes" readBytes 
MetricName = "read_bytes" @@ -1124,6 +1134,106 @@ func getClusterReplMaxTransferRateMD() MetricDescription { } } +func getClusterReplProxiedGetOperationsMD(ns MetricNamespace) MetricDescription { + return MetricDescription{ + Namespace: ns, + Subsystem: replicationSubsystem, + Name: proxiedGetRequestsTotal, + Help: "Number of GET requests proxied to replication target", + Type: counterMetric, + } +} + +func getClusterReplProxiedHeadOperationsMD(ns MetricNamespace) MetricDescription { + return MetricDescription{ + Namespace: ns, + Subsystem: replicationSubsystem, + Name: proxiedHeadRequestsTotal, + Help: "Number of HEAD requests proxied to replication target", + Type: counterMetric, + } +} + +func getClusterReplProxiedPutTaggingOperationsMD(ns MetricNamespace) MetricDescription { + return MetricDescription{ + Namespace: ns, + Subsystem: replicationSubsystem, + Name: proxiedPutTaggingRequestsTotal, + Help: "Number of PUT tagging requests proxied to replication target", + Type: counterMetric, + } +} + +func getClusterReplProxiedGetTaggingOperationsMD(ns MetricNamespace) MetricDescription { + return MetricDescription{ + Namespace: ns, + Subsystem: replicationSubsystem, + Name: proxiedGetTaggingRequestsTotal, + Help: "Number of GET tagging requests proxied to replication target", + Type: counterMetric, + } +} + +func getClusterReplProxiedRmvTaggingOperationsMD(ns MetricNamespace) MetricDescription { + return MetricDescription{ + Namespace: ns, + Subsystem: replicationSubsystem, + Name: proxiedDeleteTaggingRequestsTotal, + Help: "Number of DELETE tagging requests proxied to replication target", + Type: counterMetric, + } +} + +func getClusterReplProxiedGetFailedOperationsMD(ns MetricNamespace) MetricDescription { + return MetricDescription{ + Namespace: ns, + Subsystem: replicationSubsystem, + Name: proxiedGetRequestsFailures, + Help: "Number of failures in GET requests proxied to replication target", + Type: counterMetric, + } +} + +func getClusterReplProxiedHeadFailedOperationsMD(ns MetricNamespace) MetricDescription { + return MetricDescription{ + Namespace: ns, + Subsystem: replicationSubsystem, + Name: proxiedHeadRequestsFailures, + Help: "Number of failures in HEAD requests proxied to replication target", + Type: counterMetric, + } +} + +func getClusterReplProxiedPutTaggingFailedOperationsMD(ns MetricNamespace) MetricDescription { + return MetricDescription{ + Namespace: ns, + Subsystem: replicationSubsystem, + Name: proxiedPutTaggingRequestFailures, + Help: "Number of failures in PUT tagging proxy requests to replication target", + Type: counterMetric, + } +} + +func getClusterReplProxiedGetTaggingFailedOperationsMD(ns MetricNamespace) MetricDescription { + return MetricDescription{ + Namespace: ns, + Subsystem: replicationSubsystem, + Name: proxiedGetTaggingRequestFailures, + Help: "Number of failures in GET tagging proxy requests to replication target", + Type: counterMetric, + } +} + +func getClusterReplProxiedRmvTaggingFailedOperationsMD(ns MetricNamespace) MetricDescription { + return MetricDescription{ + Namespace: ns, + Subsystem: replicationSubsystem, + Name: proxiedDeleteTaggingRequestFailures, + Help: "Number of failures in DELETE tagging proxy requests to replication target", + Type: counterMetric, + } +} + func getBucketObjectDistributionMD() MetricDescription { return MetricDescription{ Namespace: bucketMetricNamespace, @@ -2381,6 +2491,46 @@ func getReplicationSiteMetrics(opts MetricsGroupOpts) *MetricsGroup { }) } } + ml = append(ml, Metric{ + Description: 
getClusterReplProxiedGetOperationsMD(clusterMetricNamespace), + Value: float64(m.Proxied.GetTotal), + }) + ml = append(ml, Metric{ + Description: getClusterReplProxiedHeadOperationsMD(clusterMetricNamespace), + Value: float64(m.Proxied.HeadTotal), + }) + ml = append(ml, Metric{ + Description: getClusterReplProxiedPutTaggingOperationsMD(clusterMetricNamespace), + Value: float64(m.Proxied.PutTagTotal), + }) + ml = append(ml, Metric{ + Description: getClusterReplProxiedGetTaggingOperationsMD(clusterMetricNamespace), + Value: float64(m.Proxied.GetTagTotal), + }) + ml = append(ml, Metric{ + Description: getClusterReplProxiedRmvTaggingOperationsMD(clusterMetricNamespace), + Value: float64(m.Proxied.RmvTagTotal), + }) + ml = append(ml, Metric{ + Description: getClusterReplProxiedGetFailedOperationsMD(clusterMetricNamespace), + Value: float64(m.Proxied.GetFailedTotal), + }) + ml = append(ml, Metric{ + Description: getClusterReplProxiedHeadFailedOperationsMD(clusterMetricNamespace), + Value: float64(m.Proxied.HeadFailedTotal), + }) + ml = append(ml, Metric{ + Description: getClusterReplProxiedPutTaggingFailedOperationsMD(clusterMetricNamespace), + Value: float64(m.Proxied.PutTagFailedTotal), + }) + ml = append(ml, Metric{ + Description: getClusterReplProxiedGetTaggingFailedOperationsMD(clusterMetricNamespace), + Value: float64(m.Proxied.GetTagFailedTotal), + }) + ml = append(ml, Metric{ + Description: getClusterReplProxiedRmvTaggingFailedOperationsMD(clusterMetricNamespace), + Value: float64(m.Proxied.RmvTagFailedTotal), + }) } return ml @@ -3098,6 +3248,51 @@ func getBucketUsageMetrics(opts MetricsGroupOpts) *MetricsGroup { Value: float64(stats.ReplicaCount), VariableLabels: map[string]string{"bucket": bucket}, }) + metrics = append(metrics, Metric{ + Description: getClusterReplProxiedGetOperationsMD(bucketMetricNamespace), + Value: float64(s.ProxyStats.GetTotal), + VariableLabels: map[string]string{"bucket": bucket}, + }) + metrics = append(metrics, Metric{ + Description: getClusterReplProxiedHeadOperationsMD(bucketMetricNamespace), + Value: float64(s.ProxyStats.HeadTotal), + VariableLabels: map[string]string{"bucket": bucket}, + }) + metrics = append(metrics, Metric{ + Description: getClusterReplProxiedPutTaggingOperationsMD(bucketMetricNamespace), + Value: float64(s.ProxyStats.PutTagTotal), + VariableLabels: map[string]string{"bucket": bucket}, + }) + metrics = append(metrics, Metric{ + Description: getClusterReplProxiedGetTaggingOperationsMD(bucketMetricNamespace), + Value: float64(s.ProxyStats.GetTagTotal), + VariableLabels: map[string]string{"bucket": bucket}, + }) + metrics = append(metrics, Metric{ + Description: getClusterReplProxiedRmvTaggingOperationsMD(bucketMetricNamespace), + Value: float64(s.ProxyStats.RmvTagTotal), + VariableLabels: map[string]string{"bucket": bucket}, + }) + metrics = append(metrics, Metric{ + Description: getClusterReplProxiedGetFailedOperationsMD(bucketMetricNamespace), + Value: float64(s.ProxyStats.GetFailedTotal), + }) + metrics = append(metrics, Metric{ + Description: getClusterReplProxiedHeadFailedOperationsMD(bucketMetricNamespace), + Value: float64(s.ProxyStats.HeadFailedTotal), + }) + metrics = append(metrics, Metric{ + Description: getClusterReplProxiedPutTaggingFailedOperationsMD(bucketMetricNamespace), + Value: float64(s.ProxyStats.PutTagFailedTotal), + }) + metrics = append(metrics, Metric{ + Description: getClusterReplProxiedGetTaggingFailedOperationsMD(bucketMetricNamespace), + Value: float64(s.ProxyStats.GetTagFailedTotal), + }) + metrics = 
append(metrics, Metric{ + Description: getClusterReplProxiedRmvTaggingFailedOperationsMD(clusterMetricNamespace), + Value: float64(s.ProxyStats.RmvTagFailedTotal), + }) } if stats.hasReplicationUsage() { for arn, stat := range stats.Stats { diff --git a/cmd/notification.go b/cmd/notification.go index 2dfb2ed22..4585c2806 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -572,6 +572,7 @@ func (sys *NotificationSys) GetClusterAllBucketStats(ctx context.Context) []Buck for k, replicationStats := range replicationStatsList { bucketStatsMap.Stats[k] = BucketStats{ ReplicationStats: replicationStats, + ProxyStats: globalReplicationStats.getProxyStats(k), } } @@ -607,6 +608,7 @@ func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketNam bucketStats = append(bucketStats, BucketStats{ ReplicationStats: globalReplicationStats.Get(bucketName), QueueStats: ReplicationQueueStats{Nodes: []ReplQNodeStats{globalReplicationStats.getNodeQueueStats(bucketName)}}, + ProxyStats: globalReplicationStats.getProxyStats(bucketName), }) return bucketStats } diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index b0794af3d..0303a94ab 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -490,9 +490,11 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj ) proxytgts := getProxyTargets(ctx, bucket, object, opts) if !proxytgts.Empty() { + globalReplicationStats.incProxy(bucket, getObjectAPI, false) // proxy to replication target if active-active replication is in place. reader, proxy, perr = proxyGetToReplicationTarget(ctx, bucket, object, rs, r.Header, opts, proxytgts) if perr != nil { + globalReplicationStats.incProxy(bucket, getObjectAPI, true) proxyGetErr := ErrorRespToObjectError(perr, bucket, object) if !isErrBucketNotFound(proxyGetErr) && !isErrObjectNotFound(proxyGetErr) && !isErrVersionNotFound(proxyGetErr) && !isErrPreconditionFailed(proxyGetErr) && !isErrInvalidRange(proxyGetErr) { @@ -1018,12 +1020,14 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob // proxy HEAD to replication target if active-active replication configured on bucket proxytgts := getProxyTargets(ctx, bucket, object, opts) if !proxytgts.Empty() { + globalReplicationStats.incProxy(bucket, headObjectAPI, false) var oi ObjectInfo oi, proxy = proxyHeadToReplicationTarget(ctx, bucket, object, rs, opts, proxytgts) if proxy.Proxy { objInfo = oi } if proxy.Err != nil { + globalReplicationStats.incProxy(bucket, headObjectAPI, true) writeErrorResponseHeadersOnly(w, toAPIError(ctx, proxy.Err)) return } @@ -3310,9 +3314,11 @@ func (api objectAPIHandlers) GetObjectTaggingHandler(w http.ResponseWriter, r *h if isErrObjectNotFound(err) || isErrVersionNotFound(err) { proxytgts := getProxyTargets(ctx, bucket, object, opts) if !proxytgts.Empty() { + globalReplicationStats.incProxy(bucket, getObjectTaggingAPI, false) // proxy to replication target if site replication is in place. tags, gerr := proxyGetTaggingToRepTarget(ctx, bucket, object, opts, proxytgts) if gerr.Err != nil { + globalReplicationStats.incProxy(bucket, getObjectTaggingAPI, true) writeErrorResponse(ctx, w, toAPIError(ctx, gerr.Err), r.URL) return } // overlay tags from peer site. 
@@ -3411,9 +3417,11 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h if isErrObjectNotFound(err) || isErrVersionNotFound(err) { proxytgts := getProxyTargets(ctx, bucket, object, opts) if !proxytgts.Empty() { + globalReplicationStats.incProxy(bucket, putObjectTaggingAPI, false) // proxy to replication target if site replication is in place. perr := proxyTaggingToRepTarget(ctx, bucket, object, tags, opts, proxytgts) if perr.Err != nil { + globalReplicationStats.incProxy(bucket, putObjectTaggingAPI, true) writeErrorResponse(ctx, w, toAPIError(ctx, perr.Err), r.URL) return } @@ -3506,9 +3514,11 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r if isErrObjectNotFound(err) || isErrVersionNotFound(err) { proxytgts := getProxyTargets(ctx, bucket, object, opts) if !proxytgts.Empty() { + globalReplicationStats.incProxy(bucket, removeObjectTaggingAPI, false) // proxy to replication target if active-active replication is in place. perr := proxyTaggingToRepTarget(ctx, bucket, object, nil, opts, proxytgts) if perr.Err != nil { + globalReplicationStats.incProxy(bucket, removeObjectTaggingAPI, true) writeErrorResponse(ctx, w, toAPIError(ctx, perr.Err), r.URL) return } diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index b94acda62..4ccc341d1 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -557,6 +557,7 @@ func (s *peerRESTServer) GetAllBucketStatsHandler(w http.ResponseWriter, r *http for k, v := range replicationStats { bucketStatsMap[k] = BucketStats{ ReplicationStats: v, + ProxyStats: globalReplicationStats.getProxyStats(k), } } logger.LogIf(r.Context(), msgp.Encode(w, &BucketStatsMap{Stats: bucketStatsMap, Timestamp: UTCNow()})) @@ -580,6 +581,7 @@ func (s *peerRESTServer) GetBucketStatsHandler(w http.ResponseWriter, r *http.Re bs := BucketStats{ ReplicationStats: globalReplicationStats.Get(bucketName), QueueStats: ReplicationQueueStats{Nodes: []ReplQNodeStats{globalReplicationStats.getNodeQueueStats(bucketName)}}, + ProxyStats: globalReplicationStats.getProxyStats(bucketName), } logger.LogIf(r.Context(), msgp.Encode(w, &bs)) } diff --git a/cmd/site-replication-metrics.go b/cmd/site-replication-metrics.go index 3f1d61d94..4dd3b8d3b 100644 --- a/cmd/site-replication-metrics.go +++ b/cmd/site-replication-metrics.go @@ -282,6 +282,8 @@ type SRMetricsSummary struct { ReplicaCount int64 `json:"replicaCount"` // Queued operations Queued InQueueMetric `json:"queued"` + // Proxy stats + Proxied ProxyMetric `json:"proxied"` // replication metrics summary for each site replication peer Metrics map[string]SRMetric `json:"replMetrics"` // uptime of node being queried for site replication metrics diff --git a/cmd/site-replication-metrics_gen.go b/cmd/site-replication-metrics_gen.go index d18d6e2fd..7cba78109 100644 --- a/cmd/site-replication-metrics_gen.go +++ b/cmd/site-replication-metrics_gen.go @@ -823,6 +823,12 @@ func (z *SRMetricsSummary) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Queued") return } + case "Proxied": + err = z.Proxied.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Proxied") + return + } case "Metrics": var zb0002 uint32 zb0002, err = dc.ReadMapHeader() @@ -872,9 +878,9 @@ func (z *SRMetricsSummary) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *SRMetricsSummary) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 6 + // map header, size 7 // write "ActiveWorkers" - err = en.Append(0x86, 0xad, 0x41, 
0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73) + err = en.Append(0x87, 0xad, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73) if err != nil { return } @@ -913,6 +919,16 @@ func (z *SRMetricsSummary) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Queued") return } + // write "Proxied" + err = en.Append(0xa7, 0x50, 0x72, 0x6f, 0x78, 0x69, 0x65, 0x64) + if err != nil { + return + } + err = z.Proxied.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Proxied") + return + } // write "Metrics" err = en.Append(0xa7, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73) if err != nil { @@ -951,9 +967,9 @@ func (z *SRMetricsSummary) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *SRMetricsSummary) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 6 + // map header, size 7 // string "ActiveWorkers" - o = append(o, 0x86, 0xad, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73) + o = append(o, 0x87, 0xad, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73) o, err = z.ActiveWorkers.MarshalMsg(o) if err != nil { err = msgp.WrapError(err, "ActiveWorkers") @@ -972,6 +988,13 @@ func (z *SRMetricsSummary) MarshalMsg(b []byte) (o []byte, err error) { err = msgp.WrapError(err, "Queued") return } + // string "Proxied" + o = append(o, 0xa7, 0x50, 0x72, 0x6f, 0x78, 0x69, 0x65, 0x64) + o, err = z.Proxied.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Proxied") + return + } // string "Metrics" o = append(o, 0xa7, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73) o = msgp.AppendMapHeader(o, uint32(len(z.Metrics))) @@ -1031,6 +1054,12 @@ func (z *SRMetricsSummary) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Queued") return } + case "Proxied": + bts, err = z.Proxied.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Proxied") + return + } case "Metrics": var zb0002 uint32 zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) @@ -1081,7 +1110,7 @@ func (z *SRMetricsSummary) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *SRMetricsSummary) Msgsize() (s int) { - s = 1 + 14 + z.ActiveWorkers.Msgsize() + 12 + msgp.Int64Size + 13 + msgp.Int64Size + 7 + z.Queued.Msgsize() + 8 + msgp.MapHeaderSize + s = 1 + 14 + z.ActiveWorkers.Msgsize() + 12 + msgp.Int64Size + 13 + msgp.Int64Size + 7 + z.Queued.Msgsize() + 8 + z.Proxied.Msgsize() + 8 + msgp.MapHeaderSize if z.Metrics != nil { for za0001, za0002 := range z.Metrics { _ = za0002 diff --git a/cmd/site-replication.go b/cmd/site-replication.go index ef9a84670..55e9dd5c0 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -6016,6 +6016,7 @@ func (c *SiteReplicationSys) getSiteMetrics(ctx context.Context) (madmin.SRMetri } sm.ReplicaCount += peer.ReplicaCount sm.ReplicaSize += peer.ReplicaSize + sm.Proxied.Add(madmin.ReplProxyMetric(peer.Proxied)) for dID, v := range peer.Metrics { v2, ok := sm.Metrics[dID] if !ok { diff --git a/docs/metrics/prometheus/list.md b/docs/metrics/prometheus/list.md index 8ae8cb618..2ceed69b7 100644 --- a/docs/metrics/prometheus/list.md +++ b/docs/metrics/prometheus/list.md @@ -109,6 +109,17 @@ For deployments with [bucket](https://min.io/docs/minio/linux/administration/buc | `minio_cluster_replication_sent_bytes` | (_Site Replication Only_) Total number of bytes 
replicated to the target cluster. | | `minio_cluster_replication_sent_count` | (_Site Replication Only_) Total number of objects replicated to the target cluster. | | `minio_cluster_replication_credential_errors` | (_Site Replication Only_) Total number of replication credential errors since server start | +| `minio_cluster_replication_proxied_get_requests_total` | (_Site Replication Only_) Number of GET requests proxied to replication target | +| `minio_cluster_replication_proxied_head_requests_total` | (_Site Replication Only_) Number of HEAD requests proxied to replication target | +| `minio_cluster_replication_proxied_delete_tagging_requests_total` | (_Site Replication Only_) Number of DELETE tagging requests proxied to replication target | +| `minio_cluster_replication_proxied_get_tagging_requests_total` | (_Site Replication Only_) Number of GET tagging requests proxied to replication target | +| `minio_cluster_replication_proxied_put_tagging_requests_total` | (_Site Replication Only_) Number of PUT tagging requests proxied to replication target | +| `minio_cluster_replication_proxied_get_requests_failures` | (_Site Replication Only_) Number of failures in GET requests proxied to replication target | +| `minio_cluster_replication_proxied_head_requests_failures` | (_Site Replication Only_) Number of failures in HEAD requests proxied to replication target | +| `minio_cluster_replication_proxied_delete_tagging_requests_failures` | (_Site Replication Only_) Number of failures proxying DELETE tagging requests to replication target | +| `minio_cluster_replication_proxied_get_tagging_requests_failures` | (_Site Replication Only_) Number of failures proxying GET tagging requests to replication target | +| `minio_cluster_replication_proxied_put_tagging_requests_failures` | (_Site Replication Only_) Number of failures proxying PUT tagging requests to replication target | + ## Healing Metrics @@ -290,6 +301,16 @@ For deployments with [Site Replication](https://min.io/docs/minio/linux/operatio | `minio_bucket_replication_sent_bytes` | Total number of bytes replicated to the target bucket. | | `minio_bucket_replication_sent_count` | Total number of objects replicated to the target bucket.
| | `minio_bucket_replication_credential_errors` | Total number of replication credential errors since server start | +| `minio_bucket_replication_proxied_get_requests_total` | Number of GET requests proxied to replication target | +| `minio_bucket_replication_proxied_head_requests_total` | Number of HEAD requests proxied to replication target | +| `minio_bucket_replication_proxied_delete_tagging_requests_total` | Number of DELETE tagging requests proxied to replication target | +| `minio_bucket_replication_proxied_get_tagging_requests_total` | Number of GET tagging requests proxied to replication target | +| `minio_bucket_replication_proxied_put_tagging_requests_total` | Number of PUT tagging requests proxied to replication target | +| `minio_bucket_replication_proxied_get_requests_failures` | Number of failures in GET requests proxied to replication target | +| `minio_bucket_replication_proxied_head_requests_failures` | Number of failures in HEAD requests proxied to replication target | +| `minio_bucket_replication_proxied_delete_tagging_requests_failures` | Number of failures in DELETE tagging proxy requests to replication target | +| `minio_bucket_replication_proxied_get_tagging_requests_failures` |Number of failures in GET tagging proxy requests to replication target | +| `minio_bucket_replication_proxied_put_tagging_requests_failures` | Number of failures in PUT tagging proxy requests to replication target | ## Traffic Metrics diff --git a/go.mod b/go.mod index 7e097b3ea..99a386665 100644 --- a/go.mod +++ b/go.mod @@ -260,3 +260,5 @@ require ( gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/square/go-jose.v2 v2.6.0 // indirect ) + +replace github.com/minio/madmin-go/v3 => github.com/poornas/madmin-go/v3 v3.0.0-20240205194748-c24ddca6b68a diff --git a/go.sum b/go.sum index 8a5a9ab4d..3db13e7b5 100644 --- a/go.sum +++ b/go.sum @@ -443,8 +443,6 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/minio/kes-go v0.2.0 h1:HA33arq9s3MErbsj3PAXFVfFo4U4yw7lTKQ5kWFrpCA= github.com/minio/kes-go v0.2.0/go.mod h1:VorHLaIYis9/MxAHAtXN4d8PUMNKhIxTIlvFt0hBOEo= -github.com/minio/madmin-go/v3 v3.0.43 h1:AkniczVEkBErQ94MikyPINGoOjFWhuP8xH5KmFpAaO0= -github.com/minio/madmin-go/v3 v3.0.43/go.mod h1:ZDF7kf5fhmxLhbGTqyq5efs4ao0v4eWf7nOuef/ljJs= github.com/minio/mc v0.0.0-20240129194012-12f446e1de57 h1:FO4a9XVuLcIS5s11efycWkBNrfIz4HtDQgUhR+xmLsQ= github.com/minio/mc v0.0.0-20240129194012-12f446e1de57/go.mod h1:MmDLdb7NWd/OYhcKcXKvwErq2GNa/Zq6xtTWuhdC4II= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= @@ -548,6 +546,8 @@ github.com/pkg/xattr v0.4.9 h1:5883YPCtkSd8LFbs13nXplj9g9tlrwoJRjgpgMu1/fE= github.com/pkg/xattr v0.4.9/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/poornas/madmin-go/v3 v3.0.0-20240205194748-c24ddca6b68a h1:AR4wv/f8sXMwPlM2SBN3pNkuluftGrEhInr+/WbBaso= +github.com/poornas/madmin-go/v3 v3.0.0-20240205194748-c24ddca6b68a/go.mod h1:ZDF7kf5fhmxLhbGTqyq5efs4ao0v4eWf7nOuef/ljJs= github.com/posener/complete v1.2.3 h1:NP0eAhjcjImqslEwo/1hq7gpajME0fTLTezBKDqfXqo= github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/power-devops/perfstat 
v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
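
The proxy accounting introduced in cmd/bucket-replication-metrics.go follows one pattern throughout: every proxied call increments both a per-bucket ProxyMetric and a site-wide ProxyMetric under a single mutex, and readers hand back snapshot copies of the counters. Below is a minimal standalone Go sketch of that pattern; the names here (statsCache, proxyCounters) are simplified stand-ins for the patch's proxyStatsCache and ProxyMetric, and only the GET counters are modeled, so this illustrates the bookkeeping rather than the actual MinIO implementation.

package main

import (
	"fmt"
	"sync"
)

// proxyCounters is a simplified stand-in for ProxyMetric: one success and
// one failure counter for proxied GET requests.
type proxyCounters struct {
	GetTotal       uint64
	GetFailedTotal uint64
}

// statsCache mirrors the shape of proxyStatsCache: a site-wide total plus a
// per-bucket map, both guarded by one mutex.
type statsCache struct {
	mu      sync.RWMutex
	site    proxyCounters
	buckets map[string]proxyCounters
}

func newStatsCache() *statsCache {
	return &statsCache{buckets: make(map[string]proxyCounters)}
}

// inc records one proxied GET for a bucket, bumping the per-bucket and the
// site-wide counters together, as proxyStatsCache.inc does for each
// replProxyAPI case.
func (s *statsCache) inc(bucket string, failed bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v := s.buckets[bucket]
	if failed {
		v.GetFailedTotal++
		s.site.GetFailedTotal++
	} else {
		v.GetTotal++
		s.site.GetTotal++
	}
	s.buckets[bucket] = v
}

// bucketStats returns a copy of one bucket's counters, analogous to
// getBucketStats returning a snapshot ProxyMetric.
func (s *statsCache) bucketStats(bucket string) proxyCounters {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.buckets[bucket]
}

func main() {
	c := newStatsCache()
	c.inc("mybucket", false) // successful proxied GET
	c.inc("mybucket", true)  // failed proxied GET
	fmt.Printf("%+v\n", c.bucketStats("mybucket")) // {GetTotal:1 GetFailedTotal:1}
}

On the scraping side, the per-bucket totals are exported with a bucket label (see the VariableLabels added in getBucketUsageMetrics), so a standard PromQL expression such as rate(minio_bucket_replication_proxied_get_requests_total{bucket="mybucket"}[5m]) would give the proxied GET rate for a single bucket; the bucket name here is only a placeholder.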