From 9341201132210ef64e07cb7a85b5b9cec814ba8d Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 12 May 2022 07:20:58 -0700 Subject: [PATCH] logger lock should be more granular (#14901) This PR simplifies few things by splitting the locks between audit, logger targets to avoid potential contention between them. any failures inside audit/logger HTTP targets must only log to console instead of other targets to avoid cyclical dependency. avoids unneeded atomic variables instead uses RWLock to differentiate a more common read phase v/s lock phase. --- cmd/config-current.go | 4 +-- cmd/test-utils_test.go | 11 +++--- internal/logger/audit.go | 8 ++--- internal/logger/logger.go | 38 ++++++++++++++++---- internal/logger/logonce.go | 38 ++++++++++++++++++++ internal/logger/targets.go | 73 ++++++++++++++++++-------------------- 6 files changed, 114 insertions(+), 58 deletions(-) diff --git a/cmd/config-current.go b/cmd/config-current.go index 99d2cd6fd..76b428967 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -639,7 +639,7 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf userAgent := getUserAgent(getMinioMode()) for n, l := range loggerCfg.HTTP { if l.Enabled { - l.LogOnce = logger.LogOnceIf + l.LogOnce = logger.LogOnceConsoleIf l.UserAgent = userAgent l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) loggerCfg.HTTP[n] = l @@ -657,7 +657,7 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf userAgent := getUserAgent(getMinioMode()) for n, l := range loggerCfg.AuditWebhook { if l.Enabled { - l.LogOnce = logger.LogOnceIf + l.LogOnce = logger.LogOnceConsoleIf l.UserAgent = userAgent l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) loggerCfg.AuditWebhook[n] = l diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index fb34ec2c9..6a6bc7324 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -53,6 +53,7 @@ import ( "sync" "testing" "time" + "unsafe" "github.com/fatih/color" @@ -1183,12 +1184,11 @@ func newTestSignedRequestV4(method, urlStr string, contentLength int64, body io. return req, nil } -// Function to generate random string for bucket/object names. -func randString(n int) string { - src := rand.NewSource(UTCNow().UnixNano()) +var src = rand.NewSource(time.Now().UnixNano()) +func randString(n int) string { b := make([]byte, n) - // A rand.Int63() generates 63 random bits, enough for letterIdxMax letters! + // A src.Int63() generates 63 random bits, enough for letterIdxMax characters! for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; { if remain == 0 { cache, remain = src.Int63(), letterIdxMax @@ -1200,7 +1200,8 @@ func randString(n int) string { cache >>= letterIdxBits remain-- } - return string(b) + + return *(*string)(unsafe.Pointer(&b)) } // generate random object name. diff --git a/internal/logger/audit.go b/internal/logger/audit.go index 898014317..213775e78 100644 --- a/internal/logger/audit.go +++ b/internal/logger/audit.go @@ -24,7 +24,6 @@ import ( "io" "net/http" "strconv" - "sync/atomic" "time" "github.com/klauspost/compress/gzhttp" @@ -158,13 +157,12 @@ func GetAuditEntry(ctx context.Context) *audit.Entry { // AuditLog - logs audit logs to all audit targets. func AuditLog(ctx context.Context, w http.ResponseWriter, r *http.Request, reqClaims map[string]interface{}, filterKeys ...string) { - // Fast exit if there is not audit target configured - if atomic.LoadInt32(&nAuditTargets) == 0 { + auditTgts := AuditTargets() + if len(auditTgts) == 0 { return } var entry audit.Entry - if w != nil && r != nil { reqInfo := GetReqInfo(ctx) if reqInfo == nil { @@ -234,7 +232,7 @@ func AuditLog(ctx context.Context, w http.ResponseWriter, r *http.Request, reqCl } // Send audit logs only to http targets. - for _, t := range AuditTargets() { + for _, t := range auditTgts { if err := t.Send(entry, string(All)); err != nil { LogAlwaysIf(context.Background(), fmt.Errorf("event(%v) was not sent to Audit target (%v): %v", entry, t, err), All) } diff --git a/internal/logger/logger.go b/internal/logger/logger.go index 9f80ace50..68b342c45 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -278,12 +278,7 @@ func LogIf(ctx context.Context, err error, errKind ...interface{}) { logIf(ctx, err, errKind...) } -// logIf prints a detailed error message during -// the execution of the server. -func logIf(ctx context.Context, err error, errKind ...interface{}) { - if Disable { - return - } +func errToEntry(ctx context.Context, err error, errKind ...interface{}) log.Entry { logKind := string(Minio) if len(errKind) > 0 { if ek, ok := errKind[0].(Kind); ok { @@ -357,8 +352,37 @@ func logIf(ctx context.Context, err error, errKind ...interface{}) { entry.Trace.Variables = make(map[string]interface{}) } + return entry +} + +// consoleLogIf prints a detailed error message during +// the execution of the server. +func consoleLogIf(ctx context.Context, err error, errKind ...interface{}) { + if Disable { + return + } + + if consoleTgt != nil { + entry := errToEntry(ctx, err, errKind...) + consoleTgt.Send(entry, entry.LogKind) + } +} + +// logIf prints a detailed error message during +// the execution of the server. +func logIf(ctx context.Context, err error, errKind ...interface{}) { + if Disable { + return + } + + systemTgts := SystemTargets() + if len(systemTgts) == 0 { + return + } + + entry := errToEntry(ctx, err, errKind...) // Iterate over all logger targets to send the log entry - for _, t := range SystemTargets() { + for _, t := range systemTgts { if err := t.Send(entry, entry.LogKind); err != nil { if consoleTgt != nil { entry.Trace.Message = fmt.Sprintf("event(%#v) was not sent to Logger target (%#v): %#v", entry, t, err) diff --git a/internal/logger/logonce.go b/internal/logger/logonce.go index 55a2b062c..e15eaf93f 100644 --- a/internal/logger/logonce.go +++ b/internal/logger/logonce.go @@ -31,6 +31,27 @@ type logOnceType struct { sync.Mutex } +func (l *logOnceType) logOnceConsoleIf(ctx context.Context, err error, id interface{}, errKind ...interface{}) { + if err == nil { + return + } + l.Lock() + shouldLog := false + prevErr := l.IDMap[id] + if prevErr == nil { + l.IDMap[id] = err + shouldLog = true + } else if prevErr.Error() != err.Error() { + l.IDMap[id] = err + shouldLog = true + } + l.Unlock() + + if shouldLog { + consoleLogIf(ctx, err, errKind...) + } +} + // One log message per error. func (l *logOnceType) logOnceIf(ctx context.Context, err error, id interface{}, errKind ...interface{}) { if err == nil { @@ -91,3 +112,20 @@ func LogOnceIf(ctx context.Context, err error, id interface{}, errKind ...interf logOnce.logOnceIf(ctx, err, id, errKind...) } + +// LogOnceConsoleIf - similar to LogOnceIf but exclusively only logs to console target. +func LogOnceConsoleIf(ctx context.Context, err error, id interface{}, errKind ...interface{}) { + if err == nil { + return + } + + if errors.Is(err, context.Canceled) { + return + } + + if err.Error() == http.ErrServerClosed.Error() || err.Error() == "disk not found" { + return + } + + logOnce.logOnceConsoleIf(ctx, err, id, errKind...) +} diff --git a/internal/logger/targets.go b/internal/logger/targets.go index 9337bed5f..7df33a0eb 100644 --- a/internal/logger/targets.go +++ b/internal/logger/targets.go @@ -19,7 +19,6 @@ package logger import ( "sync" - "sync/atomic" "github.com/minio/minio/internal/logger/target/http" "github.com/minio/minio/internal/logger/target/kafka" @@ -39,8 +38,8 @@ type Target interface { } var ( - // swapMu must be held while reading slice info or swapping targets or auditTargets. - swapMu sync.Mutex + swapAuditMuRW sync.RWMutex + swapSystemMuRW sync.RWMutex // systemTargets is the set of enabled loggers. // Must be immutable at all times. @@ -49,33 +48,25 @@ var ( // This is always set represent /dev/console target consoleTgt Target - - nTargets int32 // atomic count of len(targets) ) // SystemTargets returns active targets. // Returned slice may not be modified in any way. func SystemTargets() []Target { - if atomic.LoadInt32(&nTargets) == 0 { - // Lock free if none... - return nil - } - swapMu.Lock() + swapSystemMuRW.RLock() + defer swapSystemMuRW.RUnlock() + res := systemTargets - swapMu.Unlock() return res } // AuditTargets returns active audit targets. // Returned slice may not be modified in any way. func AuditTargets() []Target { - if atomic.LoadInt32(&nAuditTargets) == 0 { - // Lock free if none... - return nil - } - swapMu.Lock() + swapAuditMuRW.RLock() + defer swapAuditMuRW.RUnlock() + res := auditTargets - swapMu.Unlock() return res } @@ -83,8 +74,7 @@ func AuditTargets() []Target { // Must be immutable at all times. // Can be swapped to another while holding swapMu var ( - auditTargets = []Target{} - nAuditTargets int32 // atomic count of len(auditTargets) + auditTargets = []Target{} ) // AddSystemTarget adds a new logger target to the @@ -93,7 +83,10 @@ func AddSystemTarget(t Target) error { if err := t.Init(); err != nil { return err } - swapMu.Lock() + + swapSystemMuRW.Lock() + defer swapSystemMuRW.Unlock() + if consoleTgt == nil { if t.Type() == types.TargetConsole { consoleTgt = t @@ -102,8 +95,6 @@ func AddSystemTarget(t Target) error { updated := append(make([]Target, 0, len(systemTargets)+1), systemTargets...) updated = append(updated, t) systemTargets = updated - atomic.StoreInt32(&nTargets, int32(len(updated))) - swapMu.Unlock() return nil } @@ -142,24 +133,26 @@ func initKafkaTargets(cfgMap map[string]kafka.Config) (tgts []Target, err error) // UpdateSystemTargets swaps targets with newly loaded ones from the cfg func UpdateSystemTargets(cfg Config) error { - updated, err := initSystemTargets(cfg.HTTP) + newTgts, err := initSystemTargets(cfg.HTTP) if err != nil { return err } - swapMu.Lock() + swapSystemMuRW.Lock() + defer swapSystemMuRW.Unlock() + for _, tgt := range systemTargets { // Preserve console target when dynamically updating // other HTTP targets, console target is always present. if tgt.Type() == types.TargetConsole { - updated = append(updated, tgt) + newTgts = append(newTgts, tgt) break } } - atomic.StoreInt32(&nTargets, int32(len(updated))) + cancelAllSystemTargets() // cancel running targets - systemTargets = updated - swapMu.Unlock() + systemTargets = newTgts + return nil } @@ -183,18 +176,19 @@ func existingAuditTargets(t types.TargetType) []Target { // UpdateAuditWebhookTargets swaps audit webhook targets with newly loaded ones from the cfg func UpdateAuditWebhookTargets(cfg Config) error { - updated, err := initSystemTargets(cfg.AuditWebhook) + newTgts, err := initSystemTargets(cfg.AuditWebhook) if err != nil { return err } - // retain kafka targets - updated = append(existingAuditTargets(types.TargetKafka), updated...) - swapMu.Lock() - atomic.StoreInt32(&nAuditTargets, int32(len(updated))) + // retain kafka targets + swapAuditMuRW.Lock() + defer swapAuditMuRW.Unlock() + + newTgts = append(existingAuditTargets(types.TargetKafka), newTgts...) cancelAuditTargetType(types.TargetHTTP) // cancel running targets - auditTargets = updated - swapMu.Unlock() + auditTargets = newTgts + return nil } @@ -204,13 +198,14 @@ func UpdateAuditKafkaTargets(cfg Config) error { if err != nil { return err } + + swapAuditMuRW.Lock() + defer swapAuditMuRW.Unlock() + // retain HTTP targets updated = append(existingAuditTargets(types.TargetHTTP), updated...) - - swapMu.Lock() - atomic.StoreInt32(&nAuditTargets, int32(len(updated))) cancelAuditTargetType(types.TargetKafka) // cancel running targets auditTargets = updated - swapMu.Unlock() + return nil }