fix: when logger queue is full exit quickly upon doneCh (#14928)

Additionally only reload requested sub-system not everything
This commit is contained in:
Harshavardhana 2022-05-16 16:10:51 -07:00 committed by GitHub
parent 05685863e3
commit 040ac5cad8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 59 additions and 24 deletions

View file

@ -104,7 +104,7 @@ func applyDynamic(ctx context.Context, objectAPI ObjectLayer, cfg config.Config,
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
globalNotificationSys.SignalService(serviceReloadDynamic)
globalNotificationSys.SignalConfigReload(subSys)
// Tell the client that dynamic config was applied.
w.Header().Set(madmin.ConfigAppliedHeader, madmin.ConfigAppliedTrue)
}

View file

@ -414,6 +414,21 @@ func (sys *NotificationSys) ServerUpdate(ctx context.Context, u *url.URL, sha256
return ng.Wait()
}
// SignalConfigReload reloads requested sub-system on a remote peer dynamically.
func (sys *NotificationSys) SignalConfigReload(subSys string) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(GlobalContext, func() error {
return client.SignalService(serviceReloadDynamic, subSys)
}, idx, *client.host)
}
return ng.Wait()
}
// SignalService - calls signal service RPC call on all peers.
func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients))
@ -423,7 +438,7 @@ func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerE
}
client := client
ng.Go(GlobalContext, func() error {
return client.SignalService(sig)
return client.SignalService(sig, "")
}, idx, *client.host)
}
return ng.Wait()
@ -1541,7 +1556,7 @@ func (sys *NotificationSys) ServiceFreeze(ctx context.Context, freeze bool) []No
}
client := client
ng.Go(GlobalContext, func() error {
return client.SignalService(serviceSig)
return client.SignalService(serviceSig, "")
}, idx, *client.host)
}
nerrs := ng.Wait()

View file

@ -688,9 +688,10 @@ func (client *peerRESTClient) ServerUpdate(ctx context.Context, u *url.URL, sha2
}
// SignalService - sends signal to peer nodes.
func (client *peerRESTClient) SignalService(sig serviceSignal) error {
func (client *peerRESTClient) SignalService(sig serviceSignal, subSys string) error {
values := make(url.Values)
values.Set(peerRESTSignal, strconv.Itoa(int(sig)))
values.Set(peerRESTSubSys, subSys)
respBody, err := client.call(peerRESTMethodSignalService, values, nil, -1)
if err != nil {
return err

View file

@ -84,6 +84,7 @@ const (
peerRESTUserOrGroup = "user-or-group"
peerRESTIsGroup = "is-group"
peerRESTSignal = "signal"
peerRESTSubSys = "sub-sys"
peerRESTProfiler = "profiler"
peerRESTTraceErr = "err"
peerRESTTraceInternal = "internal"

View file

@ -837,7 +837,14 @@ func (s *peerRESTServer) SignalServiceHandler(w http.ResponseWriter, r *http.Req
s.writeErrorResponse(w, err)
return
}
if err = applyDynamicConfig(r.Context(), objAPI, srvCfg); err != nil {
subSys := r.Form.Get(peerRESTSubSys)
// Apply dynamic values.
if subSys == "" {
err = applyDynamicConfig(r.Context(), objAPI, srvCfg)
} else {
err = applyDynamicConfigForSubSys(r.Context(), objAPI, srvCfg, subSys)
}
if err != nil {
s.writeErrorResponse(w, err)
}
return

View file

@ -26,7 +26,6 @@ import (
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
xhttp "github.com/minio/minio/internal/http"
@ -58,8 +57,8 @@ type Config struct {
// buffer is full, new logs are just ignored and an error
// is returned to the caller.
type Target struct {
status int32
wg sync.WaitGroup
doneCh chan struct{}
// Channel of log entries
logCh chan interface{}
@ -115,7 +114,6 @@ func (h *Target) Init() error {
h.config.Endpoint, resp.Status)
}
h.status = 1
go h.startHTTPLogger()
return nil
}
@ -177,11 +175,15 @@ func (h *Target) logEntry(entry interface{}) {
func (h *Target) startHTTPLogger() {
// Create a routine which sends json logs received
// from an internal channel.
h.wg.Add(1)
go func() {
h.wg.Add(1)
defer h.wg.Done()
for entry := range h.logCh {
select {
case entry := <-h.logCh:
h.logEntry(entry)
case <-h.doneCh:
return
}
}()
}
@ -191,6 +193,7 @@ func (h *Target) startHTTPLogger() {
func New(config Config) *Target {
h := &Target{
logCh: make(chan interface{}, config.QueueSize),
doneCh: make(chan struct{}),
config: config,
}
@ -199,12 +202,14 @@ func New(config Config) *Target {
// Send log message 'e' to http target.
func (h *Target) Send(entry interface{}, errKind string) error {
if atomic.LoadInt32(&h.status) == 0 {
// Channel was closed or used before init.
select {
case <-h.doneCh:
return nil
default:
}
select {
case <-h.doneCh:
case h.logCh <- entry:
default:
// log channel is full, do not wait and return
@ -217,9 +222,8 @@ func (h *Target) Send(entry interface{}, errKind string) error {
// Cancel - cancels the target
func (h *Target) Cancel() {
if atomic.CompareAndSwapInt32(&h.status, 1, 0) {
close(h.logCh)
}
close(h.doneCh)
close(h.logCh)
h.wg.Wait()
}

View file

@ -25,7 +25,6 @@ import (
"errors"
"net"
"sync"
"sync/atomic"
"github.com/Shopify/sarama"
saramatls "github.com/Shopify/sarama/tools/tls"
@ -37,8 +36,8 @@ import (
// Target - Kafka target.
type Target struct {
status int32
wg sync.WaitGroup
doneCh chan struct{}
// Channel of log entries
logCh chan interface{}
@ -51,6 +50,13 @@ type Target struct {
// Send log message 'e' to kafka target.
func (h *Target) Send(entry interface{}, errKind string) error {
select {
case <-h.doneCh:
return nil
default:
}
select {
case <-h.doneCh:
case h.logCh <- entry:
default:
// log channel is full, do not wait and return
@ -86,11 +92,15 @@ func (h *Target) logEntry(entry interface{}) {
func (h *Target) startKakfaLogger() {
// Create a routine which sends json logs received
// from an internal channel.
h.wg.Add(1)
go func() {
h.wg.Add(1)
defer h.wg.Done()
for entry := range h.logCh {
select {
case entry := <-h.logCh:
h.logEntry(entry)
case <-h.doneCh:
return
}
}()
}
@ -204,17 +214,14 @@ func (h *Target) Init() error {
}
h.producer = producer
h.status = 1
go h.startKakfaLogger()
return nil
}
// Cancel - cancels the target
func (h *Target) Cancel() {
if atomic.CompareAndSwapInt32(&h.status, 1, 0) {
close(h.logCh)
}
close(h.doneCh)
close(h.logCh)
h.wg.Wait()
}