From a6057c35cc05eba06c531e7c2f833e0b0880614f Mon Sep 17 00:00:00 2001 From: Poorna Date: Tue, 7 Mar 2023 08:13:28 -0800 Subject: [PATCH] Avoid peer notification when peer is offline, tune retries (#16737) --- cmd/notification.go | 41 +++++++++++++++++++++++++---------------- cmd/peer-rest-client.go | 4 ++++ 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/cmd/notification.go b/cmd/notification.go index 22889727e..a45d3fd49 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -56,14 +56,23 @@ type NotificationPeerErr struct { // // A zero NotificationGroup is valid and does not cancel on error. type NotificationGroup struct { - wg sync.WaitGroup - errs []NotificationPeerErr + wg sync.WaitGroup + errs []NotificationPeerErr + retryCount int } // WithNPeers returns a new NotificationGroup with length of errs slice upto nerrs, // upon Wait() errors are returned collected from all tasks. func WithNPeers(nerrs int) *NotificationGroup { - return &NotificationGroup{errs: make([]NotificationPeerErr, nerrs)} + return &NotificationGroup{errs: make([]NotificationPeerErr, nerrs), retryCount: 3} +} + +// WithRetries sets the retry count for all function calls from the Go method. +func (g *NotificationGroup) WithRetries(retryCount int) *NotificationGroup { + if g != nil { + g.retryCount = retryCount + } + return g } // Wait blocks until all function calls from the Go method have returned, then @@ -85,17 +94,17 @@ func (g *NotificationGroup) Go(ctx context.Context, f func() error, index int, a g.errs[index] = NotificationPeerErr{ Host: addr, } - for i := 0; i < 3; i++ { + for i := 0; i < g.retryCount; i++ { if err := f(); err != nil { g.errs[index].Err = err // Last iteration log the error. - if i == 2 { + if i == g.retryCount-1 { reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String()) ctx := logger.SetReqInfo(ctx, reqInfo) logger.LogIf(ctx, err) } // Wait for one second and no need wait after last attempt. - if i < 2 { + if i < g.retryCount-1 { time.Sleep(1 * time.Second) } continue @@ -107,7 +116,7 @@ func (g *NotificationGroup) Go(ctx context.Context, f func() error, index int, a // DeletePolicy - deletes policy across all peers. func (sys *NotificationSys) DeletePolicy(policyName string) []NotificationPeerErr { - ng := WithNPeers(len(sys.peerClients)) + ng := WithNPeers(len(sys.peerClients)).WithRetries(1) for idx, client := range sys.peerClients { if client == nil { continue @@ -122,7 +131,7 @@ func (sys *NotificationSys) DeletePolicy(policyName string) []NotificationPeerEr // LoadPolicy - reloads a specific modified policy across all peers func (sys *NotificationSys) LoadPolicy(policyName string) []NotificationPeerErr { - ng := WithNPeers(len(sys.peerClients)) + ng := WithNPeers(len(sys.peerClients)).WithRetries(1) for idx, client := range sys.peerClients { if client == nil { continue @@ -137,7 +146,7 @@ func (sys *NotificationSys) LoadPolicy(policyName string) []NotificationPeerErr // LoadPolicyMapping - reloads a policy mapping across all peers func (sys *NotificationSys) LoadPolicyMapping(userOrGroup string, userType IAMUserType, isGroup bool) []NotificationPeerErr { - ng := WithNPeers(len(sys.peerClients)) + ng := WithNPeers(len(sys.peerClients)).WithRetries(1) for idx, client := range sys.peerClients { if client == nil { continue @@ -152,7 +161,7 @@ func (sys *NotificationSys) LoadPolicyMapping(userOrGroup string, userType IAMUs // DeleteUser - deletes a specific user across all peers func (sys *NotificationSys) DeleteUser(accessKey string) []NotificationPeerErr { - ng := WithNPeers(len(sys.peerClients)) + ng := WithNPeers(len(sys.peerClients)).WithRetries(1) for idx, client := range sys.peerClients { if client == nil { continue @@ -167,7 +176,7 @@ func (sys *NotificationSys) DeleteUser(accessKey string) []NotificationPeerErr { // LoadUser - reloads a specific user across all peers func (sys *NotificationSys) LoadUser(accessKey string, temp bool) []NotificationPeerErr { - ng := WithNPeers(len(sys.peerClients)) + ng := WithNPeers(len(sys.peerClients)).WithRetries(1) for idx, client := range sys.peerClients { if client == nil { continue @@ -182,7 +191,7 @@ func (sys *NotificationSys) LoadUser(accessKey string, temp bool) []Notification // LoadGroup - loads a specific group on all peers. func (sys *NotificationSys) LoadGroup(group string) []NotificationPeerErr { - ng := WithNPeers(len(sys.peerClients)) + ng := WithNPeers(len(sys.peerClients)).WithRetries(1) for idx, client := range sys.peerClients { if client == nil { continue @@ -195,7 +204,7 @@ func (sys *NotificationSys) LoadGroup(group string) []NotificationPeerErr { // DeleteServiceAccount - deletes a specific service account across all peers func (sys *NotificationSys) DeleteServiceAccount(accessKey string) []NotificationPeerErr { - ng := WithNPeers(len(sys.peerClients)) + ng := WithNPeers(len(sys.peerClients)).WithRetries(1) for idx, client := range sys.peerClients { if client == nil { continue @@ -210,7 +219,7 @@ func (sys *NotificationSys) DeleteServiceAccount(accessKey string) []Notificatio // LoadServiceAccount - reloads a specific service account across all peers func (sys *NotificationSys) LoadServiceAccount(accessKey string) []NotificationPeerErr { - ng := WithNPeers(len(sys.peerClients)) + ng := WithNPeers(len(sys.peerClients)).WithRetries(1) for idx, client := range sys.peerClients { if client == nil { continue @@ -468,7 +477,7 @@ func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName // GetClusterAllBucketStats - returns bucket stats for all buckets from all remote peers. func (sys *NotificationSys) GetClusterAllBucketStats(ctx context.Context) []BucketStatsMap { - ng := WithNPeers(len(sys.peerClients)) + ng := WithNPeers(len(sys.peerClients)).WithRetries(1) replicationStats := make([]BucketStatsMap, len(sys.peerClients)) for index, client := range sys.peerClients { index := index @@ -509,7 +518,7 @@ func (sys *NotificationSys) GetClusterAllBucketStats(ctx context.Context) []Buck // GetClusterBucketStats - calls GetClusterBucketStats call on all peers for a cluster statistics view. func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketName string) []BucketStats { - ng := WithNPeers(len(sys.peerClients)) + ng := WithNPeers(len(sys.peerClients)).WithRetries(1) bucketStats := make([]BucketStats, len(sys.peerClients)) for index, client := range sys.peerClients { index := index diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 32eb04492..6d2e83bdb 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -57,6 +57,10 @@ func (client *peerRESTClient) call(method string, values url.Values, body io.Rea // permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints() // after verifying format.json func (client *peerRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { + if client == nil || !client.IsOnline() { + return nil, errPeerNotReachable + } + if values == nil { values = make(url.Values) }