fix: possible race in FS local lockMap (#9598)

Harshavardhana 2020-05-14 23:59:07 -07:00 committed by GitHub
parent 56e0c6adf8
commit b730bd1396
5 changed files with 42 additions and 78 deletions
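For context, a summary of what the diff below changes (my reading of it, not text from the original commit message): the local lockMap keeps a per-resource reference count, and the old unlock() looked the entry up under lockMapMutex.RLock but decremented and possibly deleted it under a later, separate lockMapMutex.Lock. That check-then-act window is the race this commit closes. A minimal sketch of the pre-fix shape, with hypothetical names:

```go
// Pre-fix shape of the release path (hypothetical names, heavily simplified;
// not the MinIO source).
package main

import "sync"

type entry struct{ ref uint32 }

type lockMap struct {
	mu sync.RWMutex
	m  map[string]*entry
}

func (l *lockMap) racyRelease(resource string) {
	l.mu.RLock()
	e, found := l.m[resource] // 1) observe under a read lock
	l.mu.RUnlock()
	if !found {
		return
	}
	// ...window: another goroutine may decrement, delete, or re-create the
	// entry for this resource before we re-acquire the mutex below...
	l.mu.Lock()
	e.ref-- // 2) mutate in a separate critical section
	if e.ref == 0 {
		delete(l.m, resource) // may remove an entry that was just re-created
	}
	l.mu.Unlock()
}

func main() {}
```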


@@ -320,12 +320,6 @@ func (fs *FSObjects) MakeBucketWithLocation(ctx context.Context, bucket, locatio
return NotImplemented{}
}
bucketLock := fs.NewNSLock(ctx, bucket, "")
if err := bucketLock.GetLock(globalObjectTimeout); err != nil {
return err
}
defer bucketLock.Unlock()
// Verify if bucket is valid.
if s3utils.CheckValidBucketNameStrict(bucket) != nil {
return BucketNameInvalid{Bucket: bucket}
@@ -356,12 +350,6 @@ func (fs *FSObjects) MakeBucketWithLocation(ctx context.Context, bucket, locatio
// GetBucketInfo - fetch bucket metadata info.
func (fs *FSObjects) GetBucketInfo(ctx context.Context, bucket string) (bi BucketInfo, e error) {
bucketLock := fs.NewNSLock(ctx, bucket, "")
if e := bucketLock.GetRLock(globalObjectTimeout); e != nil {
return bi, e
}
defer bucketLock.RUnlock()
atomic.AddInt64(&fs.activeIOCount, 1)
defer func() {
atomic.AddInt64(&fs.activeIOCount, -1)
@@ -438,15 +426,6 @@ func (fs *FSObjects) ListBuckets(ctx context.Context) ([]BucketInfo, error) {
// DeleteBucket - delete a bucket and all the metadata associated
// with the bucket including pending multipart, object metadata.
func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
if !forceDelete {
bucketLock := fs.NewNSLock(ctx, bucket, "")
if err := bucketLock.GetLock(globalObjectTimeout); err != nil {
logger.LogIf(ctx, err)
return err
}
defer bucketLock.Unlock()
}
atomic.AddInt64(&fs.activeIOCount, 1)
defer func() {
atomic.AddInt64(&fs.activeIOCount, -1)


@@ -18,18 +18,17 @@ package cmd
import (
"context"
"errors"
pathutil "path"
"runtime"
"sort"
"strings"
"sync"
"sync/atomic"
"fmt"
"time"
"github.com/minio/lsync"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/dsync"
)
@@ -58,8 +57,8 @@ func newNSLock(isDistXL bool) *nsLockMap {
// nsLock - provides primitives for locking critical namespace regions.
type nsLock struct {
ref uint32
*lsync.LRWMutex
ref uint
}
// nsLockMap - namespace lock map, provides primitives to Lock,
@@ -68,7 +67,7 @@ type nsLockMap struct {
// Indicates if namespace is part of a distributed setup.
isDistXL bool
lockMap map[string]*nsLock
lockMapMutex sync.RWMutex
lockMapMutex sync.Mutex
}
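My paraphrase of the two types after this hunk, with the role of each field spelled out (the comments are mine, not verbatim source): ref becomes a uint32 so it can be adjusted with sync/atomic, and the map-level mutex drops from sync.RWMutex to a plain sync.Mutex so that every lookup that may be followed by a mutation runs in one exclusive section.

```go
// Annotated sketch of the updated types (paraphrased from the diff).
package cmd

import (
	"sync"

	"github.com/minio/lsync"
)

// nsLock pairs the per-resource local lock with its reference count.
// The count is uint32 so it can be bumped with atomic.AddUint32 while
// map membership stays protected by the map-level mutex below.
type nsLock struct {
	ref uint32
	*lsync.LRWMutex
}

// nsLockMap guards lockMap with a plain Mutex (previously an RWMutex):
// lock(), the failed-lock cleanup, and unlock() all take it exclusively.
type nsLockMap struct {
	isDistXL     bool
	lockMap      map[string]*nsLock
	lockMapMutex sync.Mutex
}
```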
// Lock the namespace resource.
@@ -78,17 +77,16 @@ func (n *nsLockMap) lock(ctx context.Context, volume string, path string, lockSo
resource := pathJoin(volume, path)
n.lockMapMutex.Lock()
nsLk, found := n.lockMap[resource]
if !found {
nsLk = &nsLock{
if _, found := n.lockMap[resource]; !found {
n.lockMap[resource] = &nsLock{
LRWMutex: lsync.NewLRWMutex(ctx),
ref: 1,
}
n.lockMap[resource] = nsLk
} else {
// Update ref count here to avoid multiple races.
nsLk.ref++
atomic.AddUint32(&n.lockMap[resource].ref, 1)
}
nsLk = n.lockMap[resource]
n.lockMapMutex.Unlock()
// Locking here will block (until timeout).
@@ -101,13 +99,12 @@ func (n *nsLockMap) lock(ctx context.Context, volume string, path string, lockSo
if !locked { // We failed to get the lock
// Decrement ref count since we failed to get the lock
n.lockMapMutex.Lock()
nsLk.ref--
if nsLk.ref == 0 {
if atomic.AddUint32(&nsLk.ref, ^uint32(0)) == 0 {
// Remove from the map if there are no more references.
n.lockMapMutex.Lock()
delete(n.lockMap, resource)
n.lockMapMutex.Unlock()
}
n.lockMapMutex.Unlock()
}
return
}
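Two details in this hunk are easy to miss. The failed-acquire cleanup still runs with lockMapMutex held, so the delete-at-zero cannot interleave with a concurrent re-registration of the same resource. And atomic.AddUint32(&ref, ^uint32(0)) is the standard unsigned-decrement idiom: ^uint32(0) has all bits set, so the addition wraps around to ref-1, the subtraction pattern described in the sync/atomic documentation. A tiny runnable check:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	ref := uint32(2)

	// ^uint32(0) == math.MaxUint32, so the add wraps around to ref-1.
	fmt.Println(atomic.AddUint32(&ref, ^uint32(0))) // 1
	fmt.Println(atomic.AddUint32(&ref, ^uint32(0))) // 0 -> last reference released
}
```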
@@ -115,28 +112,21 @@ func (n *nsLockMap) lock(ctx context.Context, volume string, path string, lockSo
// Unlock the namespace resource.
func (n *nsLockMap) unlock(volume string, path string, readLock bool) {
resource := pathJoin(volume, path)
n.lockMapMutex.RLock()
nsLk, found := n.lockMap[resource]
n.lockMapMutex.RUnlock()
if !found {
n.lockMapMutex.Lock()
defer n.lockMapMutex.Unlock()
if _, found := n.lockMap[resource]; !found {
return
}
if readLock {
nsLk.RUnlock()
n.lockMap[resource].RUnlock()
} else {
nsLk.Unlock()
n.lockMap[resource].Unlock()
}
n.lockMapMutex.Lock()
if nsLk.ref == 0 {
logger.LogIf(GlobalContext, errors.New("Namespace reference count cannot be 0"))
} else {
nsLk.ref--
if nsLk.ref == 0 {
// Remove from the map if there are no more references.
delete(n.lockMap, resource)
}
if atomic.AddUint32(&n.lockMap[resource].ref, ^uint32(0)) == 0 {
// Remove from the map if there are no more references.
delete(n.lockMap, resource)
}
n.lockMapMutex.Unlock()
}
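With this hunk the whole release path (map lookup, RUnlock/Unlock of the per-resource lock, reference-count decrement, and delete-at-zero) runs under a single lockMapMutex.Lock()/defer Unlock(), removing the gap the old RLock-then-Lock sequence left open. A compressed sketch of the resulting shape, again with hypothetical names rather than the MinIO source:

```go
// Post-fix shape of the release path (hypothetical names, simplified).
package main

import (
	"sync"
	"sync/atomic"
)

type entry struct {
	ref uint32
	mu  sync.RWMutex // stand-in for the per-resource lsync.LRWMutex
}

type lockMap struct {
	mu sync.Mutex
	m  map[string]*entry
}

func (l *lockMap) release(resource string, readLock bool) {
	l.mu.Lock()
	defer l.mu.Unlock() // one exclusive section for the whole release

	e, found := l.m[resource]
	if !found {
		return
	}
	if readLock {
		e.mu.RUnlock()
	} else {
		e.mu.Unlock()
	}
	// Atomic decrement; drop the entry once nobody references it.
	if atomic.AddUint32(&e.ref, ^uint32(0)) == 0 {
		delete(l.m, resource)
	}
}

func main() {}
```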
// dsync's distributed lock instance.
@@ -147,7 +137,7 @@ type distLockInstance struct {
// Lock - block until write lock is taken or timeout has occurred.
func (di *distLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error) {
lockSource := getSource()
lockSource := getSource(2)
start := UTCNow()
if !di.rwMutex.GetLock(di.opsID, lockSource, timeout.Timeout()) {
@@ -165,7 +155,7 @@ func (di *distLockInstance) Unlock() {
// RLock - block until read lock is taken or timeout has occurred.
func (di *distLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr error) {
lockSource := getSource()
lockSource := getSource(2)
start := UTCNow()
if !di.rwMutex.GetRLock(di.opsID, lockSource, timeout.Timeout()) {
timeout.LogFailure()
@@ -206,7 +196,7 @@ func (n *nsLockMap) NewNSLock(ctx context.Context, lockersFn func() []dsync.NetL
// Lock - block until write lock is taken or timeout has occurred.
func (li *localLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error) {
lockSource := getSource()
lockSource := getSource(2)
start := UTCNow()
readLock := false
var success []int
@@ -234,7 +224,7 @@ func (li *localLockInstance) Unlock() {
// RLock - block until read lock is taken or timeout has occurred.
func (li *localLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr error) {
lockSource := getSource()
lockSource := getSource(2)
start := UTCNow()
readLock := true
var success []int
@@ -260,9 +250,9 @@ func (li *localLockInstance) RUnlock() {
}
}
func getSource() string {
func getSource(n int) string {
var funcName string
pc, filename, lineNum, ok := runtime.Caller(2)
pc, filename, lineNum, ok := runtime.Caller(n)
if ok {
filename = pathutil.Base(filename)
funcName = strings.TrimPrefix(runtime.FuncForPC(pc).Name(),
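The only behavioural change to getSource is that the runtime.Caller skip depth is now an explicit argument instead of a hard-coded 2, so each call site (and the tests below) states how many frames to climb. Per the runtime docs, skip 0 identifies the function that calls runtime.Caller (getSource itself), 1 its caller (GetLock/GetRLock), and 2 the frame actually worth reporting as the lock source. A standalone sketch of the same idea, independent of MinIO:

```go
package main

import (
	"fmt"
	pathutil "path"
	"runtime"
)

// source reports file:line:function for the frame n levels above it,
// mirroring the shape of getSource(n) in the diff (simplified).
func source(n int) string {
	pc, file, line, ok := runtime.Caller(n)
	if !ok {
		return "[unknown]"
	}
	return fmt.Sprintf("[%s:%d:%s()]", pathutil.Base(file), line, runtime.FuncForPC(pc).Name())
}

func takeLock() string {
	// n=2: frame 0 is source itself, frame 1 is takeLock, frame 2 is
	// takeLock's caller, i.e. the call site we actually want to log.
	return source(2)
}

func main() {
	fmt.Println(takeLock()) // prints the file:line of this call inside main
}
```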


@@ -27,7 +27,7 @@ import (
// position will cause the line number to change and the test to FAIL
// Tests getSource().
func TestGetSource(t *testing.T) {
currentSource := func() string { return getSource() }
currentSource := func() string { return getSource(2) }
gotSource := currentSource()
// Hard coded line number, 31, in the "expectedSource" value
expectedSource := "[namespace-lock_test.go:31:TestGetSource()]"


@@ -58,7 +58,7 @@ type check struct {
// Assert - checks if gotValue is same as expectedValue, if not fails the test.
func (c *check) Assert(gotValue interface{}, expectedValue interface{}) {
if !reflect.DeepEqual(gotValue, expectedValue) {
c.Fatalf("Test %s:%s expected %v, got %v", getSource(), c.testType, expectedValue, gotValue)
c.Fatalf("Test %s:%s expected %v, got %v", getSource(2), c.testType, expectedValue, gotValue)
}
}


@@ -57,6 +57,7 @@ import (
"github.com/minio/minio-go/v6/pkg/s3utils"
"github.com/minio/minio-go/v6/pkg/signer"
"github.com/minio/minio/cmd/config"
"github.com/minio/minio/cmd/crypto"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/bucket/policy"
@@ -65,6 +66,18 @@ import (
// Tests should initNSLock only once.
func init() {
// disable ENVs which interfere with tests.
for _, env := range []string{
crypto.EnvAutoEncryptionLegacy,
crypto.EnvKMSAutoEncryption,
config.EnvAccessKey,
config.EnvAccessKeyOld,
config.EnvSecretKey,
config.EnvSecretKeyOld,
} {
os.Unsetenv(env)
}
// Set as non-distributed.
globalIsDistXL = false
@@ -342,27 +355,9 @@ func UnstartedTestServer(t TestErrHandler, instanceType string) TestServer {
globalMinioPort = port
globalMinioAddr = getEndpointsLocalAddr(testServer.Disks)
globalConfigSys = NewConfigSys()
newAllSubsystems()
globalIAMSys = NewIAMSys()
globalIAMSys.Init(ctx, objLayer)
buckets, err := objLayer.ListBuckets(ctx)
if err != nil {
t.Fatalf("Unable to list buckets on backend %s", err)
}
globalPolicySys = NewPolicySys()
globalPolicySys.Init(buckets, objLayer)
globalNotificationSys = NewNotificationSys(testServer.Disks)
globalNotificationSys.Init(buckets, objLayer)
globalLifecycleSys = NewLifecycleSys()
globalLifecycleSys.Init(buckets, objLayer)
globalBucketSSEConfigSys = NewBucketSSEConfigSys()
globalBucketSSEConfigSys.Init(buckets, objLayer)
initAllSubsystems(objLayer)
return testServer
}