Restructure server code, controller now runs in silo

This commit is contained in:
Harshavardhana 2015-08-27 17:05:24 -07:00
parent 526c8b4d76
commit 025f95b1d6
18 changed files with 156 additions and 197 deletions

View file

@ -23,52 +23,27 @@ import (
var controllerCmd = cli.Command{ var controllerCmd = cli.Command{
Name: "controller", Name: "controller",
Usage: "Get|Set server configuration", Usage: "Start minio controller",
Action: controllerMain, Action: controllerMain,
CustomHelpTemplate: `NAME: CustomHelpTemplate: `NAME:
minio {{.Name}} - {{.Description}} minio {{.Name}} - {{.Description}}
USAGE: USAGE:
minio {{.Name}} [get|set] [INFOTYPE] [SERVERURL] minio {{.Name}}
EXAMPLES: EXAMPLES:
1. Get disks from controller 1. Start minio controller
$ minio {{.Name}} get disks http://localhost:9001/rpc $ minio {{.Name}}
2. Get memstats from controller
$ minio {{.Name}} get mem http://localhost:9001/rpc
`, `,
} }
func controllerMain(c *cli.Context) { func controllerMain(c *cli.Context) {
if len(c.Args()) < 2 || c.Args().First() == "help" { if c.Args().Present() {
cli.ShowCommandHelpAndExit(c, "controller", 1) // last argument is exit code cli.ShowCommandHelpAndExit(c, "controller", 1)
} }
if c.Args().First() == "get" { err := controller.StartController()
newArgs := c.Args().Tail() if err != nil {
switch newArgs.First() { Fatalln(err)
case "mem":
memstats, err := controller.GetMemStats(newArgs.Tail().First())
if err != nil {
Fatalln(err)
}
Println(string(memstats))
case "sysinfo":
sysinfo, err := controller.GetSysInfo(newArgs.Tail().First())
if err != nil {
Fatalln(err)
}
Println(string(sysinfo))
case "auth":
keys, err := controller.GetAuthKeys(newArgs.Tail().First())
if err != nil {
Fatalln(err)
}
Println(string(keys))
}
}
if c.Args().First() == "set" {
Fatalln("Not supported yet")
} }
} }

View file

@ -1,124 +0,0 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controller
import (
"encoding/json"
"net/http"
jsonrpc "github.com/gorilla/rpc/v2/json"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/probe"
"github.com/minio/minio/pkg/server/rpc"
)
func closeResp(resp *http.Response) {
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
}
// GetMemStats get memory status of the server at given url
func GetMemStats(url string) ([]byte, *probe.Error) {
op := RPCOps{
Method: "MemStats.Get",
Request: rpc.Args{Request: ""},
}
req, perr := NewRequest(url, op, http.DefaultTransport)
if perr != nil {
return nil, perr.Trace()
}
resp, perr := req.Do()
defer closeResp(resp)
if perr != nil {
return nil, perr.Trace()
}
var reply rpc.MemStatsReply
if err := jsonrpc.DecodeClientResponse(resp.Body, &reply); err != nil {
return nil, probe.NewError(err)
}
jsonRespBytes, err := json.MarshalIndent(reply, "", "\t")
if err != nil {
return nil, probe.NewError(err)
}
return jsonRespBytes, nil
}
// GetSysInfo get system status of the server at given url
func GetSysInfo(url string) ([]byte, *probe.Error) {
op := RPCOps{
Method: "SysInfo.Get",
Request: rpc.Args{Request: ""},
}
req, perr := NewRequest(url, op, http.DefaultTransport)
if perr != nil {
return nil, perr.Trace()
}
resp, perr := req.Do()
defer closeResp(resp)
if perr != nil {
return nil, perr.Trace()
}
var reply rpc.SysInfoReply
if err := jsonrpc.DecodeClientResponse(resp.Body, &reply); err != nil {
return nil, probe.NewError(err)
}
jsonRespBytes, err := json.MarshalIndent(reply, "", "\t")
if err != nil {
return nil, probe.NewError(err)
}
return jsonRespBytes, nil
}
// GetAuthKeys get access key id and secret access key
func GetAuthKeys(url string) ([]byte, *probe.Error) {
op := RPCOps{
Method: "Auth.Get",
Request: rpc.Args{Request: ""},
}
req, perr := NewRequest(url, op, http.DefaultTransport)
if perr != nil {
return nil, perr.Trace()
}
resp, perr := req.Do()
defer closeResp(resp)
if perr != nil {
return nil, perr.Trace()
}
var reply rpc.AuthReply
if err := jsonrpc.DecodeClientResponse(resp.Body, &reply); err != nil {
return nil, probe.NewError(err)
}
authConfig := &auth.Config{}
authConfig.Version = "0.0.1"
authConfig.Users = make(map[string]*auth.User)
user := &auth.User{}
user.Name = "testuser"
user.AccessKeyID = reply.AccessKeyID
user.SecretAccessKey = reply.SecretAccessKey
authConfig.Users[reply.AccessKeyID] = user
if err := auth.SaveConfig(authConfig); err != nil {
return nil, err.Trace()
}
jsonRespBytes, err := json.MarshalIndent(reply, "", "\t")
if err != nil {
return nil, probe.NewError(err)
}
return jsonRespBytes, nil
}
// Add more functions here for other RPC messages

43
pkg/controller/router.go Normal file
View file

@ -0,0 +1,43 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controller
import (
"net/http"
router "github.com/gorilla/mux"
"github.com/minio/minio/pkg/rpc"
)
// getRPCHandler rpc handler
func getRPCHandler() http.Handler {
s := rpc.NewServer()
s.RegisterJSONCodec()
s.RegisterService(new(rpc.VersionService), "Version")
s.RegisterService(new(rpc.SysInfoService), "SysInfo")
s.RegisterService(new(rpc.MemStatsService), "MemStats")
s.RegisterService(new(rpc.DonutService), "Donut")
s.RegisterService(new(rpc.AuthService), "Auth")
// Add new RPC services here
return registerRPC(router.NewRouter(), s)
}
// registerRPC - register rpc handlers
func registerRPC(mux *router.Router, s *rpc.Server) http.Handler {
mux.Handle("/rpc", s)
return mux
}

View file

@ -14,38 +14,41 @@
* limitations under the License. * limitations under the License.
*/ */
package server package controller
import ( import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing"
jsonrpc "github.com/gorilla/rpc/v2/json" jsonrpc "github.com/gorilla/rpc/v2/json"
"github.com/minio/minio/pkg/controller" "github.com/minio/minio/pkg/rpc"
"github.com/minio/minio/pkg/server/rpc"
. "gopkg.in/check.v1" . "gopkg.in/check.v1"
) )
type MyRPCSuite struct{} // Hook up gocheck into the "go test" runner.
func Test(t *testing.T) { TestingT(t) }
var _ = Suite(&MyRPCSuite{}) type MySuite struct{}
var _ = Suite(&MySuite{})
var testRPCServer *httptest.Server var testRPCServer *httptest.Server
func (s *MyRPCSuite) SetUpSuite(c *C) { func (s *MySuite) SetUpSuite(c *C) {
testRPCServer = httptest.NewServer(getRPCHandler()) testRPCServer = httptest.NewServer(getRPCHandler())
} }
func (s *MyRPCSuite) TearDownSuite(c *C) { func (s *MySuite) TearDownSuite(c *C) {
testRPCServer.Close() testRPCServer.Close()
} }
func (s *MyRPCSuite) TestMemStats(c *C) { func (s *MySuite) TestMemStats(c *C) {
op := controller.RPCOps{ op := rpc.Operation{
Method: "MemStats.Get", Method: "MemStats.Get",
Request: rpc.Args{Request: ""}, Request: rpc.Args{Request: ""},
} }
req, err := controller.NewRequest(testRPCServer.URL+"/rpc", op, http.DefaultTransport) req, err := rpc.NewRequest(testRPCServer.URL+"/rpc", op, http.DefaultTransport)
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(req.Get("Content-Type"), Equals, "application/json") c.Assert(req.Get("Content-Type"), Equals, "application/json")
resp, err := req.Do() resp, err := req.Do()
@ -58,12 +61,12 @@ func (s *MyRPCSuite) TestMemStats(c *C) {
c.Assert(reply, Not(DeepEquals), rpc.MemStatsReply{}) c.Assert(reply, Not(DeepEquals), rpc.MemStatsReply{})
} }
func (s *MyRPCSuite) TestSysInfo(c *C) { func (s *MySuite) TestSysInfo(c *C) {
op := controller.RPCOps{ op := rpc.Operation{
Method: "SysInfo.Get", Method: "SysInfo.Get",
Request: rpc.Args{Request: ""}, Request: rpc.Args{Request: ""},
} }
req, err := controller.NewRequest(testRPCServer.URL+"/rpc", op, http.DefaultTransport) req, err := rpc.NewRequest(testRPCServer.URL+"/rpc", op, http.DefaultTransport)
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(req.Get("Content-Type"), Equals, "application/json") c.Assert(req.Get("Content-Type"), Equals, "application/json")
resp, err := req.Do() resp, err := req.Do()
@ -76,12 +79,12 @@ func (s *MyRPCSuite) TestSysInfo(c *C) {
c.Assert(reply, Not(DeepEquals), rpc.SysInfoReply{}) c.Assert(reply, Not(DeepEquals), rpc.SysInfoReply{})
} }
func (s *MyRPCSuite) TestAuth(c *C) { func (s *MySuite) TestAuth(c *C) {
op := controller.RPCOps{ op := rpc.Operation{
Method: "Auth.Get", Method: "Auth.Get",
Request: rpc.Args{Request: ""}, Request: rpc.Args{Request: ""},
} }
req, err := controller.NewRequest(testRPCServer.URL+"/rpc", op, http.DefaultTransport) req, err := rpc.NewRequest(testRPCServer.URL+"/rpc", op, http.DefaultTransport)
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(req.Get("Content-Type"), Equals, "application/json") c.Assert(req.Get("Content-Type"), Equals, "application/json")
resp, err := req.Do() resp, err := req.Do()

67
pkg/controller/server.go Normal file
View file

@ -0,0 +1,67 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controller
import (
"fmt"
"net"
"net/http"
"os"
"strings"
"github.com/minio/minio/pkg/minhttp"
"github.com/minio/minio/pkg/probe"
)
// getRPCServer instance
func getRPCServer(rpcHandler http.Handler) (*http.Server, *probe.Error) {
// Minio server config
httpServer := &http.Server{
Addr: ":9001", // TODO make this configurable
Handler: rpcHandler,
MaxHeaderBytes: 1 << 20,
}
var hosts []string
addrs, err := net.InterfaceAddrs()
if err != nil {
return nil, probe.NewError(err)
}
for _, addr := range addrs {
if addr.Network() == "ip+net" {
host := strings.Split(addr.String(), "/")[0]
if ip := net.ParseIP(host); ip.To4() != nil {
hosts = append(hosts, host)
}
}
}
for _, host := range hosts {
fmt.Printf("Starting minio server on: http://%s:9001/rpc, PID: %d\n", host, os.Getpid())
}
return httpServer, nil
}
func StartController() *probe.Error {
rpcServer, err := getRPCServer(getRPCHandler())
if err != nil {
return err.Trace()
}
// Setting rate limit to 'zero' no ratelimiting implemented
if err := minhttp.ListenAndServeLimited(0, rpcServer); err != nil {
return err.Trace()
}
return nil
}

View file

@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
package controller package rpc
import ( import (
"bytes" "bytes"
@ -24,20 +24,20 @@ import (
"github.com/minio/minio/pkg/probe" "github.com/minio/minio/pkg/probe"
) )
// RPCOps RPC operation // Operation RPC operation
type RPCOps struct { type Operation struct {
Method string Method string
Request interface{} Request interface{}
} }
// RPCRequest rpc client request // Request rpc client request
type RPCRequest struct { type Request struct {
req *http.Request req *http.Request
transport http.RoundTripper transport http.RoundTripper
} }
// NewRequest initiate a new client RPC request // NewRequest initiate a new client RPC request
func NewRequest(url string, op RPCOps, transport http.RoundTripper) (*RPCRequest, *probe.Error) { func NewRequest(url string, op Operation, transport http.RoundTripper) (*Request, *probe.Error) {
params, err := json.EncodeClientRequest(op.Method, op.Request) params, err := json.EncodeClientRequest(op.Method, op.Request)
if err != nil { if err != nil {
return nil, probe.NewError(err) return nil, probe.NewError(err)
@ -46,7 +46,7 @@ func NewRequest(url string, op RPCOps, transport http.RoundTripper) (*RPCRequest
if err != nil { if err != nil {
return nil, probe.NewError(err) return nil, probe.NewError(err)
} }
rpcReq := &RPCRequest{} rpcReq := &Request{}
rpcReq.req = req rpcReq.req = req
rpcReq.req.Header.Set("Content-Type", "application/json") rpcReq.req.Header.Set("Content-Type", "application/json")
if transport == nil { if transport == nil {
@ -57,7 +57,7 @@ func NewRequest(url string, op RPCOps, transport http.RoundTripper) (*RPCRequest
} }
// Do - make a http connection // Do - make a http connection
func (r RPCRequest) Do() (*http.Response, *probe.Error) { func (r Request) Do() (*http.Response, *probe.Error) {
resp, err := r.transport.RoundTrip(r.req) resp, err := r.transport.RoundTrip(r.req)
if err != nil { if err != nil {
if err, ok := probe.UnwrapError(err); ok { if err, ok := probe.UnwrapError(err); ok {
@ -69,11 +69,11 @@ func (r RPCRequest) Do() (*http.Response, *probe.Error) {
} }
// Get - get value of requested header // Get - get value of requested header
func (r RPCRequest) Get(key string) string { func (r Request) Get(key string) string {
return r.req.Header.Get(key) return r.req.Header.Get(key)
} }
// Set - set value of a header key // Set - set value of a header key
func (r *RPCRequest) Set(key, value string) { func (r *Request) Set(key, value string) {
r.req.Header.Set(key, value) r.req.Header.Set(key, value)
} }

View file

@ -20,8 +20,8 @@ import (
"net/http" "net/http"
router "github.com/gorilla/mux" router "github.com/gorilla/mux"
"github.com/minio/minio/pkg/rpc"
"github.com/minio/minio/pkg/server/api" "github.com/minio/minio/pkg/server/api"
"github.com/minio/minio/pkg/server/rpc"
) )
// registerAPI - register all the object API handlers to their respective paths // registerAPI - register all the object API handlers to their respective paths
@ -92,12 +92,6 @@ func registerCustomMiddleware(mux http.Handler, conf api.Config) http.Handler {
return mux return mux
} }
// registerRPC - register rpc handlers
func registerRPC(mux *router.Router, s *rpc.Server) http.Handler {
mux.Handle("/rpc", s)
return mux
}
// getAPIHandler api handler // getAPIHandler api handler
func getAPIHandler(conf api.Config) (http.Handler, api.Minio) { func getAPIHandler(conf api.Config) (http.Handler, api.Minio) {
mux := router.NewRouter() mux := router.NewRouter()
@ -111,11 +105,12 @@ func getAPIHandler(conf api.Config) (http.Handler, api.Minio) {
func getRPCHandler() http.Handler { func getRPCHandler() http.Handler {
s := rpc.NewServer() s := rpc.NewServer()
s.RegisterJSONCodec() s.RegisterJSONCodec()
s.RegisterService(new(rpc.VersionService), "Version")
s.RegisterService(new(rpc.SysInfoService), "SysInfo")
s.RegisterService(new(rpc.MemStatsService), "MemStats")
s.RegisterService(new(rpc.DonutService), "Donut")
s.RegisterService(new(rpc.AuthService), "Auth")
// Add new RPC services here // Add new RPC services here
return registerRPC(router.NewRouter(), s) return registerRPC(router.NewRouter(), s)
} }
// registerRPC - register rpc handlers
func registerRPC(mux *router.Router, s *rpc.Server) http.Handler {
mux.Handle("/rpc", s)
return mux
}

View file

@ -24,9 +24,9 @@ import (
"os" "os"
"strings" "strings"
"github.com/minio/minio/pkg/minhttp"
"github.com/minio/minio/pkg/probe" "github.com/minio/minio/pkg/probe"
"github.com/minio/minio/pkg/server/api" "github.com/minio/minio/pkg/server/api"
"github.com/minio/minio/pkg/server/minhttp"
) )
// getAPI server instance // getAPI server instance