1
0
mirror of https://github.com/minio/minio synced 2024-07-05 17:08:43 +00:00

debug: Add X-Amz-Request-ID to lock/unlock calls (#16309)

This commit is contained in:
Anis Elleuch 2022-12-24 04:49:07 +01:00 committed by GitHub
parent 8528b265a9
commit acc9c033ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 133 additions and 115 deletions

View File

@ -1170,7 +1170,7 @@ func (a adminAPIHandlers) NetperfHandler(w http.ResponseWriter, r *http.Request)
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(toAPIErrorCode(ctx, err)), r.URL)
return
}
defer nsLock.Unlock(lkctx.Cancel)
defer nsLock.Unlock(lkctx)
durationStr := r.Form.Get(peerRESTDuration)
duration, err := time.ParseDuration(durationStr)
@ -2286,7 +2286,7 @@ func (a adminAPIHandlers) HealthInfoHandler(w http.ResponseWriter, r *http.Reque
return
}
defer nsLock.Unlock(lkctx.Cancel)
defer nsLock.Unlock(lkctx)
healthCtx, healthCancel := context.WithTimeout(lkctx.Context(), deadline)
defer healthCancel()

View File

@ -316,7 +316,7 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
return err
}
ctx = lkctx.Context()
defer locker.Unlock(lkctx.Cancel)
defer locker.Unlock(lkctx)
buckets, _ := z.ListBuckets(ctx, BucketOptions{})
// Buckets data are dispersed in multiple zones/sets, make

View File

@ -425,7 +425,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
return
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
defer lk.Unlock(lkctx)
var wg sync.WaitGroup
var rinfos replicatedInfos
@ -937,7 +937,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
return
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
defer lk.Unlock(lkctx)
var wg sync.WaitGroup
var rinfos replicatedInfos

View File

@ -79,7 +79,7 @@ func runCallhome(ctx context.Context, objAPI ObjectLayer) bool {
}
ctx = lkctx.Context()
defer locker.Unlock(lkctx.Cancel)
defer locker.Unlock(lkctx)
callhomeTimer := time.NewTimer(globalCallhomeConfig.FrequencyDur())
defer callhomeTimer.Stop()

View File

@ -512,7 +512,7 @@ func (c *diskCache) statCachedMeta(ctx context.Context, cacheObjPath string) (me
return
}
ctx = lkctx.Context()
defer cLock.RUnlock(lkctx.Cancel)
defer cLock.RUnlock(lkctx)
return c.statCache(ctx, cacheObjPath)
}
@ -597,7 +597,7 @@ func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, met
return err
}
ctx = lkctx.Context()
defer cLock.Unlock(lkctx.Cancel)
defer cLock.Unlock(lkctx)
if err = c.saveMetadata(ctx, bucket, object, meta, actualSize, rs, rsFileName, incHitsOnly); err != nil {
return err
}
@ -842,7 +842,7 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
return oi, err
}
ctx = lkctx.Context()
defer cLock.Unlock(lkctx.Cancel)
defer cLock.Unlock(lkctx)
return c.put(ctx, bucket, object, data, size, rs, opts, incHitsOnly, writeback)
}
@ -1076,7 +1076,7 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
return nil, numHits, err
}
ctx = lkctx.Context()
defer cLock.RUnlock(lkctx.Cancel)
defer cLock.RUnlock(lkctx)
var objInfo ObjectInfo
var rngInfo RangeInfo
@ -1213,7 +1213,7 @@ func (c *diskCache) Delete(ctx context.Context, bucket, object string) (err erro
if err != nil {
return err
}
defer cLock.Unlock(lkctx.Cancel)
defer cLock.Unlock(lkctx)
return removeAll(cacheObjPath)
}
@ -1324,7 +1324,7 @@ func (c *diskCache) PutObjectPart(ctx context.Context, bucket, object, uploadID
}
ctx = lkctx.Context()
defer partIDLock.Unlock(lkctx.Cancel)
defer partIDLock.Unlock(lkctx)
meta, _, _, err := c.statCache(ctx, uploadIDDir)
// Case where object not yet cached
if err != nil {
@ -1381,7 +1381,7 @@ func (c *diskCache) SavePartMetadata(ctx context.Context, bucket, object, upload
if err != nil {
return err
}
defer uploadLock.Unlock(ulkctx.Cancel)
defer uploadLock.Unlock(ulkctx)
metaPath := pathJoin(uploadDir, cacheMetaJSONFile)
f, err := OpenFile(metaPath, os.O_RDWR|writeMode, 0o666)
@ -1480,7 +1480,7 @@ func (c *diskCache) CompleteMultipartUpload(ctx context.Context, bucket, object,
}
ctx = lkctx.Context()
defer cLock.Unlock(lkctx.Cancel)
defer cLock.Unlock(lkctx)
mpartCachePath := getMultipartCacheSHADir(c.dir, bucket, object)
uploadIDDir := path.Join(mpartCachePath, uploadID)

View File

@ -728,7 +728,7 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *
if cerr != nil {
return putObjectFn(ctx, bucket, object, r, opts)
}
defer cLock.Unlock(lkctx.Cancel)
defer cLock.Unlock(lkctx)
// Initialize pipe to stream data to backend
pipeReader, pipeWriter := io.Pipe()
hashReader, err := hash.NewReader(pipeReader, size, "", "", r.ActualSize())

View File

@ -324,7 +324,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
return result, err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
defer lk.Unlock(lkctx)
}
// Re-read when we have lock...
@ -695,7 +695,7 @@ func (er *erasureObjects) checkAbandonedParts(ctx context.Context, bucket string
return err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
defer lk.Unlock(lkctx)
}
var wg sync.WaitGroup
for _, disk := range er.getDisks() {

View File

@ -349,7 +349,7 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
}
rctx := lkctx.Context()
obj, err := er.getObjectInfo(rctx, bucket, object, opts)
lk.RUnlock(lkctx.Cancel)
lk.RUnlock(lkctx)
if err != nil && !isErrVersionNotFound(err) {
return nil, err
}
@ -557,7 +557,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
return PartInfo{}, err
}
rctx := rlkctx.Context()
defer uploadIDRLock.RUnlock(rlkctx.Cancel)
defer uploadIDRLock.RUnlock(rlkctx)
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
// Validates if upload ID exists.
@ -583,7 +583,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
return PartInfo{}, err
}
pctx := plkctx.Context()
defer partIDLock.Unlock(plkctx.Cancel)
defer partIDLock.Unlock(plkctx)
onlineDisks := er.getDisks()
writeQuorum := fi.WriteQuorum(er.defaultWQuorum())
@ -755,7 +755,7 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
return MultipartInfo{}, err
}
ctx = lkctx.Context()
defer uploadIDLock.RUnlock(lkctx.Cancel)
defer uploadIDLock.RUnlock(lkctx)
fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false)
if err != nil {
@ -782,7 +782,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
return ListPartsInfo{}, err
}
ctx = lkctx.Context()
defer uploadIDLock.RUnlock(lkctx.Cancel)
defer uploadIDLock.RUnlock(lkctx)
fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false)
if err != nil {
@ -915,7 +915,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
return oi, err
}
wctx := wlkctx.Context()
defer uploadIDLock.Unlock(wlkctx.Cancel)
defer uploadIDLock.Unlock(wlkctx)
fi, partsMetadata, err := er.checkUploadIDExists(wctx, bucket, object, uploadID, true)
if err != nil {
@ -1184,7 +1184,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
return oi, err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
defer lk.Unlock(lkctx)
// Write final `xl.meta` at uploadID location
onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum)
@ -1263,7 +1263,7 @@ func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, objec
return err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
defer lk.Unlock(lkctx)
// Validates if upload ID exists.
if _, _, err = er.checkUploadIDExists(ctx, bucket, object, uploadID, false); err != nil {

View File

@ -81,7 +81,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
return oi, err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
defer lk.Unlock(lkctx)
}
// Read metadata associated with the object from all disks.
storageDisks := er.getDisks()
@ -206,14 +206,14 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
return nil, err
}
ctx = lkctx.Context()
nsUnlocker = func() { lock.Unlock(lkctx.Cancel) }
nsUnlocker = func() { lock.Unlock(lkctx) }
case readLock:
lkctx, err := lock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return nil, err
}
ctx = lkctx.Context()
nsUnlocker = func() { lock.RUnlock(lkctx.Cancel) }
nsUnlocker = func() { lock.RUnlock(lkctx) }
}
unlockOnDefer = true
}
@ -434,7 +434,7 @@ func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object strin
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer lk.RUnlock(lkctx.Cancel)
defer lk.RUnlock(lkctx)
}
return er.getObjectInfo(ctx, bucket, object, opts)
@ -1195,7 +1195,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
defer lk.Unlock(lkctx)
}
modTime := opts.MTime
@ -1702,7 +1702,7 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
defer lk.Unlock(lkctx)
}
disks := er.getDisks()
@ -1776,7 +1776,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
defer lk.Unlock(lkctx)
disks := er.getDisks()
@ -1886,7 +1886,7 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st
return err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
defer lk.Unlock(lkctx)
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true)
if err != nil {

View File

@ -655,7 +655,7 @@ func (z *erasureServerPools) saveRebalanceStats(ctx context.Context, poolIdx int
logger.LogIf(ctx, fmt.Errorf("failed to acquire write lock on %s/%s: %w", minioMetaBucket, rebalMetaName, err))
return err
}
defer lock.Unlock(lkCtx.Cancel)
defer lock.Unlock(lkCtx)
ctx = lkCtx.Context()
noLockOpts := ObjectOptions{NoLock: true}

View File

@ -708,8 +708,9 @@ func (z *erasureServerPools) MakeBucket(ctx context.Context, bucket string, opts
if err != nil {
return err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
defer lk.Unlock(lkctx)
}
// Create buckets in parallel across all sets.
@ -789,14 +790,14 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
return nil, err
}
ctx = lkctx.Context()
nsUnlocker = func() { lock.Unlock(lkctx.Cancel) }
nsUnlocker = func() { lock.Unlock(lkctx) }
case readLock:
lkctx, err := lock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return nil, err
}
ctx = lkctx.Context()
nsUnlocker = func() { lock.RUnlock(lkctx.Cancel) }
nsUnlocker = func() { lock.RUnlock(lkctx) }
}
unlockOnDefer = true
}
@ -917,7 +918,7 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer lk.RUnlock(lkctx.Cancel)
defer lk.RUnlock(lkctx)
}
objInfo, _, err = z.getLatestObjectInfoWithIdx(ctx, bucket, object, opts)
@ -946,7 +947,7 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer ns.Unlock(lkctx.Cancel)
defer ns.Unlock(lkctx)
opts.NoLock = true
}
@ -987,7 +988,7 @@ func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, ob
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
defer lk.Unlock(lkctx)
gopts := opts
gopts.NoLock = true
@ -1032,7 +1033,7 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, o
return dobjects, derrs
}
ctx = lkctx.Context()
defer multiDeleteLock.Unlock(lkctx.Cancel)
defer multiDeleteLock.Unlock(lkctx)
// Fetch location of up to 10 objects concurrently.
poolObjIdxMap := map[int][]ObjectToDelete{}
@ -1132,7 +1133,7 @@ func (z *erasureServerPools) CopyObject(ctx context.Context, srcBucket, srcObjec
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer ns.Unlock(lkctx.Cancel)
defer ns.Unlock(lkctx)
dstOpts.NoLock = true
}
@ -1784,7 +1785,7 @@ func (z *erasureServerPools) HealFormat(ctx context.Context, dryRun bool) (madmi
return madmin.HealResultItem{}, err
}
ctx = lkctx.Context()
defer formatLock.Unlock(lkctx.Cancel)
defer formatLock.Unlock(lkctx)
r := madmin.HealResultItem{
Type: madmin.HealItemMetadata,

View File

@ -39,9 +39,9 @@ var globalLockServer *localLocker
// RWLocker - locker interface to introduce GetRLock, RUnlock.
type RWLocker interface {
GetLock(ctx context.Context, timeout *dynamicTimeout) (lkCtx LockContext, timedOutErr error)
Unlock(cancel context.CancelFunc)
Unlock(lkCtx LockContext)
GetRLock(ctx context.Context, timeout *dynamicTimeout) (lkCtx LockContext, timedOutErr error)
RUnlock(cancel context.CancelFunc)
RUnlock(lkCtx LockContext)
}
// LockContext lock context holds the lock backed context and canceler for the context.
@ -182,11 +182,11 @@ func (di *distLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout
}
// Unlock - block until write lock is released.
func (di *distLockInstance) Unlock(cancel context.CancelFunc) {
if cancel != nil {
cancel()
func (di *distLockInstance) Unlock(lc LockContext) {
if lc.cancel != nil {
lc.cancel()
}
di.rwMutex.Unlock()
di.rwMutex.Unlock(lc.ctx)
}
// RLock - block until read lock is taken or timeout has occurred.
@ -212,11 +212,11 @@ func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeou
}
// RUnlock - block until read lock is released.
func (di *distLockInstance) RUnlock(cancel context.CancelFunc) {
if cancel != nil {
cancel()
func (di *distLockInstance) RUnlock(lc LockContext) {
if lc.cancel != nil {
lc.cancel()
}
di.rwMutex.RUnlock()
di.rwMutex.RUnlock(lc.ctx)
}
// localLockInstance - frontend/top-level interface for namespace locks.
@ -270,9 +270,9 @@ func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeou
}
// Unlock - block until write lock is released.
func (li *localLockInstance) Unlock(cancel context.CancelFunc) {
if cancel != nil {
cancel()
func (li *localLockInstance) Unlock(lc LockContext) {
if lc.cancel != nil {
lc.cancel()
}
const readLock = false
for _, path := range li.paths {
@ -307,9 +307,9 @@ func (li *localLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeo
}
// RUnlock - block until read lock is released.
func (li *localLockInstance) RUnlock(cancel context.CancelFunc) {
if cancel != nil {
cancel()
func (li *localLockInstance) RUnlock(lc LockContext) {
if lc.cancel != nil {
lc.cancel()
}
const readLock = true
for _, path := range li.paths {

View File

@ -194,7 +194,7 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
return
}
ctx = lkctx.Context()
defer lock.RUnlock(lkctx.Cancel)
defer lock.RUnlock(lkctx)
getObjectNInfo := objectAPI.GetObjectNInfo
if api.CacheAPI() != nil {

View File

@ -422,7 +422,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
// Upon success migrating the config, initialize all sub-systems
// if all sub-systems initialized successfully return right away
if err = initConfigSubsystem(lkctx.Context(), newObject); err == nil {
txnLk.Unlock(lkctx.Cancel)
txnLk.Unlock(lkctx)
// All successful return.
if globalIsDistErasure {
// These messages only meant primarily for distributed setup, so only log during distributed setup.
@ -433,7 +433,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
}
// Unlock the transaction lock and allow other nodes to acquire the lock if possible.
txnLk.Unlock(lkctx.Cancel)
txnLk.Unlock(lkctx)
if configRetriableErrors(err) {
logger.Info("Waiting for all MinIO sub-systems to be initialized.. possible cause (%v)", err)

View File

@ -27,6 +27,7 @@ import (
"sync"
"time"
"github.com/minio/minio/internal/mcontext"
"github.com/minio/pkg/console"
)
@ -429,6 +430,15 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
// Combined timeout for the lock attempt.
ctx, cancel := context.WithTimeout(ctx, ds.Timeouts.Acquire)
defer cancel()
// Special context for NetLockers - do not use timeouts.
// Also, pass the trace context info if found for debugging
netLockCtx := context.Background()
tc, ok := ctx.Value(mcontext.ContextTraceKey).(*mcontext.TraceCtxt)
if ok {
netLockCtx = context.WithValue(netLockCtx, mcontext.ContextTraceKey, tc)
}
for index, c := range restClnts {
wg.Add(1)
// broadcast lock request to all nodes
@ -445,11 +455,11 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
var locked bool
var err error
if isReadLock {
if locked, err = c.RLock(context.Background(), args); err != nil {
if locked, err = c.RLock(netLockCtx, args); err != nil {
log("dsync: Unable to call RLock failed with %s for %#v at %s\n", err, args, c)
}
} else {
if locked, err = c.Lock(context.Background(), args); err != nil {
if locked, err = c.Lock(netLockCtx, args); err != nil {
log("dsync: Unable to call Lock failed with %s for %#v at %s\n", err, args, c)
}
}
@ -502,7 +512,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
if !quorumLocked {
log("dsync: Unable to acquire lock in quorum %#v\n", args)
// Release all acquired locks without quorum.
if !releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, names...) {
if !releaseAll(ctx, ds, tolerance, owner, locks, isReadLock, restClnts, names...) {
log("Unable to release acquired locks, these locks will expire automatically %#v\n", args)
}
}
@ -515,7 +525,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
if grantToBeReleased.isLocked() {
// release abandoned lock
log("Releasing abandoned lock\n")
sendRelease(ds, restClnts[grantToBeReleased.index],
sendRelease(ctx, ds, restClnts[grantToBeReleased.index],
owner, grantToBeReleased.lockUID, isReadLock, names...)
}
}
@ -564,13 +574,13 @@ func checkQuorumLocked(locks *[]string, quorum int) bool {
}
// releaseAll releases all locks that are marked as locked
func releaseAll(ds *Dsync, tolerance int, owner string, locks *[]string, isReadLock bool, restClnts []NetLocker, names ...string) bool {
func releaseAll(ctx context.Context, ds *Dsync, tolerance int, owner string, locks *[]string, isReadLock bool, restClnts []NetLocker, names ...string) bool {
var wg sync.WaitGroup
for lockID := range restClnts {
wg.Add(1)
go func(lockID int) {
defer wg.Done()
if sendRelease(ds, restClnts[lockID], owner, (*locks)[lockID], isReadLock, names...) {
if sendRelease(ctx, ds, restClnts[lockID], owner, (*locks)[lockID], isReadLock, names...) {
(*locks)[lockID] = ""
}
}(lockID)
@ -587,7 +597,7 @@ func releaseAll(ds *Dsync, tolerance int, owner string, locks *[]string, isReadL
// Unlock unlocks the write lock.
//
// It is a run-time error if dm is not locked on entry to Unlock.
func (dm *DRWMutex) Unlock() {
func (dm *DRWMutex) Unlock(ctx context.Context) {
dm.m.Lock()
dm.cancelRefresh()
dm.m.Unlock()
@ -620,7 +630,7 @@ func (dm *DRWMutex) Unlock() {
tolerance := len(restClnts) / 2
isReadLock := false
for !releaseAll(dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) {
for !releaseAll(ctx, dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) {
time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryInterval)))
}
}
@ -628,7 +638,7 @@ func (dm *DRWMutex) Unlock() {
// RUnlock releases a read lock held on dm.
//
// It is a run-time error if dm is not locked on entry to RUnlock.
func (dm *DRWMutex) RUnlock() {
func (dm *DRWMutex) RUnlock(ctx context.Context) {
dm.m.Lock()
dm.cancelRefresh()
dm.m.Unlock()
@ -661,13 +671,13 @@ func (dm *DRWMutex) RUnlock() {
tolerance := len(restClnts) / 2
isReadLock := true
for !releaseAll(dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) {
for !releaseAll(ctx, dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) {
time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryInterval)))
}
}
// sendRelease sends a release message to a node that previously granted a lock
func sendRelease(ds *Dsync, c NetLocker, owner string, uid string, isReadLock bool, names ...string) bool {
func sendRelease(ctx context.Context, ds *Dsync, c NetLocker, owner string, uid string, isReadLock bool, names ...string) bool {
if c == nil {
log("Unable to call RUnlock failed with %s\n", errors.New("netLocker is offline"))
return false
@ -683,16 +693,21 @@ func sendRelease(ds *Dsync, c NetLocker, owner string, uid string, isReadLock bo
Resources: names,
}
ctx, cancel := context.WithTimeout(context.Background(), ds.Timeouts.UnlockCall)
netLockCtx, cancel := context.WithTimeout(context.Background(), ds.Timeouts.UnlockCall)
defer cancel()
tc, ok := ctx.Value(mcontext.ContextTraceKey).(*mcontext.TraceCtxt)
if ok {
netLockCtx = context.WithValue(netLockCtx, mcontext.ContextTraceKey, tc)
}
if isReadLock {
if _, err := c.RUnlock(ctx, args); err != nil {
if _, err := c.RUnlock(netLockCtx, args); err != nil {
log("dsync: Unable to call RUnlock failed with %s for %#v at %s\n", err, args, c)
return false
}
} else {
if _, err := c.Unlock(ctx, args); err != nil {
if _, err := c.Unlock(netLockCtx, args); err != nil {
log("dsync: Unable to call Unlock failed with %s for %#v at %s\n", err, args, c)
return false
}

View File

@ -48,13 +48,13 @@ func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) {
go func() {
time.Sleep(2 * testDrwMutexAcquireTimeout)
drwm1.RUnlock()
drwm1.RUnlock(context.Background())
// fmt.Println("1st read lock released, waiting...")
}()
go func() {
time.Sleep(3 * testDrwMutexAcquireTimeout)
drwm2.RUnlock()
drwm2.RUnlock(context.Background())
// fmt.Println("2nd read lock released, waiting...")
}()
@ -66,7 +66,7 @@ func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) {
// fmt.Println("Write lock acquired, waiting...")
time.Sleep(testDrwMutexAcquireTimeout)
drwm3.Unlock()
drwm3.Unlock(context.Background())
}
// fmt.Println("Write lock failed due to timeout")
return
@ -101,7 +101,7 @@ func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) {
go func() {
time.Sleep(3 * testDrwMutexAcquireTimeout)
drwm1.Unlock()
drwm1.Unlock(context.Background())
// fmt.Println("Initial write lock released, waiting...")
}()
@ -113,7 +113,7 @@ func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) {
// fmt.Println("2nd write lock acquired, waiting...")
time.Sleep(testDrwMutexAcquireTimeout)
drwm2.Unlock()
drwm2.Unlock(context.Background())
}
// fmt.Println("2nd write lock failed due to timeout")
return
@ -144,7 +144,7 @@ func parallelReader(ctx context.Context, m *DRWMutex, clocked, cunlock, cdone ch
if m.GetRLock(ctx, nil, id, source, Options{Timeout: time.Second}) {
clocked <- true
<-cunlock
m.RUnlock()
m.RUnlock(context.Background())
cdone <- true
}
}
@ -193,7 +193,7 @@ func reader(resource string, numIterations int, activity *int32, cdone chan bool
for i := 0; i < 100; i++ {
}
atomic.AddInt32(activity, -1)
rwm.RUnlock()
rwm.RUnlock(context.Background())
}
}
cdone <- true
@ -211,7 +211,7 @@ func writer(resource string, numIterations int, activity *int32, cdone chan bool
for i := 0; i < 100; i++ {
}
atomic.AddInt32(activity, -10000)
rwm.Unlock()
rwm.Unlock(context.Background())
}
}
cdone <- true
@ -268,7 +268,7 @@ func TestUnlockPanic(t *testing.T) {
}
}()
mu := NewDRWMutex(ds, "test")
mu.Unlock()
mu.Unlock(context.Background())
}
// Borrowed from rwmutex_test.go
@ -278,10 +278,10 @@ func TestUnlockPanic2(t *testing.T) {
if recover() == nil {
t.Fatalf("unlock of unlocked RWMutex did not panic")
}
mu.RUnlock() // Unlock, so -test.count > 1 works
mu.RUnlock(context.Background()) // Unlock, so -test.count > 1 works
}()
mu.RLock(id, source)
mu.Unlock()
mu.Unlock(context.Background())
}
// Borrowed from rwmutex_test.go
@ -292,7 +292,7 @@ func TestRUnlockPanic(t *testing.T) {
}
}()
mu := NewDRWMutex(ds, "test")
mu.RUnlock()
mu.RUnlock(context.Background())
}
// Borrowed from rwmutex_test.go
@ -302,10 +302,10 @@ func TestRUnlockPanic2(t *testing.T) {
if recover() == nil {
t.Fatalf("read unlock of unlocked RWMutex did not panic")
}
mu.Unlock() // Unlock, so -test.count > 1 works
mu.Unlock(context.Background()) // Unlock, so -test.count > 1 works
}()
mu.Lock(id, source)
mu.RUnlock()
mu.RUnlock(context.Background())
}
// Borrowed from rwmutex_test.go
@ -320,14 +320,14 @@ func benchmarkRWMutex(b *testing.B, localWork, writeRatio int) {
foo++
if foo%writeRatio == 0 {
rwm.Lock(id, source)
rwm.Unlock()
rwm.Unlock(context.Background())
} else {
rwm.RLock(id, source)
for i := 0; i != localWork; i++ {
foo *= 2
foo /= 2
}
rwm.RUnlock()
rwm.RUnlock(context.Background())
}
}
_ = foo

View File

@ -69,7 +69,7 @@ func TestSimpleLock(t *testing.T) {
// fmt.Println("Lock acquired, waiting...")
time.Sleep(testDrwMutexRefreshCallTimeout)
dm.Unlock()
dm.Unlock(context.Background())
}
func TestSimpleLockUnlockMultipleTimes(t *testing.T) {
@ -77,23 +77,23 @@ func TestSimpleLockUnlockMultipleTimes(t *testing.T) {
dm.Lock(id, source)
time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond)
dm.Unlock()
dm.Unlock(context.Background())
dm.Lock(id, source)
time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond)
dm.Unlock()
dm.Unlock(context.Background())
dm.Lock(id, source)
time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond)
dm.Unlock()
dm.Unlock(context.Background())
dm.Lock(id, source)
time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond)
dm.Unlock()
dm.Unlock(context.Background())
dm.Lock(id, source)
time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond)
dm.Unlock()
dm.Unlock(context.Background())
}
// Test two locks for same resource, one succeeds, one fails (after timeout)
@ -108,7 +108,7 @@ func TestTwoSimultaneousLocksForSameResource(t *testing.T) {
time.Sleep(5 * testDrwMutexAcquireTimeout)
// fmt.Println("Unlocking dm1")
dm1st.Unlock()
dm1st.Unlock(context.Background())
}()
dm2nd.Lock(id, source)
@ -116,7 +116,7 @@ func TestTwoSimultaneousLocksForSameResource(t *testing.T) {
// fmt.Printf("2nd lock obtained after 1st lock is released\n")
time.Sleep(testDrwMutexRefreshCallTimeout * 2)
dm2nd.Unlock()
dm2nd.Unlock(context.Background())
}
// Test three locks for same resource, one succeeds, one fails (after timeout)
@ -134,7 +134,7 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) {
time.Sleep(2 * testDrwMutexAcquireTimeout)
// fmt.Println("Unlocking dm1")
dm1st.Unlock()
dm1st.Unlock(context.Background())
}()
expect += 2 * testDrwMutexAcquireTimeout
@ -151,7 +151,7 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) {
time.Sleep(2 * testDrwMutexAcquireTimeout)
// fmt.Println("Unlocking dm2")
dm2nd.Unlock()
dm2nd.Unlock(context.Background())
}()
dm3rd.Lock(id, source)
@ -159,7 +159,7 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) {
// fmt.Printf("3rd lock obtained after 1st & 2nd locks are released\n")
time.Sleep(testDrwMutexRefreshCallTimeout)
dm3rd.Unlock()
dm3rd.Unlock(context.Background())
}()
expect += 2*testDrwMutexAcquireTimeout + testDrwMutexRefreshCallTimeout
@ -173,7 +173,7 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) {
time.Sleep(2 * testDrwMutexAcquireTimeout)
// fmt.Println("Unlocking dm3")
dm3rd.Unlock()
dm3rd.Unlock(context.Background())
}()
dm2nd.Lock(id, source)
@ -181,7 +181,7 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) {
// fmt.Printf("2nd lock obtained after 1st & 3rd locks are released\n")
time.Sleep(testDrwMutexRefreshCallTimeout)
dm2nd.Unlock()
dm2nd.Unlock(context.Background())
}()
expect += 2*testDrwMutexAcquireTimeout + testDrwMutexRefreshCallTimeout
@ -201,8 +201,8 @@ func TestTwoSimultaneousLocksForDifferentResources(t *testing.T) {
dm1.Lock(id, source)
dm2.Lock(id, source)
dm1.Unlock()
dm2.Unlock()
dm1.Unlock(context.Background())
dm2.Unlock(context.Background())
}
// Test refreshing lock - refresh should always return true
@ -230,7 +230,7 @@ func TestSuccessfulLockRefresh(t *testing.T) {
}
// Should be safe operation in all cases
dm.Unlock()
dm.Unlock(context.Background())
}
// Test canceling context while quorum servers report lock not found
@ -267,7 +267,7 @@ func TestFailedRefreshLock(t *testing.T) {
}
// Should be safe operation in all cases
dm.Unlock()
dm.Unlock(context.Background())
}
// Test Unlock should not timeout
@ -290,7 +290,9 @@ func TestUnlockShouldNotTimeout(t *testing.T) {
unlockReturned := make(chan struct{}, 1)
go func() {
dm.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
dm.Unlock(ctx)
unlockReturned <- struct{}{}
}()
@ -308,7 +310,7 @@ func TestUnlockShouldNotTimeout(t *testing.T) {
func HammerMutex(m *DRWMutex, loops int, cdone chan bool) {
for i := 0; i < loops; i++ {
m.Lock(id, source)
m.Unlock()
m.Unlock(context.Background())
}
cdone <- true
}
@ -340,7 +342,7 @@ func BenchmarkMutexUncontended(b *testing.B) {
mu := PaddedMutex{NewDRWMutex(ds, "")}
for pb.Next() {
mu.Lock(id, source)
mu.Unlock()
mu.Unlock(context.Background())
}
})
}
@ -357,7 +359,7 @@ func benchmarkMutex(b *testing.B, slack, work bool) {
foo := 0
for pb.Next() {
mu.Lock(id, source)
mu.Unlock()
mu.Unlock(context.Background())
if work {
for i := 0; i < 100; i++ {
foo *= 2
@ -406,7 +408,7 @@ func BenchmarkMutexNoSpin(b *testing.B) {
m.Lock(id, source)
acc0 -= 100
acc1 += 100
m.Unlock()
m.Unlock(context.Background())
} else {
for i := 0; i < len(data); i += 4 {
data[i]++
@ -438,7 +440,7 @@ func BenchmarkMutexSpin(b *testing.B) {
m.Lock(id, source)
acc0 -= 100
acc1 += 100
m.Unlock()
m.Unlock(context.Background())
for i := 0; i < len(data); i += 4 {
data[i]++
}