From b030ef1acaa8bfb25349f195c6a7a91eac937450 Mon Sep 17 00:00:00 2001
From: Klaus Post
Date: Tue, 1 Mar 2022 11:14:28 -0800
Subject: [PATCH] tests: Clean up dsync package (#14415)

Add non-constant timeouts to dsync package.

Reduce test runtime by minutes. Hopefully not too aggressive.
---
 cmd/namespace-lock.go           |  1 +
 internal/dsync/drwmutex.go      | 99 +++++++++++++++++++++------------
 internal/dsync/drwmutex_test.go | 84 +++++++++++++++-------------
 internal/dsync/dsync.go         |  3 +
 internal/dsync/dsync_test.go    | 69 ++++++++++++++---------
 5 files changed, 154 insertions(+), 102 deletions(-)

diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go
index 54e024953..05e60a279 100644
--- a/cmd/namespace-lock.go
+++ b/cmd/namespace-lock.go
@@ -225,6 +225,7 @@ func (n *nsLockMap) NewNSLock(lockers func() ([]dsync.NetLocker, string), volume
 	if n.isDistErasure {
 		drwmutex := dsync.NewDRWMutex(&dsync.Dsync{
 			GetLockers: lockers,
+			Timeouts:   dsync.DefaultTimeouts,
 		}, pathsJoinPrefix(volume, paths...)...)
 		return &distLockInstance{drwmutex, opsID}
 	}
diff --git a/internal/dsync/drwmutex.go b/internal/dsync/drwmutex.go
index 6acbc04ef..f64974f98 100644
--- a/internal/dsync/drwmutex.go
+++ b/internal/dsync/drwmutex.go
@@ -43,32 +43,61 @@ func log(format string, data ...interface{}) {
 	}
 }
 
-// dRWMutexAcquireTimeout - tolerance limit to wait for lock acquisition before.
-const drwMutexAcquireTimeout = 1 * time.Second // 1 second.
+const (
+	// drwMutexAcquireTimeout - default tolerance limit to wait for lock acquisition before giving up.
+	drwMutexAcquireTimeout = 1 * time.Second // 1 second.
 
-// dRWMutexRefreshTimeout - timeout for the refresh call
-const drwMutexRefreshCallTimeout = 5 * time.Second
+	// drwMutexRefreshCallTimeout - default timeout for the refresh call
+	drwMutexRefreshCallTimeout = 5 * time.Second
 
-// dRWMutexUnlockTimeout - timeout for the unlock call
-const drwMutexUnlockCallTimeout = 30 * time.Second
+	// drwMutexUnlockCallTimeout - default timeout for the unlock call
+	drwMutexUnlockCallTimeout = 30 * time.Second
 
-// dRWMutexForceUnlockTimeout - timeout for the unlock call
-const drwMutexForceUnlockCallTimeout = 30 * time.Second
+	// drwMutexForceUnlockCallTimeout - default timeout for the force unlock call
+	drwMutexForceUnlockCallTimeout = 30 * time.Second
 
-// dRWMutexRefreshInterval - the interval between two refresh calls
-const drwMutexRefreshInterval = 10 * time.Second
+	// drwMutexRefreshInterval - default interval between two refresh calls
+	drwMutexRefreshInterval = 10 * time.Second
 
-const drwMutexInfinite = 1<<63 - 1
+	lockRetryInterval = 1 * time.Second
+
+	drwMutexInfinite = 1<<63 - 1
+)
+
+// Timeouts are timeouts for specific operations.
+type Timeouts struct {
+	// Acquire - tolerance limit to wait for lock acquisition before giving up.
+	Acquire time.Duration
+
+	// RefreshCall - timeout for the refresh call
+	RefreshCall time.Duration
+
+	// UnlockCall - timeout for the unlock call
+	UnlockCall time.Duration
+
+	// ForceUnlockCall - timeout for the force unlock call
+	ForceUnlockCall time.Duration
+}
+
+// DefaultTimeouts contains default timeouts.
+var DefaultTimeouts = Timeouts{
+	Acquire:         drwMutexAcquireTimeout,
+	RefreshCall:     drwMutexRefreshCallTimeout,
+	UnlockCall:      drwMutexUnlockCallTimeout,
+	ForceUnlockCall: drwMutexForceUnlockCallTimeout,
+}
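Illustration (not part of the patch): a minimal sketch of how a caller wires the new Timeouts field when constructing a Dsync, mirroring the cmd/namespace-lock.go hunk above. The newLock helper, the lockers parameter and the resource name are placeholders invented for this example; Dsync, Timeouts, DefaultTimeouts and NewDRWMutex are the identifiers the patch introduces or uses.

    package example

    import (
        "time"

        "github.com/minio/minio/internal/dsync"
    )

    // newLock builds a distributed RW mutex with explicit timeouts.
    func newLock(lockers func() ([]dsync.NetLocker, string)) *dsync.DRWMutex {
        ds := &dsync.Dsync{
            GetLockers: lockers,
            Timeouts:   dsync.DefaultTimeouts, // production defaults, as cmd/namespace-lock.go now passes
        }
        // Individual timeouts can be overridden, e.g. to fail faster in tests:
        ds.Timeouts.Acquire = 250 * time.Millisecond

        return dsync.NewDRWMutex(ds, "my-resource")
    }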
 
 // A DRWMutex is a distributed mutual exclusion lock.
 type DRWMutex struct {
-	Names         []string
-	writeLocks    []string // Array of nodes that granted a write lock
-	readLocks     []string // Array of array of nodes that granted reader locks
-	rng           *rand.Rand
-	m             sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
-	clnt          *Dsync
-	cancelRefresh context.CancelFunc
+	Names             []string
+	writeLocks        []string // Array of nodes that granted a write lock
+	readLocks         []string // Array of array of nodes that granted reader locks
+	rng               *rand.Rand
+	m                 sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
+	clnt              *Dsync
+	cancelRefresh     context.CancelFunc
+	refreshInterval   time.Duration
+	lockRetryInterval time.Duration
 }
 
 // Granted - represents a structure of a granted lock.
@@ -90,11 +119,13 @@ func NewDRWMutex(clnt *Dsync, names ...string) *DRWMutex {
 	restClnts, _ := clnt.GetLockers()
 	sort.Strings(names)
 	return &DRWMutex{
-		writeLocks: make([]string, len(restClnts)),
-		readLocks:  make([]string, len(restClnts)),
-		Names:      names,
-		clnt:       clnt,
-		rng:        rand.New(&lockedRandSource{src: rand.NewSource(time.Now().UTC().UnixNano())}),
+		writeLocks:        make([]string, len(restClnts)),
+		readLocks:         make([]string, len(restClnts)),
+		Names:             names,
+		clnt:              clnt,
+		rng:               rand.New(&lockedRandSource{src: rand.NewSource(time.Now().UTC().UnixNano())}),
+		refreshInterval:   drwMutexRefreshInterval,
+		lockRetryInterval: lockRetryInterval,
 	}
 }
 
@@ -146,10 +177,6 @@ func (dm *DRWMutex) GetRLock(ctx context.Context, cancel context.CancelFunc, id,
 	return dm.lockBlocking(ctx, cancel, id, source, isReadLock, opts)
 }
 
-const (
-	lockRetryInterval = 1 * time.Second
-)
-
 // lockBlocking will try to acquire either a read or a write lock
 //
 // The function will loop using a built-in timing randomized back-off
@@ -209,7 +236,7 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, lockLossCallback func(), i
 				return locked
 			}
 
-			time.Sleep(time.Duration(dm.rng.Float64() * float64(lockRetryInterval)))
+			time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryInterval)))
 		}
 	}
 }
@@ -224,7 +251,7 @@ func (dm *DRWMutex) startContinousLockRefresh(lockLossCallback func(), id, sourc
 	go func() {
 		defer cancel()
 
-		refreshTimer := time.NewTimer(drwMutexRefreshInterval)
+		refreshTimer := time.NewTimer(dm.refreshInterval)
 		defer refreshTimer.Stop()
 
 		for {
@@ -232,7 +259,7 @@ func (dm *DRWMutex) startContinousLockRefresh(lockLossCallback func(), id, sourc
 			case <-ctx.Done():
 				return
 			case <-refreshTimer.C:
-				refreshTimer.Reset(drwMutexRefreshInterval)
+				refreshTimer.Reset(dm.refreshInterval)
 
 				noQuorum, err := refreshLock(ctx, dm.clnt, id, source, quorum)
 				if err == nil && noQuorum {
@@ -250,7 +277,7 @@
 }
 
 func forceUnlock(ctx context.Context, ds *Dsync, id string) {
-	ctx, cancel := context.WithTimeout(ctx, drwMutexForceUnlockCallTimeout)
+	ctx, cancel := context.WithTimeout(ctx, ds.Timeouts.ForceUnlockCall)
 	defer cancel()
 
 	restClnts, _ := ds.GetLockers()
@@ -300,7 +327,7 @@ func refreshLock(ctx context.Context, ds *Dsync, id, source string, quorum int)
 				return
 			}
 
-			ctx, cancel := context.WithTimeout(ctx, drwMutexRefreshCallTimeout)
+			ctx, cancel := context.WithTimeout(ctx, ds.Timeouts.RefreshCall)
 			defer cancel()
 
 			refreshed, err := c.Refresh(ctx, args)
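Illustration (not part of the patch): refreshInterval and lockRetryInterval are now per-mutex fields seeded from the package defaults in NewDRWMutex, so code inside package dsync (for example its tests, as in the dsync_test.go hunks below) can shorten them on a single mutex instead of changing package-wide constants. The helper name and the concrete durations here are arbitrary examples.

    // In a _test.go file inside package dsync (the fields are unexported).
    func newFastMutex(ds *Dsync, name string) *DRWMutex {
        dm := NewDRWMutex(ds, name)
        dm.refreshInterval = 100 * time.Millisecond  // default drwMutexRefreshInterval is 10s
        dm.lockRetryInterval = 50 * time.Millisecond // default lockRetryInterval is 1s
        return dm
    }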
@@ -379,7 +406,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 	}
 
 	// Combined timeout for the lock attempt.
-	ctx, cancel := context.WithTimeout(ctx, drwMutexAcquireTimeout)
+	ctx, cancel := context.WithTimeout(ctx, ds.Timeouts.Acquire)
 	defer cancel()
 	for index, c := range restClnts {
 		wg.Add(1)
@@ -573,7 +600,7 @@ func (dm *DRWMutex) Unlock() {
 
 	isReadLock := false
 	for !releaseAll(dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) {
-		time.Sleep(time.Duration(dm.rng.Float64() * float64(lockRetryInterval)))
+		time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryInterval)))
 	}
 }
 
@@ -614,7 +641,7 @@ func (dm *DRWMutex) RUnlock() {
 
 	isReadLock := true
 	for !releaseAll(dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) {
-		time.Sleep(time.Duration(dm.rng.Float64() * float64(lockRetryInterval)))
+		time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryInterval)))
 	}
 }
 
@@ -635,7 +662,7 @@ func sendRelease(ds *Dsync, c NetLocker, owner string, uid string, isReadLock bo
 		Resources: names,
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), drwMutexUnlockCallTimeout)
+	ctx, cancel := context.WithTimeout(context.Background(), ds.Timeouts.UnlockCall)
 	defer cancel()
 
 	if isReadLock {
diff --git a/internal/dsync/drwmutex_test.go b/internal/dsync/drwmutex_test.go
index be13c97ec..aa45e35d1 100644
--- a/internal/dsync/drwmutex_test.go
+++ b/internal/dsync/drwmutex_test.go
@@ -47,13 +47,13 @@ func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) {
 	// fmt.Println("2nd read lock acquired, waiting...")
 
 	go func() {
-		time.Sleep(2 * time.Second)
+		time.Sleep(2 * testDrwMutexAcquireTimeout)
 		drwm.RUnlock()
 		// fmt.Println("1st read lock released, waiting...")
 	}()
 	go func() {
-		time.Sleep(3 * time.Second)
+		time.Sleep(3 * testDrwMutexAcquireTimeout)
 		drwm.RUnlock()
 		// fmt.Println("2nd read lock released, waiting...")
 	}()
 
@@ -63,7 +63,7 @@ func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) {
 	locked = drwm.GetLock(ctx3, cancel3, id, source, Options{Timeout: duration})
 	if locked {
 		// fmt.Println("Write lock acquired, waiting...")
-		time.Sleep(time.Second)
+		time.Sleep(testDrwMutexAcquireTimeout)
 
 		drwm.Unlock()
 	}
@@ -72,7 +72,7 @@
 }
 
 func TestSimpleWriteLockAcquired(t *testing.T) {
-	locked := testSimpleWriteLock(t, 5*time.Second)
+	locked := testSimpleWriteLock(t, 10*testDrwMutexAcquireTimeout)
 
 	expected := true
 	if locked != expected {
@@ -81,7 +81,7 @@
 }
 
 func TestSimpleWriteLockTimedOut(t *testing.T) {
-	locked := testSimpleWriteLock(t, time.Second)
+	locked := testSimpleWriteLock(t, testDrwMutexAcquireTimeout)
 
 	expected := false
 	if locked != expected {
@@ -99,7 +99,7 @@ func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) {
 	}
 
 	go func() {
-		time.Sleep(2 * time.Second)
+		time.Sleep(3 * testDrwMutexAcquireTimeout)
 		drwm.Unlock()
 		// fmt.Println("Initial write lock released, waiting...")
 	}()
@@ -109,7 +109,7 @@
 	locked = drwm.GetLock(ctx2, cancel2, id, source, Options{Timeout: duration})
 	if locked {
 		// fmt.Println("2nd write lock acquired, waiting...")
-		time.Sleep(time.Second)
+		time.Sleep(testDrwMutexAcquireTimeout)
 
 		drwm.Unlock()
 	}
@@ -118,7 +118,7 @@
 }
 
 func TestDualWriteLockAcquired(t *testing.T) {
-	locked := testDualWriteLock(t, 5*time.Second)
+	locked := testDualWriteLock(t, 10*testDrwMutexAcquireTimeout)
 
 	expected := true
 	if locked != expected {
@@ -127,7 +127,7 @@
 }
 
 func TestDualWriteLockTimedOut(t *testing.T) {
-	locked := testDualWriteLock(t, time.Second)
+	locked := testDualWriteLock(t, testDrwMutexAcquireTimeout)
 
 	expected := false
 	if locked != expected {
@@ -214,25 +214,27 @@ func writer(rwm *DRWMutex, numIterations int, activity *int32, cdone chan bool)
 }
 
 // Borrowed from rwmutex_test.go
-func HammerRWMutex(gomaxprocs, numReaders, numIterations int) {
-	runtime.GOMAXPROCS(gomaxprocs)
-	// Number of active readers + 10000 * number of active writers.
-	var activity int32
-	rwm := NewDRWMutex(ds, "test")
-	cdone := make(chan bool)
-	go writer(rwm, numIterations, &activity, cdone)
-	var i int
-	for i = 0; i < numReaders/2; i++ {
-		go reader(rwm, numIterations, &activity, cdone)
-	}
-	go writer(rwm, numIterations, &activity, cdone)
-	for ; i < numReaders; i++ {
-		go reader(rwm, numIterations, &activity, cdone)
-	}
-	// Wait for the 2 writers and all readers to finish.
-	for i := 0; i < 2+numReaders; i++ {
-		<-cdone
-	}
+func hammerRWMutex(t *testing.T, gomaxprocs, numReaders, numIterations int) {
+	t.Run(fmt.Sprintf("%d-%d-%d", gomaxprocs, numReaders, numIterations), func(t *testing.T) {
+		runtime.GOMAXPROCS(gomaxprocs)
+		// Number of active readers + 10000 * number of active writers.
+		var activity int32
+		rwm := NewDRWMutex(ds, "test")
+		cdone := make(chan bool)
+		go writer(rwm, numIterations, &activity, cdone)
+		var i int
+		for i = 0; i < numReaders/2; i++ {
+			go reader(rwm, numIterations, &activity, cdone)
+		}
+		go writer(rwm, numIterations, &activity, cdone)
+		for ; i < numReaders; i++ {
+			go reader(rwm, numIterations, &activity, cdone)
+		}
+		// Wait for the 2 writers and all readers to finish.
+		for i := 0; i < 2+numReaders; i++ {
+			<-cdone
+		}
+	})
 }
 
 // Borrowed from rwmutex_test.go
@@ -242,16 +244,16 @@ func TestRWMutex(t *testing.T) {
 	if testing.Short() {
 		n = 5
 	}
-	HammerRWMutex(1, 1, n)
-	HammerRWMutex(1, 3, n)
-	HammerRWMutex(1, 10, n)
-	HammerRWMutex(4, 1, n)
-	HammerRWMutex(4, 3, n)
-	HammerRWMutex(4, 10, n)
-	HammerRWMutex(10, 1, n)
-	HammerRWMutex(10, 3, n)
-	HammerRWMutex(10, 10, n)
-	HammerRWMutex(10, 5, n)
+	hammerRWMutex(t, 1, 1, n)
+	hammerRWMutex(t, 1, 3, n)
+	hammerRWMutex(t, 1, 10, n)
+	hammerRWMutex(t, 4, 1, n)
+	hammerRWMutex(t, 4, 3, n)
+	hammerRWMutex(t, 4, 10, n)
+	hammerRWMutex(t, 10, 1, n)
+	hammerRWMutex(t, 10, 3, n)
+	hammerRWMutex(t, 10, 10, n)
+	hammerRWMutex(t, 10, 5, n)
 }
 
 // Borrowed from rwmutex_test.go
@@ -267,12 +269,13 @@ func TestUnlockPanic(t *testing.T) {
 
 // Borrowed from rwmutex_test.go
 func TestUnlockPanic2(t *testing.T) {
+	mu := NewDRWMutex(ds, "test-unlock-panic-2")
 	defer func() {
 		if recover() == nil {
 			t.Fatalf("unlock of unlocked RWMutex did not panic")
 		}
+		mu.RUnlock() // Unlock, so -test.count > 1 works
 	}()
-	mu := NewDRWMutex(ds, "test-unlock-panic-2")
 	mu.RLock(id, source)
 	mu.Unlock()
 }
@@ -290,12 +293,13 @@ func TestRUnlockPanic(t *testing.T) {
 
 // Borrowed from rwmutex_test.go
 func TestRUnlockPanic2(t *testing.T) {
+	mu := NewDRWMutex(ds, "test-runlock-panic-2")
 	defer func() {
 		if recover() == nil {
 			t.Fatalf("read unlock of unlocked RWMutex did not panic")
 		}
+		mu.Unlock() // Unlock, so -test.count > 1 works
 	}()
-	mu := NewDRWMutex(ds, "test-runlock-panic-2")
 	mu.Lock(id, source)
 	mu.RUnlock()
 }
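Illustration (not part of the patch): because hammerRWMutex now wraps each run in t.Run, every parameter combination becomes an addressable, individually reported subtest (for example TestRWMutex/10-3-<n> for GOMAXPROCS=10, 3 readers and n iterations), which can be selected with go test -run. A hypothetical table-driven variant with the same behavior could look like the sketch below; the patch itself keeps the explicit call list.

    // Hypothetical variant, not in the patch: same combinations, expressed as a table.
    func TestRWMutexTable(t *testing.T) {
        n := 1000 // assumed default; the hunk above only shows the testing.Short() value
        if testing.Short() {
            n = 5
        }
        for _, c := range []struct{ procs, readers int }{
            {1, 1}, {1, 3}, {1, 10},
            {4, 1}, {4, 3}, {4, 10},
            {10, 1}, {10, 3}, {10, 10}, {10, 5},
        } {
            hammerRWMutex(t, c.procs, c.readers, n)
        }
    }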
diff --git a/internal/dsync/dsync.go b/internal/dsync/dsync.go
index 2688abd30..02ec60aa4 100644
--- a/internal/dsync/dsync.go
+++ b/internal/dsync/dsync.go
@@ -22,4 +22,7 @@ package dsync
 type Dsync struct {
 	// List of rest client objects, one per lock server.
 	GetLockers func() ([]NetLocker, string)
+
+	// Timeouts to apply.
+	Timeouts Timeouts
 }
diff --git a/internal/dsync/dsync_test.go b/internal/dsync/dsync_test.go
index 3154112d3..c717b2db1 100644
--- a/internal/dsync/dsync_test.go
+++ b/internal/dsync/dsync_test.go
@@ -28,6 +28,14 @@ import (
 	"github.com/google/uuid"
 )
 
+const (
+	testDrwMutexAcquireTimeout         = 250 * time.Millisecond
+	testDrwMutexRefreshCallTimeout     = 250 * time.Millisecond
+	testDrwMutexUnlockCallTimeout      = 250 * time.Millisecond
+	testDrwMutexForceUnlockCallTimeout = 250 * time.Millisecond
+	testDrwMutexRefreshInterval        = 100 * time.Millisecond
+)
+
 // TestMain initializes the testing framework
 func TestMain(m *testing.M) {
 	startLockServers()
@@ -40,6 +48,12 @@ func TestMain(m *testing.M) {
 
 	ds = &Dsync{
 		GetLockers: func() ([]NetLocker, string) { return clnts, uuid.New().String() },
+		Timeouts: Timeouts{
+			Acquire:         testDrwMutexAcquireTimeout,
+			RefreshCall:     testDrwMutexRefreshCallTimeout,
+			UnlockCall:      testDrwMutexUnlockCallTimeout,
+			ForceUnlockCall: testDrwMutexForceUnlockCallTimeout,
+		},
 	}
 
 	code := m.Run()
@@ -53,7 +67,7 @@ func TestSimpleLock(t *testing.T) {
 	dm.Lock(id, source)
 
 	// fmt.Println("Lock acquired, waiting...")
-	time.Sleep(2500 * time.Millisecond)
+	time.Sleep(testDrwMutexRefreshCallTimeout)
 
 	dm.Unlock()
 }
@@ -91,7 +105,7 @@ func TestTwoSimultaneousLocksForSameResource(t *testing.T) {
 
 	// Release lock after 10 seconds
 	go func() {
-		time.Sleep(10 * time.Second)
+		time.Sleep(5 * testDrwMutexAcquireTimeout)
 		// fmt.Println("Unlocking dm1")
 
 		dm1st.Unlock()
@@ -100,7 +114,7 @@ func TestTwoSimultaneousLocksForSameResource(t *testing.T) {
 	dm2nd.Lock(id, source)
 
 	// fmt.Printf("2nd lock obtained after 1st lock is released\n")
-	time.Sleep(2500 * time.Millisecond)
+	time.Sleep(testDrwMutexRefreshCallTimeout * 2)
 
 	dm2nd.Unlock()
 }
@@ -112,14 +126,17 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) {
 	dm3rd := NewDRWMutex(ds, "aap")
 
 	dm1st.Lock(id, source)
-
+	started := time.Now()
+	var expect time.Duration
 	// Release lock after 10 seconds
 	go func() {
-		time.Sleep(10 * time.Second)
+		// TOTAL
+		time.Sleep(2 * testDrwMutexAcquireTimeout)
 		// fmt.Println("Unlocking dm1")
 
 		dm1st.Unlock()
 	}()
+	expect += 2 * testDrwMutexAcquireTimeout
 
 	var wg sync.WaitGroup
 	wg.Add(2)
@@ -131,7 +148,7 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) {
 
 		// Release lock after 10 seconds
 		go func() {
-			time.Sleep(2500 * time.Millisecond)
+			time.Sleep(2 * testDrwMutexAcquireTimeout)
 			// fmt.Println("Unlocking dm2")
 
 			dm2nd.Unlock()
@@ -140,10 +157,11 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) {
 		dm3rd.Lock(id, source)
 
 		// fmt.Printf("3rd lock obtained after 1st & 2nd locks are released\n")
-		time.Sleep(2500 * time.Millisecond)
+		time.Sleep(testDrwMutexRefreshCallTimeout)
 
 		dm3rd.Unlock()
 	}()
+	expect += 2*testDrwMutexAcquireTimeout + testDrwMutexRefreshCallTimeout
 
 	go func() {
 		defer wg.Done()
@@ -152,7 +170,7 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) {
 
 		// Release lock after 10 seconds
 		go func() {
-			time.Sleep(2500 * time.Millisecond)
+			time.Sleep(2 * testDrwMutexAcquireTimeout)
 			// fmt.Println("Unlocking dm3")
 
 			dm3rd.Unlock()
@@ -161,12 +179,19 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) {
 		dm2nd.Lock(id, source)
 
 		// fmt.Printf("2nd lock obtained after 1st & 3rd locks are released\n")
-		time.Sleep(2500 * time.Millisecond)
+		time.Sleep(testDrwMutexRefreshCallTimeout)
 
 		dm2nd.Unlock()
 	}()
+	expect += 2*testDrwMutexAcquireTimeout + testDrwMutexRefreshCallTimeout
 
 	wg.Wait()
+	// We expect at least 'expect' (3 x 2 x testDrwMutexAcquireTimeout plus 2 x testDrwMutexRefreshCallTimeout) to have passed.
+	elapsed := time.Since(started)
+	if elapsed < expect {
+		t.Errorf("expected at least %v to have passed, however only %v passed", expect, elapsed)
+	}
+	t.Logf("expected at least %v to have passed, %v passed", expect, elapsed)
 }
 
 // Test two locks for different resources, both succeed
@@ -176,14 +201,8 @@ func TestTwoSimultaneousLocksForDifferentResources(t *testing.T) {
 
 	dm1.Lock(id, source)
 	dm2.Lock(id, source)
-
-	// fmt.Println("Both locks acquired, waiting...")
-	time.Sleep(2500 * time.Millisecond)
-
 	dm1.Unlock()
 	dm2.Unlock()
-
-	time.Sleep(10 * time.Millisecond)
 }
 
 // Test refreshing lock - refresh should always return true
@@ -194,22 +213,19 @@ func TestSuccessfulLockRefresh(t *testing.T) {
 	}
 
 	dm := NewDRWMutex(ds, "aap")
-	contextCanceled := make(chan struct{})
+	dm.refreshInterval = testDrwMutexRefreshInterval
 
-	ctx, cl := context.WithCancel(context.Background())
-	cancel := func() {
-		cl()
-		close(contextCanceled)
-	}
+	ctx, cancel := context.WithCancel(context.Background())
 
 	if !dm.GetLock(ctx, cancel, id, source, Options{Timeout: 5 * time.Minute}) {
 		t.Fatal("GetLock() should be successful")
 	}
 
-	timer := time.NewTimer(drwMutexRefreshInterval * 2)
+	// Make it run twice.
+	timer := time.NewTimer(testDrwMutexRefreshInterval * 2)
 
 	select {
-	case <-contextCanceled:
+	case <-ctx.Done():
 		t.Fatal("Lock context canceled which is not expected")
 	case <-timer.C:
 	}
@@ -231,6 +247,7 @@ func TestFailedRefreshLock(t *testing.T) {
 	}
 
 	dm := NewDRWMutex(ds, "aap")
+	dm.refreshInterval = 500 * time.Millisecond
 
 	var wg sync.WaitGroup
 	wg.Add(1)
@@ -261,14 +278,14 @@ func TestUnlockShouldNotTimeout(t *testing.T) {
 	}
 
 	dm := NewDRWMutex(ds, "aap")
-
+	dm.refreshInterval = testDrwMutexUnlockCallTimeout
 	if !dm.GetLock(context.Background(), nil, id, source, Options{Timeout: 5 * time.Minute}) {
 		t.Fatal("GetLock() should be successful")
 	}
 
 	// Add delay to lock server responses to ensure that lock does not timeout
 	for i := range lockServers {
-		lockServers[i].setResponseDelay(2 * drwMutexUnlockCallTimeout)
+		lockServers[i].setResponseDelay(5 * testDrwMutexUnlockCallTimeout)
 		defer lockServers[i].setResponseDelay(0)
 	}
 
@@ -278,7 +295,7 @@ func TestUnlockShouldNotTimeout(t *testing.T) {
 	go func() {
 		dm.Unlock()
 		unlockReturned <- struct{}{}
 	}()
 
-	timer := time.NewTimer(2 * drwMutexUnlockCallTimeout)
+	timer := time.NewTimer(2 * testDrwMutexUnlockCallTimeout)
 	defer timer.Stop()
 
 	select {