From acc9c033edfbac9b2c4a97b0c764add7c961c155 Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Sat, 24 Dec 2022 04:49:07 +0100 Subject: [PATCH] debug: Add X-Amz-Request-ID to lock/unlock calls (#16309) --- cmd/admin-handlers.go | 4 +-- cmd/background-newdisks-heal-ops.go | 2 +- cmd/bucket-replication.go | 4 +-- cmd/callhome.go | 2 +- cmd/disk-cache-backend.go | 16 +++++----- cmd/disk-cache.go | 2 +- cmd/erasure-healing.go | 4 +-- cmd/erasure-multipart.go | 16 +++++----- cmd/erasure-object.go | 16 +++++----- cmd/erasure-server-pool-rebalance.go | 2 +- cmd/erasure-server-pool.go | 19 +++++------ cmd/namespace-lock.go | 32 +++++++++---------- cmd/object-handlers.go | 2 +- cmd/server-main.go | 4 +-- internal/dsync/drwmutex.go | 43 +++++++++++++++++-------- internal/dsync/drwmutex_test.go | 32 +++++++++---------- internal/dsync/dsync_test.go | 48 +++++++++++++++------------- 17 files changed, 133 insertions(+), 115 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 333ed5c25..23c23a993 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -1170,7 +1170,7 @@ func (a adminAPIHandlers) NetperfHandler(w http.ResponseWriter, r *http.Request) writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(toAPIErrorCode(ctx, err)), r.URL) return } - defer nsLock.Unlock(lkctx.Cancel) + defer nsLock.Unlock(lkctx) durationStr := r.Form.Get(peerRESTDuration) duration, err := time.ParseDuration(durationStr) @@ -2286,7 +2286,7 @@ func (a adminAPIHandlers) HealthInfoHandler(w http.ResponseWriter, r *http.Reque return } - defer nsLock.Unlock(lkctx.Cancel) + defer nsLock.Unlock(lkctx) healthCtx, healthCancel := context.WithTimeout(lkctx.Context(), deadline) defer healthCancel() diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index cca0c85c1..9e4b10ee5 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -316,7 +316,7 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint return err } ctx = lkctx.Context() - defer locker.Unlock(lkctx.Cancel) + defer locker.Unlock(lkctx) buckets, _ := z.ListBuckets(ctx, BucketOptions{}) // Buckets data are dispersed in multiple zones/sets, make diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index ccdcb437a..198f29a54 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -425,7 +425,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj return } ctx = lkctx.Context() - defer lk.Unlock(lkctx.Cancel) + defer lk.Unlock(lkctx) var wg sync.WaitGroup var rinfos replicatedInfos @@ -937,7 +937,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje return } ctx = lkctx.Context() - defer lk.Unlock(lkctx.Cancel) + defer lk.Unlock(lkctx) var wg sync.WaitGroup var rinfos replicatedInfos diff --git a/cmd/callhome.go b/cmd/callhome.go index 53eb29825..b1a10da42 100644 --- a/cmd/callhome.go +++ b/cmd/callhome.go @@ -79,7 +79,7 @@ func runCallhome(ctx context.Context, objAPI ObjectLayer) bool { } ctx = lkctx.Context() - defer locker.Unlock(lkctx.Cancel) + defer locker.Unlock(lkctx) callhomeTimer := time.NewTimer(globalCallhomeConfig.FrequencyDur()) defer callhomeTimer.Stop() diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go index dce5578ec..2ab9b79ae 100644 --- a/cmd/disk-cache-backend.go +++ b/cmd/disk-cache-backend.go @@ -512,7 +512,7 @@ func (c *diskCache) statCachedMeta(ctx context.Context, cacheObjPath string) (me return } ctx = 
lkctx.Context() - defer cLock.RUnlock(lkctx.Cancel) + defer cLock.RUnlock(lkctx) return c.statCache(ctx, cacheObjPath) } @@ -597,7 +597,7 @@ func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, met return err } ctx = lkctx.Context() - defer cLock.Unlock(lkctx.Cancel) + defer cLock.Unlock(lkctx) if err = c.saveMetadata(ctx, bucket, object, meta, actualSize, rs, rsFileName, incHitsOnly); err != nil { return err } @@ -842,7 +842,7 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read return oi, err } ctx = lkctx.Context() - defer cLock.Unlock(lkctx.Cancel) + defer cLock.Unlock(lkctx) return c.put(ctx, bucket, object, data, size, rs, opts, incHitsOnly, writeback) } @@ -1076,7 +1076,7 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang return nil, numHits, err } ctx = lkctx.Context() - defer cLock.RUnlock(lkctx.Cancel) + defer cLock.RUnlock(lkctx) var objInfo ObjectInfo var rngInfo RangeInfo @@ -1213,7 +1213,7 @@ func (c *diskCache) Delete(ctx context.Context, bucket, object string) (err erro if err != nil { return err } - defer cLock.Unlock(lkctx.Cancel) + defer cLock.Unlock(lkctx) return removeAll(cacheObjPath) } @@ -1324,7 +1324,7 @@ func (c *diskCache) PutObjectPart(ctx context.Context, bucket, object, uploadID } ctx = lkctx.Context() - defer partIDLock.Unlock(lkctx.Cancel) + defer partIDLock.Unlock(lkctx) meta, _, _, err := c.statCache(ctx, uploadIDDir) // Case where object not yet cached if err != nil { @@ -1381,7 +1381,7 @@ func (c *diskCache) SavePartMetadata(ctx context.Context, bucket, object, upload if err != nil { return err } - defer uploadLock.Unlock(ulkctx.Cancel) + defer uploadLock.Unlock(ulkctx) metaPath := pathJoin(uploadDir, cacheMetaJSONFile) f, err := OpenFile(metaPath, os.O_RDWR|writeMode, 0o666) @@ -1480,7 +1480,7 @@ func (c *diskCache) CompleteMultipartUpload(ctx context.Context, bucket, object, } ctx = lkctx.Context() - defer cLock.Unlock(lkctx.Cancel) + defer cLock.Unlock(lkctx) mpartCachePath := getMultipartCacheSHADir(c.dir, bucket, object) uploadIDDir := path.Join(mpartCachePath, uploadID) diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go index 4171ee04b..904c7407a 100644 --- a/cmd/disk-cache.go +++ b/cmd/disk-cache.go @@ -728,7 +728,7 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r * if cerr != nil { return putObjectFn(ctx, bucket, object, r, opts) } - defer cLock.Unlock(lkctx.Cancel) + defer cLock.Unlock(lkctx) // Initialize pipe to stream data to backend pipeReader, pipeWriter := io.Pipe() hashReader, err := hash.NewReader(pipeReader, size, "", "", r.ActualSize()) diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 38430a1a3..3aea80302 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -324,7 +324,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object return result, err } ctx = lkctx.Context() - defer lk.Unlock(lkctx.Cancel) + defer lk.Unlock(lkctx) } // Re-read when we have lock... 
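The call sites above all follow the same updated convention: GetLock/GetRLock return a LockContext, the caller does its work under lkctx.Context(), and the whole LockContext is handed back to Unlock/RUnlock instead of just lkctx.Cancel, so the release path keeps access to the request-scoped context. A minimal sketch of that pattern, assuming it lives in the cmd package where ObjectLayer, RWLocker and globalOperationTimeout are defined; the wrapper function itself is hypothetical:

// withObjectLock is a hypothetical wrapper showing the updated lock/unlock
// calling convention; it assumes the cmd package's ObjectLayer, RWLocker and
// globalOperationTimeout.
func withObjectLock(ctx context.Context, objAPI ObjectLayer, bucket, object string, work func(context.Context) error) error {
	lk := objAPI.NewNSLock(bucket, object)
	lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		return err
	}
	// Run under the lock-backed context returned by GetLock.
	ctx = lkctx.Context()
	// Pass the whole LockContext back, not just lkctx.Cancel, so the unlock
	// path can forward the context (and the X-Amz-Request-ID it carries).
	defer lk.Unlock(lkctx)
	return work(ctx)
}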
@@ -695,7 +695,7 @@ func (er *erasureObjects) checkAbandonedParts(ctx context.Context, bucket string return err } ctx = lkctx.Context() - defer lk.Unlock(lkctx.Cancel) + defer lk.Unlock(lkctx) } var wg sync.WaitGroup for _, disk := range er.getDisks() { diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 38fdc0ce3..9e8aaa000 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -349,7 +349,7 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string, } rctx := lkctx.Context() obj, err := er.getObjectInfo(rctx, bucket, object, opts) - lk.RUnlock(lkctx.Cancel) + lk.RUnlock(lkctx) if err != nil && !isErrVersionNotFound(err) { return nil, err } @@ -557,7 +557,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo return PartInfo{}, err } rctx := rlkctx.Context() - defer uploadIDRLock.RUnlock(rlkctx.Cancel) + defer uploadIDRLock.RUnlock(rlkctx) uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) // Validates if upload ID exists. @@ -583,7 +583,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo return PartInfo{}, err } pctx := plkctx.Context() - defer partIDLock.Unlock(plkctx.Cancel) + defer partIDLock.Unlock(plkctx) onlineDisks := er.getDisks() writeQuorum := fi.WriteQuorum(er.defaultWQuorum()) @@ -755,7 +755,7 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u return MultipartInfo{}, err } ctx = lkctx.Context() - defer uploadIDLock.RUnlock(lkctx.Cancel) + defer uploadIDLock.RUnlock(lkctx) fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false) if err != nil { @@ -782,7 +782,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up return ListPartsInfo{}, err } ctx = lkctx.Context() - defer uploadIDLock.RUnlock(lkctx.Cancel) + defer uploadIDLock.RUnlock(lkctx) fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false) if err != nil { @@ -915,7 +915,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str return oi, err } wctx := wlkctx.Context() - defer uploadIDLock.Unlock(wlkctx.Cancel) + defer uploadIDLock.Unlock(wlkctx) fi, partsMetadata, err := er.checkUploadIDExists(wctx, bucket, object, uploadID, true) if err != nil { @@ -1184,7 +1184,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str return oi, err } ctx = lkctx.Context() - defer lk.Unlock(lkctx.Cancel) + defer lk.Unlock(lkctx) // Write final `xl.meta` at uploadID location onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum) @@ -1263,7 +1263,7 @@ func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, objec return err } ctx = lkctx.Context() - defer lk.Unlock(lkctx.Cancel) + defer lk.Unlock(lkctx) // Validates if upload ID exists. if _, _, err = er.checkUploadIDExists(ctx, bucket, object, uploadID, false); err != nil { diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 5e5ae62fd..870ba0c1a 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -81,7 +81,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d return oi, err } ctx = lkctx.Context() - defer lk.Unlock(lkctx.Cancel) + defer lk.Unlock(lkctx) } // Read metadata associated with the object from all disks. 
storageDisks := er.getDisks() @@ -206,14 +206,14 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri return nil, err } ctx = lkctx.Context() - nsUnlocker = func() { lock.Unlock(lkctx.Cancel) } + nsUnlocker = func() { lock.Unlock(lkctx) } case readLock: lkctx, err := lock.GetRLock(ctx, globalOperationTimeout) if err != nil { return nil, err } ctx = lkctx.Context() - nsUnlocker = func() { lock.RUnlock(lkctx.Cancel) } + nsUnlocker = func() { lock.RUnlock(lkctx) } } unlockOnDefer = true } @@ -434,7 +434,7 @@ func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object strin return ObjectInfo{}, err } ctx = lkctx.Context() - defer lk.RUnlock(lkctx.Cancel) + defer lk.RUnlock(lkctx) } return er.getObjectInfo(ctx, bucket, object, opts) @@ -1195,7 +1195,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st return ObjectInfo{}, err } ctx = lkctx.Context() - defer lk.Unlock(lkctx.Cancel) + defer lk.Unlock(lkctx) } modTime := opts.MTime @@ -1702,7 +1702,7 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s return ObjectInfo{}, err } ctx = lkctx.Context() - defer lk.Unlock(lkctx.Cancel) + defer lk.Unlock(lkctx) } disks := er.getDisks() @@ -1776,7 +1776,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin return ObjectInfo{}, err } ctx = lkctx.Context() - defer lk.Unlock(lkctx.Cancel) + defer lk.Unlock(lkctx) disks := er.getDisks() @@ -1886,7 +1886,7 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st return err } ctx = lkctx.Context() - defer lk.Unlock(lkctx.Cancel) + defer lk.Unlock(lkctx) fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true) if err != nil { diff --git a/cmd/erasure-server-pool-rebalance.go b/cmd/erasure-server-pool-rebalance.go index ba04cda97..d4f57b008 100644 --- a/cmd/erasure-server-pool-rebalance.go +++ b/cmd/erasure-server-pool-rebalance.go @@ -655,7 +655,7 @@ func (z *erasureServerPools) saveRebalanceStats(ctx context.Context, poolIdx int logger.LogIf(ctx, fmt.Errorf("failed to acquire write lock on %s/%s: %w", minioMetaBucket, rebalMetaName, err)) return err } - defer lock.Unlock(lkCtx.Cancel) + defer lock.Unlock(lkCtx) ctx = lkCtx.Context() noLockOpts := ObjectOptions{NoLock: true} diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 6c1141697..2df31dfff 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -708,8 +708,9 @@ func (z *erasureServerPools) MakeBucket(ctx context.Context, bucket string, opts if err != nil { return err } + ctx = lkctx.Context() - defer lk.Unlock(lkctx.Cancel) + defer lk.Unlock(lkctx) } // Create buckets in parallel across all sets. 
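Everything above threads a LockContext through to the unlock call. The struct definition is not part of this patch, but from the fields the new Unlock/RUnlock bodies touch (lc.ctx, lc.cancel) and the lkctx.Context()/lkctx.Cancel accessors used at the call sites, it can be reconstructed roughly as below; the exact definition in cmd/namespace-lock.go may differ:

// Rough reconstruction of LockContext, inferred from lc.ctx / lc.cancel and
// from lkctx.Context() / lkctx.Cancel used elsewhere in the patch.
type LockContext struct {
	ctx    context.Context
	cancel context.CancelFunc
}

// Context returns the lock-backed context; it carries the request trace data
// (including X-Amz-Request-ID) that the new unlock path forwards to dsync.
func (l LockContext) Context() context.Context { return l.ctx }

// Cancel cancels the lock-backed context, if one was set.
func (l LockContext) Cancel() {
	if l.cancel != nil {
		l.cancel()
	}
}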
@@ -789,14 +790,14 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object return nil, err } ctx = lkctx.Context() - nsUnlocker = func() { lock.Unlock(lkctx.Cancel) } + nsUnlocker = func() { lock.Unlock(lkctx) } case readLock: lkctx, err := lock.GetRLock(ctx, globalOperationTimeout) if err != nil { return nil, err } ctx = lkctx.Context() - nsUnlocker = func() { lock.RUnlock(lkctx.Cancel) } + nsUnlocker = func() { lock.RUnlock(lkctx) } } unlockOnDefer = true } @@ -917,7 +918,7 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s return ObjectInfo{}, err } ctx = lkctx.Context() - defer lk.RUnlock(lkctx.Cancel) + defer lk.RUnlock(lkctx) } objInfo, _, err = z.getLatestObjectInfoWithIdx(ctx, bucket, object, opts) @@ -946,7 +947,7 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec return ObjectInfo{}, err } ctx = lkctx.Context() - defer ns.Unlock(lkctx.Cancel) + defer ns.Unlock(lkctx) opts.NoLock = true } @@ -987,7 +988,7 @@ func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, ob return ObjectInfo{}, err } ctx = lkctx.Context() - defer lk.Unlock(lkctx.Cancel) + defer lk.Unlock(lkctx) gopts := opts gopts.NoLock = true @@ -1032,7 +1033,7 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, o return dobjects, derrs } ctx = lkctx.Context() - defer multiDeleteLock.Unlock(lkctx.Cancel) + defer multiDeleteLock.Unlock(lkctx) // Fetch location of up to 10 objects concurrently. poolObjIdxMap := map[int][]ObjectToDelete{} @@ -1132,7 +1133,7 @@ func (z *erasureServerPools) CopyObject(ctx context.Context, srcBucket, srcObjec return ObjectInfo{}, err } ctx = lkctx.Context() - defer ns.Unlock(lkctx.Cancel) + defer ns.Unlock(lkctx) dstOpts.NoLock = true } @@ -1784,7 +1785,7 @@ func (z *erasureServerPools) HealFormat(ctx context.Context, dryRun bool) (madmi return madmin.HealResultItem{}, err } ctx = lkctx.Context() - defer formatLock.Unlock(lkctx.Cancel) + defer formatLock.Unlock(lkctx) r := madmin.HealResultItem{ Type: madmin.HealItemMetadata, diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index 586d7cd8e..1f85260b7 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -39,9 +39,9 @@ var globalLockServer *localLocker // RWLocker - locker interface to introduce GetRLock, RUnlock. type RWLocker interface { GetLock(ctx context.Context, timeout *dynamicTimeout) (lkCtx LockContext, timedOutErr error) - Unlock(cancel context.CancelFunc) + Unlock(lkCtx LockContext) GetRLock(ctx context.Context, timeout *dynamicTimeout) (lkCtx LockContext, timedOutErr error) - RUnlock(cancel context.CancelFunc) + RUnlock(lkCtx LockContext) } // LockContext lock context holds the lock backed context and canceler for the context. @@ -182,11 +182,11 @@ func (di *distLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout } // Unlock - block until write lock is released. -func (di *distLockInstance) Unlock(cancel context.CancelFunc) { - if cancel != nil { - cancel() +func (di *distLockInstance) Unlock(lc LockContext) { + if lc.cancel != nil { + lc.cancel() } - di.rwMutex.Unlock() + di.rwMutex.Unlock(lc.ctx) } // RLock - block until read lock is taken or timeout has occurred. @@ -212,11 +212,11 @@ func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeou } // RUnlock - block until read lock is released. 
-func (di *distLockInstance) RUnlock(cancel context.CancelFunc) { - if cancel != nil { - cancel() +func (di *distLockInstance) RUnlock(lc LockContext) { + if lc.cancel != nil { + lc.cancel() } - di.rwMutex.RUnlock() + di.rwMutex.RUnlock(lc.ctx) } // localLockInstance - frontend/top-level interface for namespace locks. @@ -270,9 +270,9 @@ func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeou } // Unlock - block until write lock is released. -func (li *localLockInstance) Unlock(cancel context.CancelFunc) { - if cancel != nil { - cancel() +func (li *localLockInstance) Unlock(lc LockContext) { + if lc.cancel != nil { + lc.cancel() } const readLock = false for _, path := range li.paths { @@ -307,9 +307,9 @@ func (li *localLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeo } // RUnlock - block until read lock is released. -func (li *localLockInstance) RUnlock(cancel context.CancelFunc) { - if cancel != nil { - cancel() +func (li *localLockInstance) RUnlock(lc LockContext) { + if lc.cancel != nil { + lc.cancel() } const readLock = true for _, path := range li.paths { diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 74f34342f..d0696a4b1 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -194,7 +194,7 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r return } ctx = lkctx.Context() - defer lock.RUnlock(lkctx.Cancel) + defer lock.RUnlock(lkctx) getObjectNInfo := objectAPI.GetObjectNInfo if api.CacheAPI() != nil { diff --git a/cmd/server-main.go b/cmd/server-main.go index be6e49563..5d8cf3e90 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -422,7 +422,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error { // Upon success migrating the config, initialize all sub-systems // if all sub-systems initialized successfully return right away if err = initConfigSubsystem(lkctx.Context(), newObject); err == nil { - txnLk.Unlock(lkctx.Cancel) + txnLk.Unlock(lkctx) // All successful return. if globalIsDistErasure { // These messages only meant primarily for distributed setup, so only log during distributed setup. @@ -433,7 +433,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error { } // Unlock the transaction lock and allow other nodes to acquire the lock if possible. - txnLk.Unlock(lkctx.Cancel) + txnLk.Unlock(lkctx) if configRetriableErrors(err) { logger.Info("Waiting for all MinIO sub-systems to be initialized.. possible cause (%v)", err) diff --git a/internal/dsync/drwmutex.go b/internal/dsync/drwmutex.go index 17c696398..293e49e22 100644 --- a/internal/dsync/drwmutex.go +++ b/internal/dsync/drwmutex.go @@ -27,6 +27,7 @@ import ( "sync" "time" + "github.com/minio/minio/internal/mcontext" "github.com/minio/pkg/console" ) @@ -429,6 +430,15 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is // Combined timeout for the lock attempt. ctx, cancel := context.WithTimeout(ctx, ds.Timeouts.Acquire) defer cancel() + + // Special context for NetLockers - do not use timeouts. 
+ // Also, pass the trace context info if found for debugging + netLockCtx := context.Background() + tc, ok := ctx.Value(mcontext.ContextTraceKey).(*mcontext.TraceCtxt) + if ok { + netLockCtx = context.WithValue(netLockCtx, mcontext.ContextTraceKey, tc) + } + for index, c := range restClnts { wg.Add(1) // broadcast lock request to all nodes @@ -445,11 +455,11 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is var locked bool var err error if isReadLock { - if locked, err = c.RLock(context.Background(), args); err != nil { + if locked, err = c.RLock(netLockCtx, args); err != nil { log("dsync: Unable to call RLock failed with %s for %#v at %s\n", err, args, c) } } else { - if locked, err = c.Lock(context.Background(), args); err != nil { + if locked, err = c.Lock(netLockCtx, args); err != nil { log("dsync: Unable to call Lock failed with %s for %#v at %s\n", err, args, c) } } @@ -502,7 +512,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is if !quorumLocked { log("dsync: Unable to acquire lock in quorum %#v\n", args) // Release all acquired locks without quorum. - if !releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, names...) { + if !releaseAll(ctx, ds, tolerance, owner, locks, isReadLock, restClnts, names...) { log("Unable to release acquired locks, these locks will expire automatically %#v\n", args) } } @@ -515,7 +525,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is if grantToBeReleased.isLocked() { // release abandoned lock log("Releasing abandoned lock\n") - sendRelease(ds, restClnts[grantToBeReleased.index], + sendRelease(ctx, ds, restClnts[grantToBeReleased.index], owner, grantToBeReleased.lockUID, isReadLock, names...) } } @@ -564,13 +574,13 @@ func checkQuorumLocked(locks *[]string, quorum int) bool { } // releaseAll releases all locks that are marked as locked -func releaseAll(ds *Dsync, tolerance int, owner string, locks *[]string, isReadLock bool, restClnts []NetLocker, names ...string) bool { +func releaseAll(ctx context.Context, ds *Dsync, tolerance int, owner string, locks *[]string, isReadLock bool, restClnts []NetLocker, names ...string) bool { var wg sync.WaitGroup for lockID := range restClnts { wg.Add(1) go func(lockID int) { defer wg.Done() - if sendRelease(ds, restClnts[lockID], owner, (*locks)[lockID], isReadLock, names...) { + if sendRelease(ctx, ds, restClnts[lockID], owner, (*locks)[lockID], isReadLock, names...) { (*locks)[lockID] = "" } }(lockID) @@ -587,7 +597,7 @@ func releaseAll(ds *Dsync, tolerance int, owner string, locks *[]string, isReadL // Unlock unlocks the write lock. // // It is a run-time error if dm is not locked on entry to Unlock. -func (dm *DRWMutex) Unlock() { +func (dm *DRWMutex) Unlock(ctx context.Context) { dm.m.Lock() dm.cancelRefresh() dm.m.Unlock() @@ -620,7 +630,7 @@ func (dm *DRWMutex) Unlock() { tolerance := len(restClnts) / 2 isReadLock := false - for !releaseAll(dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) { + for !releaseAll(ctx, dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) { time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryInterval))) } } @@ -628,7 +638,7 @@ func (dm *DRWMutex) Unlock() { // RUnlock releases a read lock held on dm. // // It is a run-time error if dm is not locked on entry to RUnlock. 
-func (dm *DRWMutex) RUnlock() { +func (dm *DRWMutex) RUnlock(ctx context.Context) { dm.m.Lock() dm.cancelRefresh() dm.m.Unlock() @@ -661,13 +671,13 @@ func (dm *DRWMutex) RUnlock() { tolerance := len(restClnts) / 2 isReadLock := true - for !releaseAll(dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) { + for !releaseAll(ctx, dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) { time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryInterval))) } } // sendRelease sends a release message to a node that previously granted a lock -func sendRelease(ds *Dsync, c NetLocker, owner string, uid string, isReadLock bool, names ...string) bool { +func sendRelease(ctx context.Context, ds *Dsync, c NetLocker, owner string, uid string, isReadLock bool, names ...string) bool { if c == nil { log("Unable to call RUnlock failed with %s\n", errors.New("netLocker is offline")) return false @@ -683,16 +693,21 @@ func sendRelease(ds *Dsync, c NetLocker, owner string, uid string, isReadLock bo Resources: names, } - ctx, cancel := context.WithTimeout(context.Background(), ds.Timeouts.UnlockCall) + netLockCtx, cancel := context.WithTimeout(context.Background(), ds.Timeouts.UnlockCall) defer cancel() + tc, ok := ctx.Value(mcontext.ContextTraceKey).(*mcontext.TraceCtxt) + if ok { + netLockCtx = context.WithValue(netLockCtx, mcontext.ContextTraceKey, tc) + } + if isReadLock { - if _, err := c.RUnlock(ctx, args); err != nil { + if _, err := c.RUnlock(netLockCtx, args); err != nil { log("dsync: Unable to call RUnlock failed with %s for %#v at %s\n", err, args, c) return false } } else { - if _, err := c.Unlock(ctx, args); err != nil { + if _, err := c.Unlock(netLockCtx, args); err != nil { log("dsync: Unable to call Unlock failed with %s for %#v at %s\n", err, args, c) return false } diff --git a/internal/dsync/drwmutex_test.go b/internal/dsync/drwmutex_test.go index d145f769d..526e1717d 100644 --- a/internal/dsync/drwmutex_test.go +++ b/internal/dsync/drwmutex_test.go @@ -48,13 +48,13 @@ func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) { go func() { time.Sleep(2 * testDrwMutexAcquireTimeout) - drwm1.RUnlock() + drwm1.RUnlock(context.Background()) // fmt.Println("1st read lock released, waiting...") }() go func() { time.Sleep(3 * testDrwMutexAcquireTimeout) - drwm2.RUnlock() + drwm2.RUnlock(context.Background()) // fmt.Println("2nd read lock released, waiting...") }() @@ -66,7 +66,7 @@ func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) { // fmt.Println("Write lock acquired, waiting...") time.Sleep(testDrwMutexAcquireTimeout) - drwm3.Unlock() + drwm3.Unlock(context.Background()) } // fmt.Println("Write lock failed due to timeout") return @@ -101,7 +101,7 @@ func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) { go func() { time.Sleep(3 * testDrwMutexAcquireTimeout) - drwm1.Unlock() + drwm1.Unlock(context.Background()) // fmt.Println("Initial write lock released, waiting...") }() @@ -113,7 +113,7 @@ func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) { // fmt.Println("2nd write lock acquired, waiting...") time.Sleep(testDrwMutexAcquireTimeout) - drwm2.Unlock() + drwm2.Unlock(context.Background()) } // fmt.Println("2nd write lock failed due to timeout") return @@ -144,7 +144,7 @@ func parallelReader(ctx context.Context, m *DRWMutex, clocked, cunlock, cdone ch if m.GetRLock(ctx, nil, id, source, Options{Timeout: time.Second}) { clocked <- true <-cunlock - m.RUnlock() + 
m.RUnlock(context.Background()) cdone <- true } } @@ -193,7 +193,7 @@ func reader(resource string, numIterations int, activity *int32, cdone chan bool for i := 0; i < 100; i++ { } atomic.AddInt32(activity, -1) - rwm.RUnlock() + rwm.RUnlock(context.Background()) } } cdone <- true @@ -211,7 +211,7 @@ func writer(resource string, numIterations int, activity *int32, cdone chan bool for i := 0; i < 100; i++ { } atomic.AddInt32(activity, -10000) - rwm.Unlock() + rwm.Unlock(context.Background()) } } cdone <- true @@ -268,7 +268,7 @@ func TestUnlockPanic(t *testing.T) { } }() mu := NewDRWMutex(ds, "test") - mu.Unlock() + mu.Unlock(context.Background()) } // Borrowed from rwmutex_test.go @@ -278,10 +278,10 @@ func TestUnlockPanic2(t *testing.T) { if recover() == nil { t.Fatalf("unlock of unlocked RWMutex did not panic") } - mu.RUnlock() // Unlock, so -test.count > 1 works + mu.RUnlock(context.Background()) // Unlock, so -test.count > 1 works }() mu.RLock(id, source) - mu.Unlock() + mu.Unlock(context.Background()) } // Borrowed from rwmutex_test.go @@ -292,7 +292,7 @@ func TestRUnlockPanic(t *testing.T) { } }() mu := NewDRWMutex(ds, "test") - mu.RUnlock() + mu.RUnlock(context.Background()) } // Borrowed from rwmutex_test.go @@ -302,10 +302,10 @@ func TestRUnlockPanic2(t *testing.T) { if recover() == nil { t.Fatalf("read unlock of unlocked RWMutex did not panic") } - mu.Unlock() // Unlock, so -test.count > 1 works + mu.Unlock(context.Background()) // Unlock, so -test.count > 1 works }() mu.Lock(id, source) - mu.RUnlock() + mu.RUnlock(context.Background()) } // Borrowed from rwmutex_test.go @@ -320,14 +320,14 @@ func benchmarkRWMutex(b *testing.B, localWork, writeRatio int) { foo++ if foo%writeRatio == 0 { rwm.Lock(id, source) - rwm.Unlock() + rwm.Unlock(context.Background()) } else { rwm.RLock(id, source) for i := 0; i != localWork; i++ { foo *= 2 foo /= 2 } - rwm.RUnlock() + rwm.RUnlock(context.Background()) } } _ = foo diff --git a/internal/dsync/dsync_test.go b/internal/dsync/dsync_test.go index 8e6364b23..b4556bbe7 100644 --- a/internal/dsync/dsync_test.go +++ b/internal/dsync/dsync_test.go @@ -69,7 +69,7 @@ func TestSimpleLock(t *testing.T) { // fmt.Println("Lock acquired, waiting...") time.Sleep(testDrwMutexRefreshCallTimeout) - dm.Unlock() + dm.Unlock(context.Background()) } func TestSimpleLockUnlockMultipleTimes(t *testing.T) { @@ -77,23 +77,23 @@ func TestSimpleLockUnlockMultipleTimes(t *testing.T) { dm.Lock(id, source) time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond) - dm.Unlock() + dm.Unlock(context.Background()) dm.Lock(id, source) time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond) - dm.Unlock() + dm.Unlock(context.Background()) dm.Lock(id, source) time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond) - dm.Unlock() + dm.Unlock(context.Background()) dm.Lock(id, source) time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond) - dm.Unlock() + dm.Unlock(context.Background()) dm.Lock(id, source) time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond) - dm.Unlock() + dm.Unlock(context.Background()) } // Test two locks for same resource, one succeeds, one fails (after timeout) @@ -108,7 +108,7 @@ func TestTwoSimultaneousLocksForSameResource(t *testing.T) { time.Sleep(5 * testDrwMutexAcquireTimeout) // fmt.Println("Unlocking dm1") - dm1st.Unlock() + dm1st.Unlock(context.Background()) }() dm2nd.Lock(id, source) @@ -116,7 +116,7 @@ func TestTwoSimultaneousLocksForSameResource(t *testing.T) { // fmt.Printf("2nd lock 
obtained after 1st lock is released\n") time.Sleep(testDrwMutexRefreshCallTimeout * 2) - dm2nd.Unlock() + dm2nd.Unlock(context.Background()) } // Test three locks for same resource, one succeeds, one fails (after timeout) @@ -134,7 +134,7 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) { time.Sleep(2 * testDrwMutexAcquireTimeout) // fmt.Println("Unlocking dm1") - dm1st.Unlock() + dm1st.Unlock(context.Background()) }() expect += 2 * testDrwMutexAcquireTimeout @@ -151,7 +151,7 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) { time.Sleep(2 * testDrwMutexAcquireTimeout) // fmt.Println("Unlocking dm2") - dm2nd.Unlock() + dm2nd.Unlock(context.Background()) }() dm3rd.Lock(id, source) @@ -159,7 +159,7 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) { // fmt.Printf("3rd lock obtained after 1st & 2nd locks are released\n") time.Sleep(testDrwMutexRefreshCallTimeout) - dm3rd.Unlock() + dm3rd.Unlock(context.Background()) }() expect += 2*testDrwMutexAcquireTimeout + testDrwMutexRefreshCallTimeout @@ -173,7 +173,7 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) { time.Sleep(2 * testDrwMutexAcquireTimeout) // fmt.Println("Unlocking dm3") - dm3rd.Unlock() + dm3rd.Unlock(context.Background()) }() dm2nd.Lock(id, source) @@ -181,7 +181,7 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) { // fmt.Printf("2nd lock obtained after 1st & 3rd locks are released\n") time.Sleep(testDrwMutexRefreshCallTimeout) - dm2nd.Unlock() + dm2nd.Unlock(context.Background()) }() expect += 2*testDrwMutexAcquireTimeout + testDrwMutexRefreshCallTimeout @@ -201,8 +201,8 @@ func TestTwoSimultaneousLocksForDifferentResources(t *testing.T) { dm1.Lock(id, source) dm2.Lock(id, source) - dm1.Unlock() - dm2.Unlock() + dm1.Unlock(context.Background()) + dm2.Unlock(context.Background()) } // Test refreshing lock - refresh should always return true @@ -230,7 +230,7 @@ func TestSuccessfulLockRefresh(t *testing.T) { } // Should be safe operation in all cases - dm.Unlock() + dm.Unlock(context.Background()) } // Test canceling context while quorum servers report lock not found @@ -267,7 +267,7 @@ func TestFailedRefreshLock(t *testing.T) { } // Should be safe operation in all cases - dm.Unlock() + dm.Unlock(context.Background()) } // Test Unlock should not timeout @@ -290,7 +290,9 @@ func TestUnlockShouldNotTimeout(t *testing.T) { unlockReturned := make(chan struct{}, 1) go func() { - dm.Unlock() + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + dm.Unlock(ctx) unlockReturned <- struct{}{} }() @@ -308,7 +310,7 @@ func TestUnlockShouldNotTimeout(t *testing.T) { func HammerMutex(m *DRWMutex, loops int, cdone chan bool) { for i := 0; i < loops; i++ { m.Lock(id, source) - m.Unlock() + m.Unlock(context.Background()) } cdone <- true } @@ -340,7 +342,7 @@ func BenchmarkMutexUncontended(b *testing.B) { mu := PaddedMutex{NewDRWMutex(ds, "")} for pb.Next() { mu.Lock(id, source) - mu.Unlock() + mu.Unlock(context.Background()) } }) } @@ -357,7 +359,7 @@ func benchmarkMutex(b *testing.B, slack, work bool) { foo := 0 for pb.Next() { mu.Lock(id, source) - mu.Unlock() + mu.Unlock(context.Background()) if work { for i := 0; i < 100; i++ { foo *= 2 @@ -406,7 +408,7 @@ func BenchmarkMutexNoSpin(b *testing.B) { m.Lock(id, source) acc0 -= 100 acc1 += 100 - m.Unlock() + m.Unlock(context.Background()) } else { for i := 0; i < len(data); i += 4 { data[i]++ @@ -438,7 +440,7 @@ func BenchmarkMutexSpin(b *testing.B) { m.Lock(id, source) acc0 -= 100 
acc1 += 100 - m.Unlock() + m.Unlock(context.Background()) for i := 0; i < len(data); i += 4 { data[i]++ }
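
The dsync changes are the core of the patch: DRWMutex.Unlock and RUnlock now accept a context.Context, and both lock() and sendRelease() build their RPC context from context.Background() plus the mcontext trace value, so NetLocker calls are not cut short by the caller's deadline but still carry the X-Amz-Request-ID for correlation on the lock server. A hypothetical helper capturing that pattern (the patch inlines it; traceOnlyContext is not a function in the codebase):

// traceOnlyContext is a hypothetical helper, not part of the patch, showing
// the pattern lock() and sendRelease() apply inline: start from a fresh
// context and copy only the trace value across.
func traceOnlyContext(ctx context.Context) context.Context {
	netLockCtx := context.Background()
	if tc, ok := ctx.Value(mcontext.ContextTraceKey).(*mcontext.TraceCtxt); ok {
		netLockCtx = context.WithValue(netLockCtx, mcontext.ContextTraceKey, tc)
	}
	return netLockCtx
}

Note that sendRelease() additionally wraps its fresh context with ds.Timeouts.UnlockCall before attaching the trace value, so unlock RPCs still run under their own bounded timeout.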
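At the DRWMutex level the only caller-visible change is the extra context argument on Unlock/RUnlock; the test updates above are mechanical. A minimal usage sketch, assuming the dsync test fixtures (ds, id, source) from dsync_test.go:

// Minimal sketch of the new Unlock signature; ds, id and source are the
// fixtures used by the dsync tests.
dm := NewDRWMutex(ds, "my-resource")
dm.Lock(id, source)
// ... critical section ...
// The context is only used to carry trace data for the release RPCs;
// context.Background() is fine when there is no request to correlate.
dm.Unlock(context.Background())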