From 05685863e38ce9f38905f5143a051e7b7eed6c0e Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Mon, 16 May 2022 21:32:36 +0100 Subject: [PATCH] Cancel old logger/audit targets outside lock (#14927) When configuring a new target, such as an audit target, the server waits until all audit events are sent to the audit target before doing the swap from the old to the new audit target. Therefore current S3 operations can suffer from this since the audit swap lock will be held. This behavior is unnecessary as the new audit target can enter in a functional mode immediately and the old audit will just cancel itself at its own pace. --- internal/logger/targets.go | 88 ++++++++++++++++---------------------- 1 file changed, 38 insertions(+), 50 deletions(-) diff --git a/internal/logger/targets.go b/internal/logger/targets.go index 7df33a0eb..16aebc92b 100644 --- a/internal/logger/targets.go +++ b/internal/logger/targets.go @@ -99,12 +99,6 @@ func AddSystemTarget(t Target) error { return nil } -func cancelAllSystemTargets() { - for _, tgt := range systemTargets { - tgt.Cancel() - } -} - func initSystemTargets(cfgMap map[string]http.Config) (tgts []Target, err error) { for _, l := range cfgMap { if l.Enabled { @@ -131,6 +125,26 @@ func initKafkaTargets(cfgMap map[string]kafka.Config) (tgts []Target, err error) return tgts, err } +// Split targets into two groups: +// group1 contains all targets of type t +// group2 contains the remaining targets +func splitTargets(targets []Target, t types.TargetType) (group1 []Target, group2 []Target) { + for _, target := range targets { + if target.Type() == t { + group1 = append(group1, target) + } else { + group2 = append(group2, target) + } + } + return +} + +func cancelTargets(targets []Target) { + for _, target := range targets { + target.Cancel() + } +} + // UpdateSystemTargets swaps targets with newly loaded ones from the cfg func UpdateSystemTargets(cfg Config) error { newTgts, err := initSystemTargets(cfg.HTTP) @@ -139,73 +153,47 @@ func UpdateSystemTargets(cfg Config) error { } swapSystemMuRW.Lock() - defer swapSystemMuRW.Unlock() - - for _, tgt := range systemTargets { - // Preserve console target when dynamically updating - // other HTTP targets, console target is always present. - if tgt.Type() == types.TargetConsole { - newTgts = append(newTgts, tgt) - break - } - } - - cancelAllSystemTargets() // cancel running targets + consoleTargets, otherTargets := splitTargets(systemTargets, types.TargetConsole) + newTgts = append(newTgts, consoleTargets...) systemTargets = newTgts + swapSystemMuRW.Unlock() + cancelTargets(otherTargets) // cancel running targets return nil } -func cancelAuditTargetType(t types.TargetType) { - for _, tgt := range auditTargets { - if tgt.Type() == t { - tgt.Cancel() - } - } -} - -func existingAuditTargets(t types.TargetType) []Target { - tgts := make([]Target, 0, len(auditTargets)) - for _, tgt := range auditTargets { - if tgt.Type() == t { - tgts = append(tgts, tgt) - } - } - return tgts -} - // UpdateAuditWebhookTargets swaps audit webhook targets with newly loaded ones from the cfg func UpdateAuditWebhookTargets(cfg Config) error { - newTgts, err := initSystemTargets(cfg.AuditWebhook) + newWebhookTgts, err := initSystemTargets(cfg.AuditWebhook) if err != nil { return err } - // retain kafka targets swapAuditMuRW.Lock() - defer swapAuditMuRW.Unlock() - - newTgts = append(existingAuditTargets(types.TargetKafka), newTgts...) - cancelAuditTargetType(types.TargetHTTP) // cancel running targets - auditTargets = newTgts + // Retain kafka targets + oldWebhookTgts, otherTgts := splitTargets(auditTargets, types.TargetHTTP) + newWebhookTgts = append(newWebhookTgts, otherTgts...) + auditTargets = newWebhookTgts + swapAuditMuRW.Unlock() + cancelTargets(oldWebhookTgts) // cancel running targets return nil } // UpdateAuditKafkaTargets swaps audit kafka targets with newly loaded ones from the cfg func UpdateAuditKafkaTargets(cfg Config) error { - updated, err := initKafkaTargets(cfg.AuditKafka) + newKafkaTgts, err := initKafkaTargets(cfg.AuditKafka) if err != nil { return err } swapAuditMuRW.Lock() - defer swapAuditMuRW.Unlock() - - // retain HTTP targets - updated = append(existingAuditTargets(types.TargetHTTP), updated...) - cancelAuditTargetType(types.TargetKafka) // cancel running targets - auditTargets = updated + // Retain webhook targets + oldKafkaTgts, otherTgts := splitTargets(auditTargets, types.TargetKafka) + newKafkaTgts = append(newKafkaTgts, otherTgts...) + auditTargets = newKafkaTgts + swapAuditMuRW.Unlock() + cancelTargets(oldKafkaTgts) // cancel running targets return nil }