diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index 8ea031798..c963608e0 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -64,11 +64,6 @@ const ( var ( globalHealConfig heal.Config - dataScannerLeaderLockTimeout = newDynamicTimeoutWithOpts(dynamicTimeoutOpts{ - timeout: 30 * time.Second, - minimum: 10 * time.Second, - retryInterval: time.Second, - }) // Sleeper values are updated when config is loaded. scannerSleeper = newDynamicSleeper(10, 10*time.Second, true) scannerCycle = uatomic.NewDuration(dataScannerStartDelay) @@ -157,19 +152,9 @@ func saveBackgroundHealInfo(ctx context.Context, objAPI ObjectLayer, info backgr // runDataScanner will start a data scanner. // The function will block until the context is canceled. // There should only ever be one scanner running per cluster. -func runDataScanner(pctx context.Context, objAPI ObjectLayer) { - // Make sure only 1 scanner is running on the cluster. - locker := objAPI.NewNSLock(minioMetaBucket, "scanner/runDataScanner.lock") - lkctx, err := locker.GetLock(pctx, dataScannerLeaderLockTimeout) - if err != nil { - if intDataUpdateTracker.debug { - logger.LogIf(pctx, err) - } - return - } - ctx := lkctx.Context() - defer lkctx.Cancel() - // No unlock for "leader" lock. +func runDataScanner(ctx context.Context, objAPI ObjectLayer) { + ctx, cancel := globalLeaderLock.GetLock(ctx) + defer cancel() // Load current bloom cycle var cycleInfo currentScannerCycle @@ -181,7 +166,7 @@ func runDataScanner(pctx context.Context, objAPI ObjectLayer) { } else if len(buf) > 8 { cycleInfo.next = binary.LittleEndian.Uint64(buf[:8]) buf = buf[8:] - _, err = cycleInfo.UnmarshalMsg(buf) + _, err := cycleInfo.UnmarshalMsg(buf) logger.LogIf(ctx, err) } diff --git a/cmd/globals.go b/cmd/globals.go index c7fe45601..74fe21542 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -291,6 +291,9 @@ var ( // GlobalKMS initialized KMS configuration GlobalKMS kms.KMS + // Common lock for various subsystems performing the leader tasks + globalLeaderLock *sharedLock + // Auto-Encryption, if enabled, turns any non-SSE-C request // into an SSE-S3 request. If enabled a valid, non-empty KMS // configuration must be present. diff --git a/cmd/server-main.go b/cmd/server-main.go index a3a0d28f4..618de8978 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -581,6 +581,8 @@ func serverMain(ctx *cli.Context) { xhttp.SetDeploymentID(globalDeploymentID) xhttp.SetMinIOVersion(Version) + globalLeaderLock = newSharedLock(GlobalContext, newObject, "leader.lock") + // Enable background operations for erasure coding initAutoHeal(GlobalContext, newObject) initHealMRF(GlobalContext, newObject) diff --git a/cmd/shared-lock.go b/cmd/shared-lock.go new file mode 100644 index 000000000..71bed3d2f --- /dev/null +++ b/cmd/shared-lock.go @@ -0,0 +1,86 @@ +// Copyright (c) 2015-2022 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "context" + "time" +) + +var sharedLockTimeout = newDynamicTimeoutWithOpts(dynamicTimeoutOpts{ + timeout: 30 * time.Second, + minimum: 10 * time.Second, + retryInterval: time.Minute, +}) + +type sharedLock struct { + lockContext chan LockContext +} + +func (ld sharedLock) backgroundRoutine(ctx context.Context, objAPI ObjectLayer, lockName string) { + for { + locker := objAPI.NewNSLock(minioMetaBucket, lockName) + lkctx, err := locker.GetLock(ctx, sharedLockTimeout) + if err != nil { + continue + } + + for { + select { + case <-ctx.Done(): + return + case <-lkctx.Context().Done(): + // The context of the lock is canceled, this can happen + // if one lock lost quorum due to cluster instability + // in that case, try to lock again. + break + case ld.lockContext <- lkctx: + // Send the lock context to anyone asking for it + } + } + } +} + +func mergeContext(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + select { + case <-ctx1.Done(): + case <-ctx2.Done(): + } + + cancel() + }() + return ctx, cancel +} + +func (ld sharedLock) GetLock(ctx context.Context) (context.Context, context.CancelFunc) { + select { + case l := <-ld.lockContext: + return mergeContext(l.Context(), ctx) + } +} + +func newSharedLock(ctx context.Context, objAPI ObjectLayer, lockName string) *sharedLock { + l := &sharedLock{ + lockContext: make(chan LockContext), + } + go l.backgroundRoutine(ctx, objAPI, lockName) + + return l +} diff --git a/cmd/site-replication.go b/cmd/site-replication.go index 20fa35e3d..b0c5ef84f 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -3577,12 +3577,6 @@ func (c *SiteReplicationSys) PeerEditReq(ctx context.Context, arg madmin.PeerInf const siteHealTimeInterval = 10 * time.Second -var siteReplicationHealLockTimeout = newDynamicTimeoutWithOpts(dynamicTimeoutOpts{ - timeout: 30 * time.Second, - minimum: 10 * time.Second, - retryInterval: time.Second, -}) - func (c *SiteReplicationSys) startHealRoutine(ctx context.Context, objAPI ObjectLayer) { r := rand.New(rand.NewSource(time.Now().UnixNano())) // Run the site replication healing in a loop @@ -3598,15 +3592,8 @@ func (c *SiteReplicationSys) startHealRoutine(ctx context.Context, objAPI Object } func (c *SiteReplicationSys) healRoutine(ctx context.Context, objAPI ObjectLayer) { - // Make sure only one node running site replication on the cluster. - locker := objAPI.NewNSLock(minioMetaBucket, "site-replication/heal.lock") - lkctx, err := locker.GetLock(ctx, siteReplicationHealLockTimeout) - if err != nil { - return - } - ctx = lkctx.Context() - defer lkctx.Cancel() - // No unlock for "leader" lock. + ctx, cancel := globalLeaderLock.GetLock(ctx) + defer cancel() healTimer := time.NewTimer(siteHealTimeInterval) defer healTimer.Stop()