From 4caed7cc0dc09aa96a52e5b5384fe4152a6d6910 Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Wed, 17 Nov 2021 21:10:57 +0100 Subject: [PATCH] metrics: Add replication latency metrics (#13515) Add a new Prometheus metric for bucket replication latency e.g.: minio_bucket_replication_latency_ns{ bucket="testbucket", operation="upload", range="LESS_THAN_1_MiB", server="127.0.0.1:9001", targetArn="arn:minio:replication::45da043c-14f5-4da4-9316-aba5f77bf730:testbucket"} 2.2015663e+07 Co-authored-by: Klaus Post --- cmd/bucket-replication-stats.go | 28 ++- cmd/bucket-replication-utils.go | 1 + cmd/bucket-replication.go | 12 +- cmd/bucket-stats.go | 39 +++ cmd/bucket-stats_gen.go | 201 +++++++++++++++- cmd/bucket-stats_gen_test.go | 113 +++++++++ cmd/last-minute.go | 187 +++++++++++++++ cmd/last-minute_gen.go | 411 ++++++++++++++++++++++++++++++++ cmd/last-minute_gen_test.go | 236 ++++++++++++++++++ cmd/metrics-v2.go | 40 +++- cmd/metrics.go | 3 + 11 files changed, 1237 insertions(+), 34 deletions(-) create mode 100644 cmd/last-minute.go create mode 100644 cmd/last-minute_gen.go create mode 100644 cmd/last-minute_gen_test.go diff --git a/cmd/bucket-replication-stats.go b/cmd/bucket-replication-stats.go index e2131d8f2..a424a7f5f 100644 --- a/cmd/bucket-replication-stats.go +++ b/cmd/bucket-replication-stats.go @@ -20,7 +20,6 @@ package cmd import ( "context" "sync" - "sync/atomic" "time" "github.com/minio/minio/internal/bucket/replication" @@ -68,16 +67,18 @@ func (r *ReplicationStats) UpdateReplicaStat(bucket string, n int64) { if !ok { bs = &BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)} } - atomic.AddInt64(&bs.ReplicaSize, n) + bs.ReplicaSize += n r.Cache[bucket] = bs } // Update updates in-memory replication statistics with new values. -func (r *ReplicationStats) Update(bucket string, arn string, n int64, status, prevStatus replication.StatusType, opType replication.Type) { +func (r *ReplicationStats) Update(bucket string, arn string, n int64, duration time.Duration, status, prevStatus replication.StatusType, opType replication.Type) { if r == nil { return } - r.RLock() + r.Lock() + defer r.Unlock() + bs, ok := r.Cache[bucket] if !ok { bs = &BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)} @@ -86,36 +87,37 @@ func (r *ReplicationStats) Update(bucket string, arn string, n int64, status, pr if !ok { b = &BucketReplicationStat{} } - r.RUnlock() switch status { case replication.Completed: switch prevStatus { // adjust counters based on previous state case replication.Failed: - atomic.AddInt64(&b.FailedCount, -1) + b.FailedCount-- } if opType == replication.ObjectReplicationType { - atomic.AddInt64(&b.ReplicatedSize, n) + b.ReplicatedSize += n switch prevStatus { case replication.Failed: - atomic.AddInt64(&b.FailedSize, -1*n) + b.FailedSize -= n + } + if duration > 0 { + b.Latency.update(n, duration) } } case replication.Failed: if opType == replication.ObjectReplicationType { if prevStatus == replication.Pending { - atomic.AddInt64(&b.FailedSize, n) - atomic.AddInt64(&b.FailedCount, 1) + b.FailedSize += n + b.FailedCount++ } } case replication.Replica: if opType == replication.ObjectReplicationType { - atomic.AddInt64(&b.ReplicaSize, n) + b.ReplicaSize += n } } - r.Lock() + bs.Stats[arn] = b r.Cache[bucket] = bs - r.Unlock() } // GetInitialUsage get replication metrics available at the time of cluster initialization diff --git a/cmd/bucket-replication-utils.go b/cmd/bucket-replication-utils.go index 2557762e0..b3f71ca91 100644 --- a/cmd/bucket-replication-utils.go +++ b/cmd/bucket-replication-utils.go @@ -34,6 +34,7 @@ import ( type replicatedTargetInfo struct { Arn string Size int64 + Duration time.Duration ReplicationAction replicationAction // full or metadata only OpType replication.Type // whether incoming replication, existing object, healing etc.. ReplicationStatus replication.StatusType diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 6b3d73427..f49271a96 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -433,7 +433,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj // to decrement pending count later. for _, rinfo := range rinfos.Targets { if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus { - globalReplicationStats.Update(dobj.Bucket, rinfo.Arn, 0, replicationStatus, + globalReplicationStats.Update(dobj.Bucket, rinfo.Arn, 0, 0, replicationStatus, prevStatus, replication.DeleteReplicationType) } } @@ -938,7 +938,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje } for _, rinfo := range rinfos.Targets { if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus { - globalReplicationStats.Update(bucket, rinfo.Arn, rinfo.Size, rinfo.ReplicationStatus, rinfo.PrevReplicationStatus, opType) + globalReplicationStats.Update(bucket, rinfo.Arn, rinfo.Size, rinfo.Duration, rinfo.ReplicationStatus, rinfo.PrevReplicationStatus, opType) } } } @@ -963,6 +963,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje // replicateObjectToTarget replicates the specified version of the object to destination bucket // The source object is then updated to reflect the replication status. func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) { + startTime := time.Now() objInfo := ri.ObjectInfo.Clone() bucket := objInfo.Bucket object := objInfo.Name @@ -1100,6 +1101,7 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID) rinfo.ReplicationResynced = true } + rinfo.Duration = time.Since(startTime) }() // use core client to avoid doing multipart on PUT c := &miniogo.Core{Client: tgt.Client} @@ -1634,7 +1636,7 @@ func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, } if sz, err := objInfo.GetActualSize(); err == nil { for arn := range dsc.targetsMap { - globalReplicationStats.Update(objInfo.Bucket, arn, sz, objInfo.ReplicationStatus, replication.StatusType(""), opType) + globalReplicationStats.Update(objInfo.Bucket, arn, sz, 0, objInfo.ReplicationStatus, replication.StatusType(""), opType) } } } @@ -1642,10 +1644,10 @@ func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectReplicationInfo, o ObjectLayer) { globalReplicationPool.queueReplicaDeleteTask(dv) for arn := range dv.ReplicationState.Targets { - globalReplicationStats.Update(dv.Bucket, arn, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType) + globalReplicationStats.Update(dv.Bucket, arn, 0, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType) } for arn := range dv.ReplicationState.PurgeTargets { - globalReplicationStats.Update(dv.Bucket, arn, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType) + globalReplicationStats.Update(dv.Bucket, arn, 0, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType) } } diff --git a/cmd/bucket-stats.go b/cmd/bucket-stats.go index bf6d60b2e..9405d3cbe 100644 --- a/cmd/bucket-stats.go +++ b/cmd/bucket-stats.go @@ -19,10 +19,46 @@ package cmd import ( "sync/atomic" + "time" ) //go:generate msgp -file $GOFILE +// ReplicationLatency holds information of bucket operations latency, such us uploads +type ReplicationLatency struct { + // Single & Multipart PUTs latency + UploadHistogram LastMinuteLatencies +} + +// Merge two replication latency into a new one +func (rl ReplicationLatency) merge(other ReplicationLatency) (newReplLatency ReplicationLatency) { + newReplLatency.UploadHistogram = rl.UploadHistogram.Merge(other.UploadHistogram) + return +} + +// Get upload latency of each object size range +func (rl ReplicationLatency) getUploadLatency() (ret map[string]uint64) { + ret = make(map[string]uint64) + avg := rl.UploadHistogram.GetAvg() + for k, v := range avg { + // Convert nanoseconds to milliseconds + ret[sizeTagToString(k)] = v.avg() / uint64(time.Millisecond) + } + return +} + +// Update replication upload latency with a new value +func (rl *ReplicationLatency) update(size int64, duration time.Duration) { + rl.UploadHistogram.Add(size, duration) +} + +// Clone replication latency +func (rl ReplicationLatency) clone() ReplicationLatency { + return ReplicationLatency{ + UploadHistogram: rl.UploadHistogram.Clone(), + } +} + // BucketStats bucket statistics type BucketStats struct { ReplicationStats BucketReplicationStats @@ -65,6 +101,7 @@ func (brs BucketReplicationStats) Clone() BucketReplicationStats { FailedCount: atomic.LoadInt64(&st.FailedCount), PendingSize: atomic.LoadInt64(&st.PendingSize), PendingCount: atomic.LoadInt64(&st.PendingCount), + Latency: st.Latency.clone(), } } // update total counts across targets @@ -93,6 +130,8 @@ type BucketReplicationStat struct { PendingCount int64 `json:"pendingReplicationCount"` // Total number of failed operations including metadata updates FailedCount int64 `json:"failedReplicationCount"` + // Replication latency information + Latency ReplicationLatency `json:"replicationLatency"` } func (bs *BucketReplicationStat) hasReplicationUsage() bool { diff --git a/cmd/bucket-stats_gen.go b/cmd/bucket-stats_gen.go index c0bec1059..21bf53fe0 100644 --- a/cmd/bucket-stats_gen.go +++ b/cmd/bucket-stats_gen.go @@ -60,6 +60,35 @@ func (z *BucketReplicationStat) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "FailedCount") return } + case "Latency": + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "Latency") + return + } + for zb0002 > 0 { + zb0002-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err, "Latency") + return + } + switch msgp.UnsafeString(field) { + case "UploadHistogram": + err = z.Latency.UploadHistogram.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Latency", "UploadHistogram") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err, "Latency") + return + } + } + } default: err = dc.Skip() if err != nil { @@ -73,9 +102,9 @@ func (z *BucketReplicationStat) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *BucketReplicationStat) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 6 + // map header, size 7 // write "PendingSize" - err = en.Append(0x86, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65) + err = en.Append(0x87, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65) if err != nil { return } @@ -134,15 +163,31 @@ func (z *BucketReplicationStat) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "FailedCount") return } + // write "Latency" + err = en.Append(0xa7, 0x4c, 0x61, 0x74, 0x65, 0x6e, 0x63, 0x79) + if err != nil { + return + } + // map header, size 1 + // write "UploadHistogram" + err = en.Append(0x81, 0xaf, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x67, 0x72, 0x61, 0x6d) + if err != nil { + return + } + err = z.Latency.UploadHistogram.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Latency", "UploadHistogram") + return + } return } // MarshalMsg implements msgp.Marshaler func (z *BucketReplicationStat) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 6 + // map header, size 7 // string "PendingSize" - o = append(o, 0x86, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65) + o = append(o, 0x87, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65) o = msgp.AppendInt64(o, z.PendingSize) // string "ReplicatedSize" o = append(o, 0xae, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65) @@ -159,6 +204,16 @@ func (z *BucketReplicationStat) MarshalMsg(b []byte) (o []byte, err error) { // string "FailedCount" o = append(o, 0xab, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74) o = msgp.AppendInt64(o, z.FailedCount) + // string "Latency" + o = append(o, 0xa7, 0x4c, 0x61, 0x74, 0x65, 0x6e, 0x63, 0x79) + // map header, size 1 + // string "UploadHistogram" + o = append(o, 0x81, 0xaf, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x67, 0x72, 0x61, 0x6d) + o, err = z.Latency.UploadHistogram.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Latency", "UploadHistogram") + return + } return } @@ -216,6 +271,35 @@ func (z *BucketReplicationStat) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "FailedCount") return } + case "Latency": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Latency") + return + } + for zb0002 > 0 { + zb0002-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err, "Latency") + return + } + switch msgp.UnsafeString(field) { + case "UploadHistogram": + bts, err = z.Latency.UploadHistogram.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Latency", "UploadHistogram") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err, "Latency") + return + } + } + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -230,7 +314,7 @@ func (z *BucketReplicationStat) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *BucketReplicationStat) Msgsize() (s int) { - s = 1 + 12 + msgp.Int64Size + 15 + msgp.Int64Size + 12 + msgp.Int64Size + 11 + msgp.Int64Size + 13 + msgp.Int64Size + 12 + msgp.Int64Size + s = 1 + 12 + msgp.Int64Size + 15 + msgp.Int64Size + 12 + msgp.Int64Size + 11 + msgp.Int64Size + 13 + msgp.Int64Size + 12 + msgp.Int64Size + 8 + 1 + 16 + z.Latency.UploadHistogram.Msgsize() return } @@ -707,3 +791,110 @@ func (z *BucketStats) Msgsize() (s int) { s = 1 + 17 + z.ReplicationStats.Msgsize() return } + +// DecodeMsg implements msgp.Decodable +func (z *ReplicationLatency) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "UploadHistogram": + err = z.UploadHistogram.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "UploadHistogram") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *ReplicationLatency) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 1 + // write "UploadHistogram" + err = en.Append(0x81, 0xaf, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x67, 0x72, 0x61, 0x6d) + if err != nil { + return + } + err = z.UploadHistogram.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "UploadHistogram") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *ReplicationLatency) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 1 + // string "UploadHistogram" + o = append(o, 0x81, 0xaf, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x67, 0x72, 0x61, 0x6d) + o, err = z.UploadHistogram.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "UploadHistogram") + return + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *ReplicationLatency) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "UploadHistogram": + bts, err = z.UploadHistogram.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "UploadHistogram") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *ReplicationLatency) Msgsize() (s int) { + s = 1 + 16 + z.UploadHistogram.Msgsize() + return +} diff --git a/cmd/bucket-stats_gen_test.go b/cmd/bucket-stats_gen_test.go index 33328d4f4..f50f1b964 100644 --- a/cmd/bucket-stats_gen_test.go +++ b/cmd/bucket-stats_gen_test.go @@ -347,3 +347,116 @@ func BenchmarkDecodeBucketStats(b *testing.B) { } } } + +func TestMarshalUnmarshalReplicationLatency(t *testing.T) { + v := ReplicationLatency{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgReplicationLatency(b *testing.B) { + v := ReplicationLatency{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgReplicationLatency(b *testing.B) { + v := ReplicationLatency{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalReplicationLatency(b *testing.B) { + v := ReplicationLatency{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeReplicationLatency(t *testing.T) { + v := ReplicationLatency{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeReplicationLatency Msgsize() is inaccurate") + } + + vn := ReplicationLatency{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeReplicationLatency(b *testing.B) { + v := ReplicationLatency{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeReplicationLatency(b *testing.B) { + v := ReplicationLatency{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/cmd/last-minute.go b/cmd/last-minute.go new file mode 100644 index 000000000..c6d5cc672 --- /dev/null +++ b/cmd/last-minute.go @@ -0,0 +1,187 @@ +// Copyright (c) 2015-2021 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "time" +) + +const ( + sizeLessThan1KiB = iota + sizeLessThan1MiB + sizeLessThan10MiB + sizeLessThan100MiB + sizeLessThan1GiB + sizeGreaterThan1GiB + // Add new entries here + + sizeLastElemMarker +) + +// sizeToTag converts a size to a tag. +func sizeToTag(size int64) int { + switch { + case size < 1024: + return sizeLessThan1KiB + case size < 1024*1024: + return sizeLessThan1MiB + case size < 10*1024*1024: + return sizeLessThan10MiB + case size < 100*1024*1024: + return sizeLessThan100MiB + case size < 1024*1024*1024: + return sizeLessThan1GiB + default: + return sizeGreaterThan1GiB + } +} + +func sizeTagToString(tag int) string { + switch tag { + case sizeLessThan1KiB: + return "LESS_THAN_1_KiB" + case sizeLessThan1MiB: + return "LESS_THAN_1_MiB" + case sizeLessThan10MiB: + return "LESS_THAN_10_MiB" + case sizeLessThan100MiB: + return "LESS_THAN_100_MiB" + case sizeLessThan1GiB: + return "LESS_THAN_1_GiB" + case sizeGreaterThan1GiB: + return "GREATER_THAN_1_GiB" + default: + return "unknown" + } +} + +// AccElem holds information for calculating an average value +type AccElem struct { + Total int64 + N int64 +} + +// add dur to a as a single element. +func (a *AccElem) add(dur time.Duration) { + a.Total += int64(dur) + a.N++ +} + +// merge b into a. +func (a *AccElem) merge(b AccElem) { + a.N += b.N + a.Total += b.Total +} + +// avg converts total to average. +func (a *AccElem) avg() uint64 { + if a.N >= 1 && a.Total > 0 { + return uint64(a.Total / a.N) + } + return 0 +} + +// LastMinuteLatencies keeps track of last minute latencies. +type LastMinuteLatencies struct { + Totals [60][sizeLastElemMarker]AccElem + LastSec int64 +} + +// Clone safely returns a copy for a LastMinuteLatencies structure +func (l *LastMinuteLatencies) Clone() LastMinuteLatencies { + n := LastMinuteLatencies{} + n.LastSec = l.LastSec + for i := range l.Totals { + for j := range l.Totals[i] { + n.Totals[i][j] = AccElem{ + Total: l.Totals[i][j].Total, + N: l.Totals[i][j].N, + } + } + } + return n +} + +// Merge safely merges two LastMinuteLatencies structures into one +func (l LastMinuteLatencies) Merge(o LastMinuteLatencies) (merged LastMinuteLatencies) { + cl := l.Clone() + co := o.Clone() + + if cl.LastSec > co.LastSec { + co.forwardTo(cl.LastSec) + merged.LastSec = cl.LastSec + } else { + cl.forwardTo(co.LastSec) + merged.LastSec = co.LastSec + } + + for i := range cl.Totals { + for j := range cl.Totals[i] { + merged.Totals[i][j] = AccElem{ + Total: cl.Totals[i][j].Total + co.Totals[i][j].Total, + N: cl.Totals[i][j].N + co.Totals[i][j].N, + } + } + } + return merged +} + +// Add latency t from object with the specified size. +func (l *LastMinuteLatencies) Add(size int64, t time.Duration) { + tag := sizeToTag(size) + + // Update... + sec := time.Now().Unix() + l.forwardTo(sec) + + winIdx := sec % 60 + l.Totals[winIdx][tag].add(t) + + l.LastSec = sec +} + +// GetAvg will return the average for each bucket from the last time minute. +// The number of objects is also included. +func (l *LastMinuteLatencies) GetAvg() [sizeLastElemMarker]AccElem { + var res [sizeLastElemMarker]AccElem + sec := time.Now().Unix() + l.forwardTo(sec) + for _, elems := range l.Totals[:] { + for j := range elems { + res[j].merge(elems[j]) + } + } + return res +} + +// forwardTo time t, clearing any entries in between. +func (l *LastMinuteLatencies) forwardTo(t int64) { + if l.LastSec >= t { + return + } + if t-l.LastSec >= 60 { + l.Totals = [60][sizeLastElemMarker]AccElem{} + return + } + for l.LastSec != t { + // Clear next element. + idx := (l.LastSec + 1) % 60 + l.Totals[idx] = [sizeLastElemMarker]AccElem{} + l.LastSec++ + } +} diff --git a/cmd/last-minute_gen.go b/cmd/last-minute_gen.go new file mode 100644 index 000000000..2cb1bdf2d --- /dev/null +++ b/cmd/last-minute_gen.go @@ -0,0 +1,411 @@ +package cmd + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *AccElem) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Total": + z.Total, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "Total") + return + } + case "N": + z.N, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "N") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z AccElem) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 2 + // write "Total" + err = en.Append(0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) + if err != nil { + return + } + err = en.WriteInt64(z.Total) + if err != nil { + err = msgp.WrapError(err, "Total") + return + } + // write "N" + err = en.Append(0xa1, 0x4e) + if err != nil { + return + } + err = en.WriteInt64(z.N) + if err != nil { + err = msgp.WrapError(err, "N") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z AccElem) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 2 + // string "Total" + o = append(o, 0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) + o = msgp.AppendInt64(o, z.Total) + // string "N" + o = append(o, 0xa1, 0x4e) + o = msgp.AppendInt64(o, z.N) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *AccElem) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Total": + z.Total, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Total") + return + } + case "N": + z.N, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "N") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z AccElem) Msgsize() (s int) { + s = 1 + 6 + msgp.Int64Size + 2 + msgp.Int64Size + return +} + +// DecodeMsg implements msgp.Decodable +func (z *LastMinuteLatencies) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Totals": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Totals") + return + } + if zb0002 != uint32(60) { + err = msgp.ArrayError{Wanted: uint32(60), Got: zb0002} + return + } + for za0001 := range z.Totals { + var zb0003 uint32 + zb0003, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Totals", za0001) + return + } + if zb0003 != uint32(sizeLastElemMarker) { + err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0003} + return + } + for za0002 := range z.Totals[za0001] { + var zb0004 uint32 + zb0004, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "Totals", za0001, za0002) + return + } + for zb0004 > 0 { + zb0004-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err, "Totals", za0001, za0002) + return + } + switch msgp.UnsafeString(field) { + case "Total": + z.Totals[za0001][za0002].Total, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "Totals", za0001, za0002, "Total") + return + } + case "N": + z.Totals[za0001][za0002].N, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "Totals", za0001, za0002, "N") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err, "Totals", za0001, za0002) + return + } + } + } + } + } + case "LastSec": + z.LastSec, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "LastSec") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *LastMinuteLatencies) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 2 + // write "Totals" + err = en.Append(0x82, 0xa6, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(60)) + if err != nil { + err = msgp.WrapError(err, "Totals") + return + } + for za0001 := range z.Totals { + err = en.WriteArrayHeader(uint32(sizeLastElemMarker)) + if err != nil { + err = msgp.WrapError(err, "Totals", za0001) + return + } + for za0002 := range z.Totals[za0001] { + // map header, size 2 + // write "Total" + err = en.Append(0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) + if err != nil { + return + } + err = en.WriteInt64(z.Totals[za0001][za0002].Total) + if err != nil { + err = msgp.WrapError(err, "Totals", za0001, za0002, "Total") + return + } + // write "N" + err = en.Append(0xa1, 0x4e) + if err != nil { + return + } + err = en.WriteInt64(z.Totals[za0001][za0002].N) + if err != nil { + err = msgp.WrapError(err, "Totals", za0001, za0002, "N") + return + } + } + } + // write "LastSec" + err = en.Append(0xa7, 0x4c, 0x61, 0x73, 0x74, 0x53, 0x65, 0x63) + if err != nil { + return + } + err = en.WriteInt64(z.LastSec) + if err != nil { + err = msgp.WrapError(err, "LastSec") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *LastMinuteLatencies) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 2 + // string "Totals" + o = append(o, 0x82, 0xa6, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x73) + o = msgp.AppendArrayHeader(o, uint32(60)) + for za0001 := range z.Totals { + o = msgp.AppendArrayHeader(o, uint32(sizeLastElemMarker)) + for za0002 := range z.Totals[za0001] { + // map header, size 2 + // string "Total" + o = append(o, 0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) + o = msgp.AppendInt64(o, z.Totals[za0001][za0002].Total) + // string "N" + o = append(o, 0xa1, 0x4e) + o = msgp.AppendInt64(o, z.Totals[za0001][za0002].N) + } + } + // string "LastSec" + o = append(o, 0xa7, 0x4c, 0x61, 0x73, 0x74, 0x53, 0x65, 0x63) + o = msgp.AppendInt64(o, z.LastSec) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *LastMinuteLatencies) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Totals": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Totals") + return + } + if zb0002 != uint32(60) { + err = msgp.ArrayError{Wanted: uint32(60), Got: zb0002} + return + } + for za0001 := range z.Totals { + var zb0003 uint32 + zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Totals", za0001) + return + } + if zb0003 != uint32(sizeLastElemMarker) { + err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0003} + return + } + for za0002 := range z.Totals[za0001] { + var zb0004 uint32 + zb0004, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Totals", za0001, za0002) + return + } + for zb0004 > 0 { + zb0004-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err, "Totals", za0001, za0002) + return + } + switch msgp.UnsafeString(field) { + case "Total": + z.Totals[za0001][za0002].Total, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Totals", za0001, za0002, "Total") + return + } + case "N": + z.Totals[za0001][za0002].N, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Totals", za0001, za0002, "N") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err, "Totals", za0001, za0002) + return + } + } + } + } + } + case "LastSec": + z.LastSec, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "LastSec") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *LastMinuteLatencies) Msgsize() (s int) { + s = 1 + 7 + msgp.ArrayHeaderSize + (60 * (sizeLastElemMarker * (9 + msgp.Int64Size + msgp.Int64Size))) + 8 + msgp.Int64Size + return +} diff --git a/cmd/last-minute_gen_test.go b/cmd/last-minute_gen_test.go new file mode 100644 index 000000000..d54aed0e3 --- /dev/null +++ b/cmd/last-minute_gen_test.go @@ -0,0 +1,236 @@ +package cmd + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "bytes" + "testing" + + "github.com/tinylib/msgp/msgp" +) + +func TestMarshalUnmarshalAccElem(t *testing.T) { + v := AccElem{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgAccElem(b *testing.B) { + v := AccElem{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgAccElem(b *testing.B) { + v := AccElem{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalAccElem(b *testing.B) { + v := AccElem{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeAccElem(t *testing.T) { + v := AccElem{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeAccElem Msgsize() is inaccurate") + } + + vn := AccElem{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeAccElem(b *testing.B) { + v := AccElem{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeAccElem(b *testing.B) { + v := AccElem{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalLastMinuteLatencies(t *testing.T) { + v := LastMinuteLatencies{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgLastMinuteLatencies(b *testing.B) { + v := LastMinuteLatencies{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgLastMinuteLatencies(b *testing.B) { + v := LastMinuteLatencies{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalLastMinuteLatencies(b *testing.B) { + v := LastMinuteLatencies{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeLastMinuteLatencies(t *testing.T) { + v := LastMinuteLatencies{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeLastMinuteLatencies Msgsize() is inaccurate") + } + + vn := LastMinuteLatencies{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeLastMinuteLatencies(b *testing.B) { + v := LastMinuteLatencies{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeLastMinuteLatencies(b *testing.B) { + v := LastMinuteLatencies{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index 212615b2e..c79edb494 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -99,17 +99,18 @@ const ( total MetricName = "total" freeInodes MetricName = "free_inodes" - failedCount MetricName = "failed_count" - failedBytes MetricName = "failed_bytes" - freeBytes MetricName = "free_bytes" - readBytes MetricName = "read_bytes" - rcharBytes MetricName = "rchar_bytes" - receivedBytes MetricName = "received_bytes" - sentBytes MetricName = "sent_bytes" - totalBytes MetricName = "total_bytes" - usedBytes MetricName = "used_bytes" - writeBytes MetricName = "write_bytes" - wcharBytes MetricName = "wchar_bytes" + failedCount MetricName = "failed_count" + failedBytes MetricName = "failed_bytes" + freeBytes MetricName = "free_bytes" + readBytes MetricName = "read_bytes" + rcharBytes MetricName = "rchar_bytes" + receivedBytes MetricName = "received_bytes" + latencyMilliSec MetricName = "latency_ms" + sentBytes MetricName = "sent_bytes" + totalBytes MetricName = "total_bytes" + usedBytes MetricName = "used_bytes" + writeBytes MetricName = "write_bytes" + wcharBytes MetricName = "wchar_bytes" usagePercent MetricName = "update_percent" @@ -409,6 +410,16 @@ func getBucketUsageObjectsTotalMD() MetricDescription { } } +func getBucketRepLatencyMD() MetricDescription { + return MetricDescription{ + Namespace: bucketMetricNamespace, + Subsystem: replicationSubsystem, + Name: latencyMilliSec, + Help: "Replication latency in milliseconds.", + Type: histogramMetric, + } +} + func getBucketRepFailedBytesMD() MetricDescription { return MetricDescription{ Namespace: bucketMetricNamespace, @@ -1482,6 +1493,13 @@ func getBucketUsageMetrics() MetricsGroup { Value: float64(stat.FailedCount), VariableLabels: map[string]string{"bucket": bucket, "targetArn": arn}, }) + metrics = append(metrics, Metric{ + Description: getBucketRepLatencyMD(), + HistogramBucketLabel: "range", + Histogram: stat.Latency.getUploadLatency(), + VariableLabels: map[string]string{"bucket": bucket, "operation": "upload", "targetArn": arn}, + }) + } } diff --git a/cmd/metrics.go b/cmd/metrics.go index 66f30bad7..d7de59506 100644 --- a/cmd/metrics.go +++ b/cmd/metrics.go @@ -454,6 +454,7 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic FailedCount: stat.FailedCount + oldst.FailedCount, FailedSize: stat.FailedSize + oldst.FailedSize, ReplicatedSize: stat.ReplicatedSize + oldst.ReplicatedSize, + Latency: stat.Latency.merge(oldst.Latency), } } } @@ -502,6 +503,8 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic // happen since data usage picture can lag behind actual usage state at the time of cluster start st.FailedSize = int64(math.Max(float64(tgtstat.FailedSize), 0)) st.FailedCount = int64(math.Max(float64(tgtstat.FailedCount), 0)) + st.Latency = tgtstat.Latency + s.Stats[arn] = &st s.FailedSize += st.FailedSize s.FailedCount += st.FailedCount