diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index 9aa3377bf..e8cefdf84 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -138,6 +138,10 @@ func (h *Target) ping(ctx context.Context) bool { if err := h.send(ctx, []byte(`{}`), webhookCallTimeout); err != nil { return !xnet.IsNetworkOrHostDown(err, false) && !xnet.IsConnRefusedErr(err) } + // We are online. + h.workerStartMu.Lock() + h.lastStarted = time.Now() + h.workerStartMu.Unlock() go h.startHTTPLogger(ctx) return true } @@ -199,14 +203,6 @@ func (h *Target) init(ctx context.Context) (err error) { return } if h.ping(ctx) { - // We are online. - if atomic.CompareAndSwapInt32(&h.status, statusOffline, statusOnline) { - h.workerStartMu.Lock() - h.lastStarted = time.Now() - h.workerStartMu.Unlock() - atomic.AddInt64(&h.workers, 1) - go h.startHTTPLogger(ctx) - } return } } @@ -214,14 +210,6 @@ func (h *Target) init(ctx context.Context) (err error) { }) return err } - - if atomic.CompareAndSwapInt32(&h.status, statusOffline, statusOnline) { - h.workerStartMu.Lock() - h.lastStarted = time.Now() - h.workerStartMu.Unlock() - atomic.AddInt64(&h.workers, 1) - go h.startHTTPLogger(ctx) - } return nil } @@ -306,6 +294,9 @@ func (h *Target) logEntry(ctx context.Context, entry interface{}) { } func (h *Target) startHTTPLogger(ctx context.Context) { + atomic.AddInt64(&h.workers, 1) + defer atomic.AddInt64(&h.workers, -1) + h.logChMu.RLock() logCh := h.logCh if logCh != nil { @@ -314,15 +305,12 @@ func (h *Target) startHTTPLogger(ctx context.Context) { defer h.wg.Done() } h.logChMu.RUnlock() - - defer atomic.AddInt64(&h.workers, -1) - if logCh == nil { return } + // Send messages until channel is closed. for entry := range logCh { - atomic.AddInt64(&h.totalMessages, 1) h.logEntry(ctx, entry) } } @@ -394,8 +382,10 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error { return nil } +retry: select { case h.logCh <- entry: + atomic.AddInt64(&h.totalMessages, 1) case <-ctx.Done(): // return error only for context timedout. if errors.Is(ctx.Err(), context.DeadlineExceeded) { @@ -407,24 +397,13 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error { if nWorkers < maxWorkers { // Only have one try to start at the same time. h.workerStartMu.Lock() - defer h.workerStartMu.Unlock() - // Start one max every second. if time.Since(h.lastStarted) > time.Second { - if atomic.CompareAndSwapInt64(&h.workers, nWorkers, nWorkers+1) { - // Start another logger. - h.lastStarted = time.Now() - go h.startHTTPLogger(ctx) - } + h.lastStarted = time.Now() + go h.startHTTPLogger(ctx) } - select { - case h.logCh <- entry: - case <-ctx.Done(): - // return error only for context timedout. - if errors.Is(ctx.Err(), context.DeadlineExceeded) { - return ctx.Err() - } - } - return nil + h.workerStartMu.Unlock() + + goto retry } atomic.AddInt64(&h.totalMessages, 1) atomic.AddInt64(&h.failedMessages, 1)