fix: failed messages counting in audit_http metrics (#18075)

all retries must not be counted as failed messages,
a failed message is a single counter not for all
retries, this PR fixes this.

Also we do not need to retry 10-times, instead we should
retry at max 3 times with some jitter to deliver the
messages.
This commit is contained in:
Harshavardhana 2023-09-21 11:24:56 -07:00 committed by GitHub
parent 74cfb207c1
commit 1472875670
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"math"
"math/rand"
"net/http"
"net/url"
"os"
@ -153,7 +154,7 @@ func (h *Target) Init(ctx context.Context) (err error) {
if h.config.QueueDir != "" {
return h.initQueueStoreOnce.DoWithContext(ctx, h.initQueueStore)
}
return h.initLogChannel(ctx)
return h.init(ctx)
}
func (h *Target) initQueueStore(ctx context.Context) (err error) {
@ -170,7 +171,7 @@ func (h *Target) initQueueStore(ctx context.Context) (err error) {
return
}
func (h *Target) initLogChannel(ctx context.Context) (err error) {
func (h *Target) init(ctx context.Context) (err error) {
switch atomic.LoadInt32(&h.status) {
case statusOnline:
return nil
@ -182,8 +183,10 @@ func (h *Target) initLogChannel(ctx context.Context) (err error) {
// Start a goroutine that will continue to check if we can reach
h.revive.Do(func() {
go func() {
t := time.NewTicker(time.Second)
// Avoid stamping herd, add jitter.
t := time.NewTicker(time.Second + time.Duration(rand.Int63n(int64(5*time.Second))))
defer t.Stop()
for range t.C {
if atomic.LoadInt32(&h.status) != statusOffline {
return
@ -261,27 +264,29 @@ func (h *Target) logEntry(ctx context.Context, entry interface{}) {
return
}
const maxTries = 3
tries := 0
for {
if tries > 0 {
if tries >= 10 || atomic.LoadInt32(&h.status) == statusClosed {
// Don't retry when closing...
return
}
// sleep = (tries+2) ^ 2 milliseconds.
sleep := time.Duration(math.Pow(float64(tries+2), 2)) * time.Millisecond
if sleep > time.Second {
sleep = time.Second
}
time.Sleep(sleep)
}
tries++
if err := h.send(ctx, logJSON, webhookCallTimeout); err != nil {
h.config.LogOnce(ctx, err, h.Endpoint())
atomic.AddInt64(&h.failedMessages, 1)
} else {
for tries < maxTries {
if atomic.LoadInt32(&h.status) == statusClosed {
// Don't retry when closing...
return
}
// sleep = (tries+2) ^ 2 milliseconds.
sleep := time.Duration(math.Pow(float64(tries+2), 2)) * time.Millisecond
if sleep > time.Second {
sleep = time.Second
}
time.Sleep(sleep)
tries++
err := h.send(ctx, logJSON, webhookCallTimeout)
if err == nil {
return
}
h.config.LogOnce(ctx, err, h.Endpoint())
}
if tries == maxTries {
// Even with multiple retries, count failed messages as only one.
atomic.AddInt64(&h.failedMessages, 1)
}
}