diff --git a/buildscripts/race.sh b/buildscripts/race.sh index af403e253..fbdd9b428 100755 --- a/buildscripts/race.sh +++ b/buildscripts/race.sh @@ -2,6 +2,7 @@ set -e -for d in $(go list ./... | grep -v browser); do - CGO_ENABLED=1 go test -v -tags kqueue -race --timeout 100m "$d" +## TODO remove `dsync` from race detector once this is merged and released https://go-review.googlesource.com/c/go/+/333529/ +for d in $(go list ./... | grep -v dsync); do + CGO_ENABLED=1 go test -v -race --timeout 100m "$d" done diff --git a/internal/dsync/rpc-client-impl_test.go b/internal/dsync/dsync-client_test.go similarity index 53% rename from internal/dsync/rpc-client-impl_test.go rename to internal/dsync/dsync-client_test.go index 2af7b0f17..d763aafc3 100644 --- a/internal/dsync/rpc-client-impl_test.go +++ b/internal/dsync/dsync-client_test.go @@ -18,33 +18,55 @@ package dsync import ( + "bytes" "context" - "net/rpc" - "sync" + "errors" + "net/http" + "net/url" + "time" + + xhttp "github.com/minio/minio/internal/http" + "github.com/minio/minio/internal/rest" ) // ReconnectRPCClient is a wrapper type for rpc.Client which provides reconnect on first failure. type ReconnectRPCClient struct { - mutex sync.Mutex - rpc *rpc.Client - addr string - endpoint string + u *url.URL + rpc *rest.Client } // newClient constructs a ReconnectRPCClient object with addr and endpoint initialized. // It _doesn't_ connect to the remote endpoint. See Call method to see when the // connect happens. -func newClient(addr, endpoint string) NetLocker { +func newClient(endpoint string) NetLocker { + u, err := url.Parse(endpoint) + if err != nil { + panic(err) + } + + tr := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + MaxIdleConnsPerHost: 1024, + WriteBufferSize: 32 << 10, // 32KiB moving up from 4KiB default + ReadBufferSize: 32 << 10, // 32KiB moving up from 4KiB default + IdleConnTimeout: 15 * time.Second, + ResponseHeaderTimeout: 15 * time.Minute, // Set conservative timeouts for MinIO internode. + TLSHandshakeTimeout: 15 * time.Second, + ExpectContinueTimeout: 15 * time.Second, + // Go net/http automatically unzip if content-type is + // gzip disable this feature, as we are always interested + // in raw stream. + DisableCompression: true, + } + return &ReconnectRPCClient{ - addr: addr, - endpoint: endpoint, + u: u, + rpc: rest.NewClient(u, tr, nil), } } // Close closes the underlying socket file descriptor. func (rpcClient *ReconnectRPCClient) IsOnline() bool { - rpcClient.mutex.Lock() - defer rpcClient.mutex.Unlock() // If rpc client has not connected yet there is nothing to close. return rpcClient.rpc != nil } @@ -55,74 +77,73 @@ func (rpcClient *ReconnectRPCClient) IsLocal() bool { // Close closes the underlying socket file descriptor. func (rpcClient *ReconnectRPCClient) Close() error { - rpcClient.mutex.Lock() - defer rpcClient.mutex.Unlock() - // If rpc client has not connected yet there is nothing to close. - if rpcClient.rpc == nil { - return nil - } - // Reset rpcClient.rpc to allow for subsequent calls to use a new - // (socket) connection. - clnt := rpcClient.rpc - rpcClient.rpc = nil - return clnt.Close() + return nil } -// Call makes a RPC call to the remote endpoint using the default codec, namely encoding/gob. -func (rpcClient *ReconnectRPCClient) Call(serviceMethod string, args interface{}, reply interface{}) (err error) { - rpcClient.mutex.Lock() - defer rpcClient.mutex.Unlock() - dialCall := func() error { - // If the rpc.Client is nil, we attempt to (re)connect with the remote endpoint. - if rpcClient.rpc == nil { - clnt, derr := rpc.DialHTTPPath("tcp", rpcClient.addr, rpcClient.endpoint) - if derr != nil { - return derr - } - rpcClient.rpc = clnt - } - // If the RPC fails due to a network-related error, then we reset - // rpc.Client for a subsequent reconnect. - return rpcClient.rpc.Call(serviceMethod, args, reply) +var ( + errLockConflict = errors.New("lock conflict") + errLockNotFound = errors.New("lock not found") +) + +func toLockError(err error) error { + if err == nil { + return nil } - if err = dialCall(); err == rpc.ErrShutdown { - rpcClient.rpc.Close() - rpcClient.rpc = nil - err = dialCall() + + switch err.Error() { + case errLockConflict.Error(): + return errLockConflict + case errLockNotFound.Error(): + return errLockNotFound } return err } +// Call makes a RPC call to the remote endpoint using the default codec, namely encoding/gob. +func (rpcClient *ReconnectRPCClient) Call(method string, args LockArgs) (status bool, err error) { + buf, err := args.MarshalMsg(nil) + if err != nil { + return false, err + } + body := bytes.NewReader(buf) + respBody, err := rpcClient.rpc.Call(context.Background(), method, + url.Values{}, body, body.Size()) + defer xhttp.DrainBody(respBody) + + switch toLockError(err) { + case nil: + return true, nil + case errLockConflict, errLockNotFound: + return false, nil + default: + return false, err + } +} + func (rpcClient *ReconnectRPCClient) RLock(ctx context.Context, args LockArgs) (status bool, err error) { - err = rpcClient.Call("Dsync.RLock", &args, &status) - return status, err + return rpcClient.Call("/v1/rlock", args) } func (rpcClient *ReconnectRPCClient) Lock(ctx context.Context, args LockArgs) (status bool, err error) { - err = rpcClient.Call("Dsync.Lock", &args, &status) - return status, err + return rpcClient.Call("/v1/lock", args) } func (rpcClient *ReconnectRPCClient) RUnlock(ctx context.Context, args LockArgs) (status bool, err error) { - err = rpcClient.Call("Dsync.RUnlock", &args, &status) - return status, err + return rpcClient.Call("/v1/runlock", args) } func (rpcClient *ReconnectRPCClient) Unlock(ctx context.Context, args LockArgs) (status bool, err error) { - err = rpcClient.Call("Dsync.Unlock", &args, &status) - return status, err + return rpcClient.Call("/v1/unlock", args) } func (rpcClient *ReconnectRPCClient) Refresh(ctx context.Context, args LockArgs) (refreshed bool, err error) { - err = rpcClient.Call("Dsync.Refresh", &args, &refreshed) - return refreshed, err + return rpcClient.Call("/v1/refresh", args) } func (rpcClient *ReconnectRPCClient) ForceUnlock(ctx context.Context, args LockArgs) (reply bool, err error) { - err = rpcClient.Call("Dsync.ForceUnlock", &args, &reply) - return reply, err + return rpcClient.Call("/v1/force-unlock", args) } func (rpcClient *ReconnectRPCClient) String() string { - return "http://" + rpcClient.addr + "/" + rpcClient.endpoint + return rpcClient.u.String() } diff --git a/internal/dsync/dsync-server_test.go b/internal/dsync/dsync-server_test.go index 09a539e80..fb8153804 100644 --- a/internal/dsync/dsync-server_test.go +++ b/internal/dsync/dsync-server_test.go @@ -19,11 +19,166 @@ package dsync import ( "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" "sync" "sync/atomic" "time" + + "github.com/gorilla/mux" ) +const numberOfNodes = 5 + +var ( + ds *Dsync + nodes = make([]*httptest.Server, numberOfNodes) // list of node IP addrs or hostname with ports. + lockServers = make([]*lockServer, numberOfNodes) +) + +func getLockArgs(r *http.Request) (args LockArgs, err error) { + buf, err := ioutil.ReadAll(r.Body) + if err != nil { + return args, err + } + _, err = args.UnmarshalMsg(buf) + return args, err +} + +type lockServerHandler struct { + lsrv *lockServer +} + +func (lh *lockServerHandler) writeErrorResponse(w http.ResponseWriter, err error) { + w.WriteHeader(http.StatusForbidden) + w.Write([]byte(err.Error())) +} + +func (lh *lockServerHandler) ForceUnlockHandler(w http.ResponseWriter, r *http.Request) { + args, err := getLockArgs(r) + if err != nil { + lh.writeErrorResponse(w, err) + return + } + + if _, err = lh.lsrv.ForceUnlock(&args); err != nil { + lh.writeErrorResponse(w, err) + return + } +} + +func (lh *lockServerHandler) RefreshHandler(w http.ResponseWriter, r *http.Request) { + args, err := getLockArgs(r) + if err != nil { + lh.writeErrorResponse(w, err) + return + } + + reply, err := lh.lsrv.Refresh(&args) + if err != nil { + lh.writeErrorResponse(w, err) + return + } + + if !reply { + lh.writeErrorResponse(w, errLockNotFound) + return + } +} + +func (lh *lockServerHandler) LockHandler(w http.ResponseWriter, r *http.Request) { + args, err := getLockArgs(r) + if err != nil { + lh.writeErrorResponse(w, err) + return + } + reply, err := lh.lsrv.Lock(&args) + if err == nil && !reply { + err = errLockConflict + } + if err != nil { + lh.writeErrorResponse(w, err) + return + } +} + +func (lh *lockServerHandler) UnlockHandler(w http.ResponseWriter, r *http.Request) { + args, err := getLockArgs(r) + if err != nil { + lh.writeErrorResponse(w, err) + return + } + _, err = lh.lsrv.Unlock(&args) + if err != nil { + lh.writeErrorResponse(w, err) + return + } +} + +func (lh *lockServerHandler) RUnlockHandler(w http.ResponseWriter, r *http.Request) { + args, err := getLockArgs(r) + if err != nil { + lh.writeErrorResponse(w, err) + return + } + _, err = lh.lsrv.RUnlock(&args) + if err != nil { + lh.writeErrorResponse(w, err) + return + } +} + +func (lh *lockServerHandler) HealthHandler(w http.ResponseWriter, r *http.Request) {} + +func (lh *lockServerHandler) RLockHandler(w http.ResponseWriter, r *http.Request) { + args, err := getLockArgs(r) + if err != nil { + lh.writeErrorResponse(w, err) + return + } + + reply, err := lh.lsrv.RLock(&args) + if err == nil && !reply { + err = errLockConflict + } + if err != nil { + lh.writeErrorResponse(w, err) + return + } +} + +func stopRPCServers() { + for i := 0; i < numberOfNodes; i++ { + nodes[i].Close() + } +} + +func startRPCServers() { + for i := 0; i < numberOfNodes; i++ { + lsrv := &lockServer{ + mutex: sync.Mutex{}, + lockMap: make(map[string]int64), + } + lockServer := lockServerHandler{ + lsrv: lsrv, + } + lockServers[i] = lsrv + + router := mux.NewRouter().SkipClean(true) + subrouter := router.PathPrefix("/").Subrouter() + subrouter.Methods(http.MethodPost).Path("/v1/health").HandlerFunc(lockServer.HealthHandler) + subrouter.Methods(http.MethodPost).Path("/v1/refresh").HandlerFunc(lockServer.RefreshHandler) + subrouter.Methods(http.MethodPost).Path("/v1/lock").HandlerFunc(lockServer.LockHandler) + subrouter.Methods(http.MethodPost).Path("/v1/rlock").HandlerFunc(lockServer.RLockHandler) + subrouter.Methods(http.MethodPost).Path("/v1/unlock").HandlerFunc(lockServer.UnlockHandler) + subrouter.Methods(http.MethodPost).Path("/v1/runlock").HandlerFunc(lockServer.RUnlockHandler) + subrouter.Methods(http.MethodPost).Path("/v1/force-unlock").HandlerFunc(lockServer.ForceUnlockHandler) + + nodes[i] = httptest.NewServer(router) + } +} + const WriteLock = -1 type lockServer struct { @@ -49,21 +204,21 @@ func (l *lockServer) setResponseDelay(responseDelay time.Duration) { atomic.StoreInt64(&l.responseDelay, int64(responseDelay)) } -func (l *lockServer) Lock(args *LockArgs, reply *bool) error { +func (l *lockServer) Lock(args *LockArgs) (reply bool, err error) { if d := atomic.LoadInt64(&l.responseDelay); d != 0 { time.Sleep(time.Duration(d)) } l.mutex.Lock() defer l.mutex.Unlock() - if _, *reply = l.lockMap[args.Resources[0]]; !*reply { + if _, reply = l.lockMap[args.Resources[0]]; !reply { l.lockMap[args.Resources[0]] = WriteLock // No locks held on the given name, so claim write lock } - *reply = !*reply // Negate *reply to return true when lock is granted or false otherwise - return nil + reply = !reply // Negate *reply to return true when lock is granted or false otherwise + return reply, nil } -func (l *lockServer) Unlock(args *LockArgs, reply *bool) error { +func (l *lockServer) Unlock(args *LockArgs) (reply bool, err error) { if d := atomic.LoadInt64(&l.responseDelay); d != 0 { time.Sleep(time.Duration(d)) } @@ -71,19 +226,19 @@ func (l *lockServer) Unlock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() var locksHeld int64 - if locksHeld, *reply = l.lockMap[args.Resources[0]]; !*reply { // No lock is held on the given name - return fmt.Errorf("Unlock attempted on an unlocked entity: %s", args.Resources[0]) + if locksHeld, reply = l.lockMap[args.Resources[0]]; !reply { // No lock is held on the given name + return false, fmt.Errorf("Unlock attempted on an unlocked entity: %s", args.Resources[0]) } - if *reply = locksHeld == WriteLock; !*reply { // Unless it is a write lock - return fmt.Errorf("Unlock attempted on a read locked entity: %s (%d read locks active)", args.Resources[0], locksHeld) + if reply = locksHeld == WriteLock; !reply { // Unless it is a write lock + return false, fmt.Errorf("Unlock attempted on a read locked entity: %s (%d read locks active)", args.Resources[0], locksHeld) } delete(l.lockMap, args.Resources[0]) // Remove the write lock - return nil + return true, nil } const ReadLock = 1 -func (l *lockServer) RLock(args *LockArgs, reply *bool) error { +func (l *lockServer) RLock(args *LockArgs) (reply bool, err error) { if d := atomic.LoadInt64(&l.responseDelay); d != 0 { time.Sleep(time.Duration(d)) } @@ -91,16 +246,16 @@ func (l *lockServer) RLock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() var locksHeld int64 - if locksHeld, *reply = l.lockMap[args.Resources[0]]; !*reply { + if locksHeld, reply = l.lockMap[args.Resources[0]]; !reply { l.lockMap[args.Resources[0]] = ReadLock // No locks held on the given name, so claim (first) read lock - *reply = true - } else if *reply = locksHeld != WriteLock; *reply { // Unless there is a write lock + reply = true + } else if reply = locksHeld != WriteLock; reply { // Unless there is a write lock l.lockMap[args.Resources[0]] = locksHeld + ReadLock // Grant another read lock } - return nil + return reply, nil } -func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error { +func (l *lockServer) RUnlock(args *LockArgs) (reply bool, err error) { if d := atomic.LoadInt64(&l.responseDelay); d != 0 { time.Sleep(time.Duration(d)) } @@ -108,32 +263,32 @@ func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() var locksHeld int64 - if locksHeld, *reply = l.lockMap[args.Resources[0]]; !*reply { // No lock is held on the given name - return fmt.Errorf("RUnlock attempted on an unlocked entity: %s", args.Resources[0]) + if locksHeld, reply = l.lockMap[args.Resources[0]]; !reply { // No lock is held on the given name + return false, fmt.Errorf("RUnlock attempted on an unlocked entity: %s", args.Resources[0]) } - if *reply = locksHeld != WriteLock; !*reply { // A write-lock is held, cannot release a read lock - return fmt.Errorf("RUnlock attempted on a write locked entity: %s", args.Resources[0]) + if reply = locksHeld != WriteLock; !reply { // A write-lock is held, cannot release a read lock + return false, fmt.Errorf("RUnlock attempted on a write locked entity: %s", args.Resources[0]) } if locksHeld > ReadLock { l.lockMap[args.Resources[0]] = locksHeld - ReadLock // Remove one of the read locks held } else { delete(l.lockMap, args.Resources[0]) // Remove the (last) read lock } - return nil + return reply, nil } -func (l *lockServer) Refresh(args *LockArgs, reply *bool) error { +func (l *lockServer) Refresh(args *LockArgs) (reply bool, err error) { if d := atomic.LoadInt64(&l.responseDelay); d != 0 { time.Sleep(time.Duration(d)) } l.mutex.Lock() defer l.mutex.Unlock() - *reply = !l.lockNotFound - return nil + reply = !l.lockNotFound + return reply, nil } -func (l *lockServer) ForceUnlock(args *LockArgs, reply *bool) error { +func (l *lockServer) ForceUnlock(args *LockArgs) (reply bool, err error) { if d := atomic.LoadInt64(&l.responseDelay); d != 0 { time.Sleep(time.Duration(d)) } @@ -141,9 +296,9 @@ func (l *lockServer) ForceUnlock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() if len(args.UID) != 0 { - return fmt.Errorf("ForceUnlock called with non-empty UID: %s", args.UID) + return false, fmt.Errorf("ForceUnlock called with non-empty UID: %s", args.UID) } delete(l.lockMap, args.Resources[0]) // Remove the lock (irrespective of write or read lock) - *reply = true - return nil + reply = true + return reply, nil } diff --git a/internal/dsync/dsync_test.go b/internal/dsync/dsync_test.go index c2a381d0a..43ee5e899 100644 --- a/internal/dsync/dsync_test.go +++ b/internal/dsync/dsync_test.go @@ -19,14 +19,8 @@ package dsync import ( "context" - "fmt" - golog "log" "math/rand" - "net" - "net/http" - "net/rpc" "os" - "strconv" "sync" "testing" "time" @@ -34,67 +28,23 @@ import ( "github.com/google/uuid" ) -const numberOfNodes = 5 - -var ( - ds *Dsync - rpcPaths []string // list of rpc paths where lock server is serving. -) - -var ( - nodes = make([]string, numberOfNodes) // list of node IP addrs or hostname with ports. - lockServers []*lockServer -) - -func startRPCServers() { - for i := range nodes { - server := rpc.NewServer() - ls := &lockServer{ - mutex: sync.Mutex{}, - lockMap: make(map[string]int64), - } - server.RegisterName("Dsync", ls) - // For some reason the registration paths need to be different (even for different server objs) - server.HandleHTTP(rpcPaths[i], fmt.Sprintf("%s-debug", rpcPaths[i])) - l, e := net.Listen("tcp", ":"+strconv.Itoa(i+12345)) - if e != nil { - golog.Fatal("listen error:", e) - } - go http.Serve(l, nil) - - lockServers = append(lockServers, ls) - } - - // Let servers start - time.Sleep(10 * time.Millisecond) -} - // TestMain initializes the testing framework func TestMain(m *testing.M) { - const rpcPath = "/dsync" - - rand.Seed(time.Now().UTC().UnixNano()) - - for i := range nodes { - nodes[i] = fmt.Sprintf("127.0.0.1:%d", i+12345) - } - for i := range nodes { - rpcPaths = append(rpcPaths, rpcPath+"-"+strconv.Itoa(i)) - } + startRPCServers() // Initialize net/rpc clients for dsync. var clnts []NetLocker for i := 0; i < len(nodes); i++ { - clnts = append(clnts, newClient(nodes[i], rpcPaths[i])) + clnts = append(clnts, newClient(nodes[i].URL)) } ds = &Dsync{ GetLockers: func() ([]NetLocker, string) { return clnts, uuid.New().String() }, } - startRPCServers() - - os.Exit(m.Run()) + code := m.Run() + stopRPCServers() + os.Exit(code) } func TestSimpleLock(t *testing.T) { @@ -281,7 +231,7 @@ func TestFailedRefreshLock(t *testing.T) { } dm := NewDRWMutex(ds, "aap") - wg := sync.WaitGroup{} + var wg sync.WaitGroup wg.Add(1) ctx, cl := context.WithCancel(context.Background()) diff --git a/internal/dsync/rpc-client-interface.go b/internal/dsync/locker.go similarity index 100% rename from internal/dsync/rpc-client-interface.go rename to internal/dsync/locker.go diff --git a/internal/http/listener_test.go b/internal/http/listener_test.go index bde323c51..d48cdb3a2 100644 --- a/internal/http/listener_test.go +++ b/internal/http/listener_test.go @@ -189,6 +189,9 @@ func TestHTTPListenerStartClose(t *testing.T) { // Ignore if IP is unbindable. continue } + if strings.Contains(err.Error(), "bind: address already in use") { + continue + } t.Fatalf("Test %d: error: expected = , got = %v", i+1, err) } @@ -232,6 +235,9 @@ func TestHTTPListenerAddr(t *testing.T) { // Ignore if IP is unbindable. continue } + if strings.Contains(err.Error(), "bind: address already in use") { + continue + } t.Fatalf("Test %d: error: expected = , got = %v", i+1, err) } @@ -272,6 +278,9 @@ func TestHTTPListenerAddrs(t *testing.T) { // Ignore if IP is unbindable. continue } + if strings.Contains(err.Error(), "bind: address already in use") { + continue + } t.Fatalf("Test %d: error: expected = , got = %v", i+1, err) } diff --git a/internal/rest/client.go b/internal/rest/client.go index de71aa706..9d264fc1b 100644 --- a/internal/rest/client.go +++ b/internal/rest/client.go @@ -131,7 +131,9 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod if err != nil { return nil, &NetworkError{err} } - req.Header.Set("Authorization", "Bearer "+c.newAuthToken(req.URL.RawQuery)) + if c.newAuthToken != nil { + req.Header.Set("Authorization", "Bearer "+c.newAuthToken(req.URL.RawQuery)) + } req.Header.Set("X-Minio-Time", time.Now().UTC().Format(time.RFC3339)) if body != nil { req.Header.Set("Expect", "100-continue")