From f8650a3493abd6eccc8b96f84e985cd1287b9f08 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Mon, 23 May 2022 09:15:30 -0700
Subject: [PATCH] fetch bucket replication stats across peers in single call
 (#14956)

The current implementation relied on repeatedly calling one bucket at a
time across all peers. This is very slow and chatty when there are
hundreds of buckets, which would mean on the order of 100*peerCount
network operations. This PR reduces the entire exchange to only
`peerCount` network calls.

This also addresses a concern where the Prometheus metrics would slow
down significantly when one of the peers is offline.
---
 cmd/bucket-replication-stats.go |  17 +++
 cmd/bucket-replication.go       |  29 +++++-
 cmd/bucket-stats.go             |   3 +
 cmd/bucket-stats_gen.go         | 177 ++++++++++++++++++++++++++++++++
 cmd/bucket-stats_gen_test.go    | 113 ++++++++++++++++++++
 cmd/metrics-v2.go               |   3 +-
 cmd/notification.go             |  38 +++++++
 cmd/peer-rest-client.go         |  13 +++
 cmd/peer-rest-common.go         |   3 +-
 cmd/peer-rest-server.go         |  21 +++-
 10 files changed, 411 insertions(+), 6 deletions(-)

diff --git a/cmd/bucket-replication-stats.go b/cmd/bucket-replication-stats.go
index a3174fd51..5cb65d695 100644
--- a/cmd/bucket-replication-stats.go
+++ b/cmd/bucket-replication-stats.go
@@ -135,6 +135,23 @@ func (r *ReplicationStats) GetInitialUsage(bucket string) BucketReplicationStats
 	return st.Clone()
 }
 
+// GetAll returns replication metrics for all buckets at once.
+func (r *ReplicationStats) GetAll() map[string]BucketReplicationStats {
+	if r == nil {
+		return map[string]BucketReplicationStats{}
+	}
+
+	r.RLock()
+	defer r.RUnlock()
+
+	bucketReplicationStats := make(map[string]BucketReplicationStats, len(r.Cache))
+	for k, v := range r.Cache {
+		bucketReplicationStats[k] = v.Clone()
+	}
+
+	return bucketReplicationStats
+}
+
 // Get replication metrics for a bucket from this node since this node came up.
 func (r *ReplicationStats) Get(bucket string) BucketReplicationStats {
 	if r == nil {
diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go
index 7e4ab547a..6b668873f 100644
--- a/cmd/bucket-replication.go
+++ b/cmd/bucket-replication.go
@@ -1845,9 +1845,26 @@ func resyncTarget(oi ObjectInfo, arn string, resetID string, resetBeforeDate tim
 	return
 }
 
-// get the most current of in-memory replication stats and data usage info from crawler.
-func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplicationStats) { - bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket) +func getAllLatestReplicationStats(bucketsUsage map[string]BucketUsageInfo) (bucketsReplicationStats map[string]BucketReplicationStats) { + peerBucketStatsList := globalNotificationSys.GetClusterAllBucketStats(GlobalContext) + bucketsReplicationStats = make(map[string]BucketReplicationStats, len(bucketsUsage)) + + for bucket, u := range bucketsUsage { + bucketStats := make([]BucketStats, len(peerBucketStatsList)) + for i, peerBucketStats := range peerBucketStatsList { + bucketStat, ok := peerBucketStats[bucket] + if !ok { + continue + } + bucketStats[i] = bucketStat + } + bucketsReplicationStats[bucket] = calculateBucketReplicationStats(bucket, u, bucketStats) + } + + return bucketsReplicationStats +} + +func calculateBucketReplicationStats(bucket string, u BucketUsageInfo, bucketStats []BucketStats) (s BucketReplicationStats) { // accumulate cluster bucket stats stats := make(map[string]*BucketReplicationStat) var totReplicaSize int64 @@ -1916,6 +1933,12 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic return s } +// get the most current of in-memory replication stats and data usage info from crawler. +func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplicationStats) { + bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket) + return calculateBucketReplicationStats(bucket, u, bucketStats) +} + const resyncTimeInterval = time.Minute * 10 // periodicResyncMetaSave saves in-memory resync meta stats to disk in periodic intervals diff --git a/cmd/bucket-stats.go b/cmd/bucket-stats.go index eef2b2aed..b742c57e4 100644 --- a/cmd/bucket-stats.go +++ b/cmd/bucket-stats.go @@ -51,6 +51,9 @@ func (rl *ReplicationLatency) update(size int64, duration time.Duration) { rl.UploadHistogram.Add(size, duration) } +// BucketStatsMap captures bucket statistics for all buckets +type BucketStatsMap map[string]BucketStats + // BucketStats bucket statistics type BucketStats struct { ReplicationStats BucketReplicationStats diff --git a/cmd/bucket-stats_gen.go b/cmd/bucket-stats_gen.go index 21bf53fe0..23ccc3fdc 100644 --- a/cmd/bucket-stats_gen.go +++ b/cmd/bucket-stats_gen.go @@ -792,6 +792,183 @@ func (z *BucketStats) Msgsize() (s int) { return } +// DecodeMsg implements msgp.Decodable +func (z *BucketStatsMap) DecodeMsg(dc *msgp.Reader) (err error) { + var zb0003 uint32 + zb0003, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + if (*z) == nil { + (*z) = make(BucketStatsMap, zb0003) + } else if len((*z)) > 0 { + for key := range *z { + delete((*z), key) + } + } + for zb0003 > 0 { + zb0003-- + var zb0001 string + var zb0002 BucketStats + zb0001, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err) + return + } + var field []byte + _ = field + var zb0004 uint32 + zb0004, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + for zb0004 > 0 { + zb0004-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + switch msgp.UnsafeString(field) { + case "ReplicationStats": + err = zb0002.ReplicationStats.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, zb0001, "ReplicationStats") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + } + } + (*z)[zb0001] = 
zb0002 + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z BucketStatsMap) EncodeMsg(en *msgp.Writer) (err error) { + err = en.WriteMapHeader(uint32(len(z))) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0005, zb0006 := range z { + err = en.WriteString(zb0005) + if err != nil { + err = msgp.WrapError(err) + return + } + // map header, size 1 + // write "ReplicationStats" + err = en.Append(0x81, 0xb0, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x73) + if err != nil { + return + } + err = zb0006.ReplicationStats.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, zb0005, "ReplicationStats") + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z BucketStatsMap) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendMapHeader(o, uint32(len(z))) + for zb0005, zb0006 := range z { + o = msgp.AppendString(o, zb0005) + // map header, size 1 + // string "ReplicationStats" + o = append(o, 0x81, 0xb0, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x73) + o, err = zb0006.ReplicationStats.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, zb0005, "ReplicationStats") + return + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *BucketStatsMap) UnmarshalMsg(bts []byte) (o []byte, err error) { + var zb0003 uint32 + zb0003, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + if (*z) == nil { + (*z) = make(BucketStatsMap, zb0003) + } else if len((*z)) > 0 { + for key := range *z { + delete((*z), key) + } + } + for zb0003 > 0 { + var zb0001 string + var zb0002 BucketStats + zb0003-- + zb0001, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + var field []byte + _ = field + var zb0004 uint32 + zb0004, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + for zb0004 > 0 { + zb0004-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + switch msgp.UnsafeString(field) { + case "ReplicationStats": + bts, err = zb0002.ReplicationStats.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, zb0001, "ReplicationStats") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + } + } + (*z)[zb0001] = zb0002 + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z BucketStatsMap) Msgsize() (s int) { + s = msgp.MapHeaderSize + if z != nil { + for zb0005, zb0006 := range z { + _ = zb0006 + s += msgp.StringPrefixSize + len(zb0005) + 1 + 17 + zb0006.ReplicationStats.Msgsize() + } + } + return +} + // DecodeMsg implements msgp.Decodable func (z *ReplicationLatency) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte diff --git a/cmd/bucket-stats_gen_test.go b/cmd/bucket-stats_gen_test.go index f50f1b964..f50ff31d0 100644 --- a/cmd/bucket-stats_gen_test.go +++ b/cmd/bucket-stats_gen_test.go @@ -348,6 +348,119 @@ func BenchmarkDecodeBucketStats(b *testing.B) { } } +func TestMarshalUnmarshalBucketStatsMap(t *testing.T) { + v := BucketStatsMap{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over 
after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgBucketStatsMap(b *testing.B) { + v := BucketStatsMap{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgBucketStatsMap(b *testing.B) { + v := BucketStatsMap{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalBucketStatsMap(b *testing.B) { + v := BucketStatsMap{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeBucketStatsMap(t *testing.T) { + v := BucketStatsMap{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeBucketStatsMap Msgsize() is inaccurate") + } + + vn := BucketStatsMap{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeBucketStatsMap(b *testing.B) { + v := BucketStatsMap{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeBucketStatsMap(b *testing.B) { + v := BucketStatsMap{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshalReplicationLatency(t *testing.T) { v := ReplicationLatency{} bts, err := v.MarshalMsg(nil) diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index 56cadd43b..1dd884adc 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -1604,8 +1604,9 @@ func getBucketUsageMetrics() *MetricsGroup { Value: float64(time.Since(dataUsageInfo.LastUpdate)), }) + bucketReplStats := getAllLatestReplicationStats(dataUsageInfo.BucketsUsage) for bucket, usage := range dataUsageInfo.BucketsUsage { - stats := getLatestReplicationStats(bucket, usage) + stats := bucketReplStats[bucket] quota, _ := globalBucketQuotaSys.Get(ctx, bucket) diff --git a/cmd/notification.go b/cmd/notification.go index 6f076bfa7..4566ae5ef 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -601,6 +601,44 @@ func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName } } +// GetClusterAllBucketStats - returns bucket stats for all buckets from all remote peers. 
+func (sys *NotificationSys) GetClusterAllBucketStats(ctx context.Context) []BucketStatsMap { + ng := WithNPeers(len(sys.peerClients)) + replicationStats := make([]BucketStatsMap, len(sys.peerClients)) + for index, client := range sys.peerClients { + index := index + client := client + ng.Go(ctx, func() error { + if client == nil { + return errPeerNotReachable + } + bsMap, err := client.GetAllBucketStats() + if err != nil { + return err + } + replicationStats[index] = bsMap + return nil + }, index, *client.host) + } + for _, nErr := range ng.Wait() { + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", nErr.Host.String()) + if nErr.Err != nil { + logger.LogIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err) + } + } + + replicationStatsList := globalReplicationStats.GetAll() + bucketStatsMap := make(map[string]BucketStats, len(replicationStatsList)) + for k, replicationStats := range replicationStatsList { + bucketStatsMap[k] = BucketStats{ + ReplicationStats: replicationStats, + } + } + + replicationStats = append(replicationStats, BucketStatsMap(bucketStatsMap)) + return replicationStats +} + // GetClusterBucketStats - calls GetClusterBucketStats call on all peers for a cluster statistics view. func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketName string) []BucketStats { ng := WithNPeers(len(sys.peerClients)) diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 8d1d85912..5cd885c56 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -510,6 +510,19 @@ func (client *peerRESTClient) GetBucketStats(bucket string) (BucketStats, error) return bs, msgp.Decode(respBody, &bs) } +// GetAllBucketStats - load replication stats for all buckets +func (client *peerRESTClient) GetAllBucketStats() (BucketStatsMap, error) { + values := make(url.Values) + respBody, err := client.call(peerRESTMethodGetAllBucketStats, values, nil, -1) + if err != nil { + return nil, err + } + + bsMap := BucketStatsMap{} + defer http.DrainBody(respBody) + return bsMap, msgp.Decode(respBody, &bsMap) +} + // LoadBucketMetadata - load bucket metadata func (client *peerRESTClient) LoadBucketMetadata(bucket string) error { values := make(url.Values) diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index 9333ebc49..ab828d341 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -18,7 +18,7 @@ package cmd const ( - peerRESTVersion = "v21" // Add netperf + peerRESTVersion = "v22" // Add bulk GetBucketStats peerRESTVersionPrefix = SlashSeparator + peerRESTVersion peerRESTPrefix = minioReservedBucketPath + "/peer" peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix @@ -41,6 +41,7 @@ const ( peerRESTMethodDeleteBucketMetadata = "/deletebucketmetadata" peerRESTMethodLoadBucketMetadata = "/loadbucketmetadata" peerRESTMethodGetBucketStats = "/getbucketstats" + peerRESTMethodGetAllBucketStats = "/getallbucketstats" peerRESTMethodServerUpdate = "/serverupdate" peerRESTMethodSignalService = "/signalservice" peerRESTMethodBackgroundHealStatus = "/backgroundhealstatus" diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index bc613db33..eb1b95bbb 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -561,8 +561,26 @@ func (s *peerRESTServer) ReloadSiteReplicationConfigHandler(w http.ResponseWrite logger.LogIf(r.Context(), globalSiteReplicationSys.Init(ctx, objAPI)) } +// GetAllBucketStatsHandler - fetches bucket replication stats for all buckets from this peer. 
+func (s *peerRESTServer) GetAllBucketStatsHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + replicationStats := globalReplicationStats.GetAll() + bucketStatsMap := make(map[string]BucketStats, len(replicationStats)) + for k, v := range replicationStats { + bucketStatsMap[k] = BucketStats{ + ReplicationStats: v, + } + } + + logger.LogIf(r.Context(), msgp.Encode(w, BucketStatsMap(bucketStatsMap))) +} + // GetBucketStatsHandler - fetches current in-memory bucket stats, currently only -// returns BucketReplicationStatus +// returns BucketStats, that currently includes ReplicationStats. func (s *peerRESTServer) GetBucketStatsHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { s.writeErrorResponse(w, errors.New("Invalid request")) @@ -1316,6 +1334,7 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodNetInfo).HandlerFunc(httpTraceHdrs(server.NetInfoHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDispatchNetInfo).HandlerFunc(httpTraceHdrs(server.DispatchNetInfoHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCycleBloom).HandlerFunc(httpTraceHdrs(server.CycleServerBloomFilterHandler)) + subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetAllBucketStats).HandlerFunc(httpTraceHdrs(server.GetAllBucketStatsHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeleteBucketMetadata).HandlerFunc(httpTraceHdrs(server.DeleteBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadBucketMetadata).HandlerFunc(httpTraceHdrs(server.LoadBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetBucketStats).HandlerFunc(httpTraceHdrs(server.GetBucketStatsHandler)).Queries(restQueries(peerRESTBucket)...)
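The core idea of the patch, independent of the MinIO-specific plumbing above, is to replace a per-bucket, per-peer RPC fan-out with a single call per peer that returns stats for every bucket, merging the results locally. The standalone Go sketch below illustrates that fan-out-and-merge pattern under simplified assumptions; the struct fields, the fetchAllBucketStats stub, and the gatherClusterStats helper are illustrative only and are not part of this patch (the real code path goes through peerRESTClient.GetAllBucketStats, NotificationSys.GetClusterAllBucketStats, and getAllLatestReplicationStats shown above).

package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for the patch's types; the field names here are illustrative.
type BucketReplicationStats struct {
	ReplicatedSize int64
	FailedCount    int64
}

// BucketStatsMap mirrors the idea of the per-peer "all buckets" payload.
type BucketStatsMap map[string]BucketReplicationStats

// fetchAllBucketStats stands in for the single per-peer request added by this
// patch; a real client would issue one RPC here instead of one request per bucket.
func fetchAllBucketStats(peer string) (BucketStatsMap, error) {
	return BucketStatsMap{
		"bucket-a": {ReplicatedSize: 100, FailedCount: 1},
		"bucket-b": {ReplicatedSize: 200},
	}, nil
}

// gatherClusterStats fans out exactly len(peers) calls in parallel and then
// merges the per-peer maps into cluster-wide, per-bucket totals.
func gatherClusterStats(peers []string) map[string]BucketReplicationStats {
	perPeer := make([]BucketStatsMap, len(peers))
	var wg sync.WaitGroup
	for i, peer := range peers {
		wg.Add(1)
		go func(i int, peer string) {
			defer wg.Done()
			m, err := fetchAllBucketStats(peer)
			if err != nil {
				// An unreachable peer costs one failed call, not one per bucket.
				return
			}
			perPeer[i] = m
		}(i, peer)
	}
	wg.Wait()

	totals := make(map[string]BucketReplicationStats)
	for _, m := range perPeer {
		for bucket, st := range m {
			agg := totals[bucket]
			agg.ReplicatedSize += st.ReplicatedSize
			agg.FailedCount += st.FailedCount
			totals[bucket] = agg
		}
	}
	return totals
}

func main() {
	fmt.Println(gatherClusterStats([]string{"peer-1", "peer-2"}))
}

With two peers and a hundred buckets this issues two requests instead of two hundred, which is the reduction described in the commit message.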