diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index c89129301..99997fc75 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -50,6 +50,7 @@ import ( "github.com/minio/madmin-go/v3/estream" "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio/internal/dsync" + "github.com/minio/minio/internal/grid" "github.com/minio/minio/internal/handlers" xhttp "github.com/minio/minio/internal/http" xioutil "github.com/minio/minio/internal/ioutil" @@ -1909,14 +1910,11 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) { setEventStreamHeaders(w) // Trace Publisher and peer-trace-client uses nonblocking send and hence does not wait for slow receivers. - // Use buffered channel to take care of burst sends or slow w.Write() - - // Keep 100k buffered channel, should be sufficient to ensure we do not lose any events. - traceCh := make(chan madmin.TraceInfo, 100000) - + // Keep 100k buffered channel. + // If receiver cannot keep up with that we drop events. + traceCh := make(chan []byte, 100000) peers, _ := newPeerRestClients(globalEndpoints) - - err = globalTrace.Subscribe(traceOpts.TraceTypes(), traceCh, ctx.Done(), func(entry madmin.TraceInfo) bool { + err = globalTrace.SubscribeJSON(traceOpts.TraceTypes(), traceCh, ctx.Done(), func(entry madmin.TraceInfo) bool { return shouldTrace(entry, traceOpts) }) if err != nil { @@ -1933,19 +1931,19 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) { if peer == nil { continue } - peer.Trace(traceCh, ctx.Done(), traceOpts) + peer.Trace(ctx, traceCh, traceOpts) } keepAliveTicker := time.NewTicker(time.Second) defer keepAliveTicker.Stop() - enc := json.NewEncoder(w) for { select { case entry := <-traceCh: - if err := enc.Encode(entry); err != nil { + if _, err := w.Write(entry); err != nil { return } + grid.PutByteBuffer(entry) if len(traceCh) == 0 { // Flush if nothing is queued w.(http.Flusher).Flush() diff --git a/cmd/endpoint.go b/cmd/endpoint.go index 8faaee245..29afe1a64 100644 --- a/cmd/endpoint.go +++ b/cmd/endpoint.go @@ -411,6 +411,29 @@ func (l EndpointServerPools) GridHosts() (gridHosts []string, gridLocal string) return gridHosts, gridLocal } +// FindGridHostsFromPeer will return a matching peer from provided peer. +func (l EndpointServerPools) FindGridHostsFromPeer(peer *xnet.Host) (peerGrid string) { + if peer == nil { + return "" + } + for _, ep := range l { + for _, endpoint := range ep.Endpoints { + if endpoint.IsLocal { + continue + } + host, err := xnet.ParseHost(endpoint.Host) + if err != nil { + continue + } + + if host.String() == peer.String() { + return endpoint.GridHost() + } + } + } + return "" +} + // Hostnames - returns list of unique hostnames func (l EndpointServerPools) Hostnames() []string { foundSet := set.NewStringSet() diff --git a/cmd/globals.go b/cmd/globals.go index 507fd53ea..ce78c17d0 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -34,6 +34,7 @@ import ( "github.com/minio/minio/internal/bucket/bandwidth" "github.com/minio/minio/internal/config" "github.com/minio/minio/internal/config/browser" + "github.com/minio/minio/internal/grid" "github.com/minio/minio/internal/handlers" "github.com/minio/minio/internal/kms" "go.uber.org/atomic" @@ -130,6 +131,11 @@ const ( tlsClientSessionCacheSize = 100 ) +func init() { + // Injected to prevent circular dependency. + pubsub.GetByteBuffer = grid.GetByteBuffer +} + type poolDisksLayout struct { cmdline string layout [][]string diff --git a/cmd/listen-notification-handlers.go b/cmd/listen-notification-handlers.go index 815387ea5..c8b72836d 100644 --- a/cmd/listen-notification-handlers.go +++ b/cmd/listen-notification-handlers.go @@ -18,12 +18,14 @@ package cmd import ( + "bytes" "encoding/json" "net/http" "strconv" "time" "github.com/minio/minio/internal/event" + "github.com/minio/minio/internal/grid" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/pubsub" "github.com/minio/mux" @@ -116,11 +118,32 @@ func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r // Listen Publisher and peer-listen-client uses nonblocking send and hence does not wait for slow receivers. // Use buffered channel to take care of burst sends or slow w.Write() - listenCh := make(chan event.Event, globalAPIConfig.getRequestsPoolCapacity()*len(globalEndpoints.Hostnames())) + mergeCh := make(chan []byte, globalAPIConfig.getRequestsPoolCapacity()*len(globalEndpoints.Hostnames())) + localCh := make(chan event.Event, globalAPIConfig.getRequestsPoolCapacity()) + // Convert local messages to JSON and send to mergeCh + go func() { + buf := bytes.NewBuffer(grid.GetByteBuffer()[:0]) + enc := json.NewEncoder(buf) + tmpEvt := struct{ Records []event.Event }{[]event.Event{{}}} + for { + select { + case ev := <-localCh: + buf.Reset() + tmpEvt.Records[0] = ev + if err := enc.Encode(tmpEvt); err != nil { + logger.LogOnceIf(ctx, err, "event: Encode failed") + continue + } + mergeCh <- append(grid.GetByteBuffer()[:0], buf.Bytes()...) + case <-ctx.Done(): + grid.PutByteBuffer(buf.Bytes()) + return + } + } + }() peers, _ := newPeerRestClients(globalEndpoints) - - err := globalHTTPListen.Subscribe(mask, listenCh, ctx.Done(), func(ev event.Event) bool { + err := globalHTTPListen.Subscribe(mask, localCh, ctx.Done(), func(ev event.Event) bool { if ev.S3.Bucket.Name != "" && bucketName != "" { if ev.S3.Bucket.Name != bucketName { return false @@ -139,7 +162,7 @@ func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r if peer == nil { continue } - peer.Listen(listenCh, ctx.Done(), values) + peer.Listen(ctx, mergeCh, values) } var ( @@ -170,14 +193,16 @@ func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r enc := json.NewEncoder(w) for { select { - case ev := <-listenCh: - if err := enc.Encode(struct{ Records []event.Event }{[]event.Event{ev}}); err != nil { + case ev := <-mergeCh: + _, err := w.Write(ev) + if err != nil { return } - if len(listenCh) == 0 { + if len(mergeCh) == 0 { // Flush if nothing is queued w.(http.Flusher).Flush() } + grid.PutByteBuffer(ev) case <-emptyEventTicker: if err := enc.Encode(struct{ Records []event.Event }{}); err != nil { return diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 88e09022a..98d94a1dc 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -22,19 +22,20 @@ import ( "context" "encoding/gob" "encoding/hex" + "encoding/json" "errors" "fmt" "io" "net/url" "strconv" "strings" + "sync/atomic" "time" "github.com/minio/madmin-go/v3" "github.com/minio/minio/internal/bucket/bandwidth" - "github.com/minio/minio/internal/event" + "github.com/minio/minio/internal/grid" xhttp "github.com/minio/minio/internal/http" - xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/rest" "github.com/minio/pkg/v2/logger/message/log" @@ -46,6 +47,66 @@ import ( type peerRESTClient struct { host *xnet.Host restClient *rest.Client + gridHost string + // Function that returns the grid connection for this peer when initialized. + // Will return nil if the grid connection is not initialized yet. + gridConn func() *grid.Connection +} + +// Returns a peer rest client. +func newPeerRESTClient(peer *xnet.Host, gridHost string) *peerRESTClient { + scheme := "http" + if globalIsTLS { + scheme = "https" + } + + serverURL := &url.URL{ + Scheme: scheme, + Host: peer.String(), + Path: peerRESTPath, + } + + restClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken()) + // Use a separate client to avoid recursive calls. + healthClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken()) + healthClient.NoMetrics = true + + // Construct a new health function. + restClient.HealthCheckFn = func() bool { + ctx, cancel := context.WithTimeout(context.Background(), restClient.HealthCheckTimeout) + defer cancel() + respBody, err := healthClient.Call(ctx, peerRESTMethodHealth, nil, nil, -1) + xhttp.DrainBody(respBody) + return !isNetworkError(err) + } + var gridConn atomic.Pointer[grid.Connection] + + return &peerRESTClient{ + host: peer, restClient: restClient, gridHost: gridHost, + gridConn: func() *grid.Connection { + // Lazy initialization of grid connection. + // When we create this peer client, the grid connection is likely not yet initialized. + if gridHost == "" { + logger.LogOnceIf(context.Background(), fmt.Errorf("gridHost is empty for peer %s", peer.String()), peer.String()+":gridHost") + return nil + } + gc := gridConn.Load() + if gc != nil { + return gc + } + gm := globalGrid.Load() + if gm == nil { + return nil + } + gc = gm.Connection(gridHost) + if gc == nil { + logger.LogOnceIf(context.Background(), fmt.Errorf("gridHost %q not found for peer %s", gridHost, peer.String()), peer.String()+":gridHost") + return nil + } + gridConn.Store(gc) + return gc + }, + } } // Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected @@ -254,7 +315,7 @@ func (client *peerRESTClient) GetResourceMetrics(ctx context.Context) (<-chan Me go func(ch chan<- Metric) { defer func() { xhttp.DrainBody(respBody) - xioutil.SafeClose(ch) + close(ch) }() for { var metric Metric @@ -618,92 +679,61 @@ func (client *peerRESTClient) LoadTransitionTierConfig(ctx context.Context) erro return nil } -func (client *peerRESTClient) doTrace(traceCh chan<- madmin.TraceInfo, doneCh <-chan struct{}, traceOpts madmin.ServiceTraceOpts) { - values := make(url.Values) - traceOpts.AddParams(values) - - // To cancel the REST request in case doneCh gets closed. - ctx, cancel := context.WithCancel(GlobalContext) - - cancelCh := make(chan struct{}) - defer xioutil.SafeClose(cancelCh) - go func() { - select { - case <-doneCh: - case <-cancelCh: - // There was an error in the REST request. - } - cancel() - }() - - respBody, err := client.callWithContext(ctx, peerRESTMethodTrace, values, nil, -1) - defer xhttp.DrainBody(respBody) - - if err != nil { +func (client *peerRESTClient) doTrace(ctx context.Context, traceCh chan<- []byte, traceOpts madmin.ServiceTraceOpts) { + gridConn := client.gridConn() + if gridConn == nil { return } - dec := gob.NewDecoder(respBody) - for { - var info madmin.TraceInfo - if err = dec.Decode(&info); err != nil { - return - } - if len(info.NodeName) > 0 { - select { - case traceCh <- info: - default: - // Do not block on slow receivers. - } - } + payload, err := json.Marshal(traceOpts) + if err != nil { + logger.LogIf(ctx, err) + return } + + st, err := gridConn.NewStream(ctx, grid.HandlerTrace, payload) + if err != nil { + return + } + st.Results(func(b []byte) error { + select { + case traceCh <- b: + default: + // Do not block on slow receivers. + // Just recycle the buffer. + grid.PutByteBuffer(b) + } + return nil + }) } -func (client *peerRESTClient) doListen(listenCh chan<- event.Event, doneCh <-chan struct{}, v url.Values) { - // To cancel the REST request in case doneCh gets closed. - ctx, cancel := context.WithCancel(GlobalContext) - - cancelCh := make(chan struct{}) - defer xioutil.SafeClose(cancelCh) - go func() { - select { - case <-doneCh: - case <-cancelCh: - // There was an error in the REST request. - } - cancel() - }() - - respBody, err := client.callWithContext(ctx, peerRESTMethodListen, v, nil, -1) - defer xhttp.DrainBody(respBody) - +func (client *peerRESTClient) doListen(ctx context.Context, listenCh chan<- []byte, v url.Values) { + conn := client.gridConn() + if conn == nil { + return + } + st, err := listenHandler.Call(ctx, conn, grid.NewURLValuesWith(v)) if err != nil { return } - - dec := gob.NewDecoder(respBody) - for { - var ev event.Event - if err := dec.Decode(&ev); err != nil { - return + st.Results(func(b *grid.Bytes) error { + select { + case listenCh <- *b: + default: + // Do not block on slow receivers. + b.Recycle() } - if len(ev.EventVersion) > 0 { - select { - case listenCh <- ev: - default: - // Do not block on slow receivers. - } - } - } + return nil + }) } // Listen - listen on peers. -func (client *peerRESTClient) Listen(listenCh chan<- event.Event, doneCh <-chan struct{}, v url.Values) { +func (client *peerRESTClient) Listen(ctx context.Context, listenCh chan<- []byte, v url.Values) { go func() { for { - client.doListen(listenCh, doneCh, v) + client.doListen(ctx, listenCh, v) select { - case <-doneCh: + case <-ctx.Done(): return default: // There was error in the REST request, retry after sometime as probably the peer is down. @@ -714,12 +744,13 @@ func (client *peerRESTClient) Listen(listenCh chan<- event.Event, doneCh <-chan } // Trace - send http trace request to peer nodes -func (client *peerRESTClient) Trace(traceCh chan<- madmin.TraceInfo, doneCh <-chan struct{}, traceOpts madmin.ServiceTraceOpts) { +func (client *peerRESTClient) Trace(ctx context.Context, traceCh chan<- []byte, traceOpts madmin.ServiceTraceOpts) { go func() { for { - client.doTrace(traceCh, doneCh, traceOpts) + // Blocks until context is canceled or an error occurs. + client.doTrace(ctx, traceCh, traceOpts) select { - case <-doneCh: + case <-ctx.Done(): return default: // There was error in the REST request, retry after sometime as probably the peer is down. @@ -734,7 +765,7 @@ func (client *peerRESTClient) doConsoleLog(logCh chan log.Info, doneCh <-chan st ctx, cancel := context.WithCancel(GlobalContext) cancelCh := make(chan struct{}) - defer xioutil.SafeClose(cancelCh) + defer close(cancelCh) go func() { select { case <-doneCh: @@ -789,6 +820,7 @@ func newPeerRestClients(endpoints EndpointServerPools) (remote, all []*peerRESTC // Only useful in distributed setups return nil, nil } + hosts := endpoints.hostsSorted() remote = make([]*peerRESTClient, 0, len(hosts)) all = make([]*peerRESTClient, len(hosts)) @@ -796,7 +828,7 @@ func newPeerRestClients(endpoints EndpointServerPools) (remote, all []*peerRESTC if host == nil { continue } - all[i] = newPeerRESTClient(host) + all[i] = newPeerRESTClient(host, endpoints.FindGridHostsFromPeer(host)) remote = append(remote, all[i]) } if len(all) != len(remote)+1 { @@ -805,36 +837,6 @@ func newPeerRestClients(endpoints EndpointServerPools) (remote, all []*peerRESTC return remote, all } -// Returns a peer rest client. -func newPeerRESTClient(peer *xnet.Host) *peerRESTClient { - scheme := "http" - if globalIsTLS { - scheme = "https" - } - - serverURL := &url.URL{ - Scheme: scheme, - Host: peer.String(), - Path: peerRESTPath, - } - - restClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken()) - // Use a separate client to avoid recursive calls. - healthClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken()) - healthClient.NoMetrics = true - - // Construct a new health function. - restClient.HealthCheckFn = func() bool { - ctx, cancel := context.WithTimeout(context.Background(), restClient.HealthCheckTimeout) - defer cancel() - respBody, err := healthClient.Call(ctx, peerRESTMethodHealth, nil, nil, -1) - xhttp.DrainBody(respBody) - return !isNetworkError(err) - } - - return &peerRESTClient{host: peer, restClient: restClient} -} - // MonitorBandwidth - send http trace request to peer nodes func (client *peerRESTClient) MonitorBandwidth(ctx context.Context, buckets []string) (*bandwidth.BucketBandwidthReport, error) { values := make(url.Values) @@ -861,7 +863,7 @@ func (client *peerRESTClient) GetPeerMetrics(ctx context.Context) (<-chan Metric go func(ch chan<- Metric) { defer func() { xhttp.DrainBody(respBody) - xioutil.SafeClose(ch) + close(ch) }() for { var metric Metric @@ -888,7 +890,7 @@ func (client *peerRESTClient) GetPeerBucketMetrics(ctx context.Context) (<-chan go func(ch chan<- Metric) { defer func() { xhttp.DrainBody(respBody) - xioutil.SafeClose(ch) + close(ch) }() for { var metric Metric @@ -1026,7 +1028,7 @@ func (client *peerRESTClient) GetReplicationMRF(ctx context.Context, bucket stri go func(ch chan madmin.ReplicationMRF) { defer func() { xhttp.DrainBody(respBody) - xioutil.SafeClose(ch) + close(ch) }() for { var entry madmin.ReplicationMRF diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index efbdad203..93dda9c6d 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -18,6 +18,7 @@ package cmd import ( + "bytes" "context" "encoding/gob" "encoding/hex" @@ -37,6 +38,7 @@ import ( "github.com/minio/madmin-go/v3" b "github.com/minio/minio/internal/bucket/bandwidth" "github.com/minio/minio/internal/event" + "github.com/minio/minio/internal/grid" xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/pubsub" @@ -976,25 +978,23 @@ func (s *peerRESTServer) SignalServiceHandler(w http.ResponseWriter, r *http.Req } } +// Set an output capacity of 100 for listenHandler +// There is another buffer that will buffer events. +var listenHandler = grid.NewStream[*grid.URLValues, grid.NoPayload, *grid.Bytes](grid.HandlerListen, + grid.NewURLValues, nil, grid.NewBytes).WithOutCapacity(100) + // ListenHandler sends http trace messages back to peer rest client -func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) { - if !s.IsValid(w, r) { - s.writeErrorResponse(w, errors.New("invalid request")) - return - } - - values := r.Form - +func (s *peerRESTServer) ListenHandler(ctx context.Context, v *grid.URLValues, out chan<- *grid.Bytes) *grid.RemoteErr { + values := v.Values() + defer v.Recycle() var prefix string if len(values[peerRESTListenPrefix]) > 1 { - s.writeErrorResponse(w, errors.New("invalid request")) - return + return grid.NewRemoteErrString("invalid request (peerRESTListenPrefix)") } - + globalAPIConfig.getRequestsPoolCapacity() if len(values[peerRESTListenPrefix]) == 1 { if err := event.ValidateFilterRuleValue(values[peerRESTListenPrefix][0]); err != nil { - s.writeErrorResponse(w, err) - return + return grid.NewRemoteErr(err) } prefix = values[peerRESTListenPrefix][0] @@ -1002,14 +1002,12 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) { var suffix string if len(values[peerRESTListenSuffix]) > 1 { - s.writeErrorResponse(w, errors.New("invalid request")) - return + return grid.NewRemoteErrString("invalid request (peerRESTListenSuffix)") } if len(values[peerRESTListenSuffix]) == 1 { if err := event.ValidateFilterRuleValue(values[peerRESTListenSuffix][0]); err != nil { - s.writeErrorResponse(w, err) - return + return grid.NewRemoteErr(err) } suffix = values[peerRESTListenSuffix][0] @@ -1022,8 +1020,7 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) { for _, ev := range values[peerRESTListenEvents] { eventName, err := event.ParseName(ev) if err != nil { - s.writeErrorResponse(w, err) - return + return grid.NewRemoteErr(err) } mask.MergeMaskable(eventName) eventNames = append(eventNames, eventName) @@ -1031,13 +1028,10 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) { rulesMap := event.NewRulesMap(eventNames, pattern, event.TargetID{ID: mustGetUUID()}) - doneCh := r.Context().Done() - // Listen Publisher uses nonblocking publish and hence does not wait for slow subscribers. // Use buffered channel to take care of burst sends or slow w.Write() ch := make(chan event.Event, globalAPIConfig.getRequestsPoolCapacity()) - - err := globalHTTPListen.Subscribe(mask, ch, doneCh, func(ev event.Event) bool { + err := globalHTTPListen.Subscribe(mask, ch, ctx.Done(), func(ev event.Event) bool { if ev.S3.Bucket.Name != "" && values.Get(peerRESTListenBucket) != "" { if ev.S3.Bucket.Name != values.Get(peerRESTListenBucket) { return false @@ -1046,83 +1040,56 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) { return rulesMap.MatchSimple(ev.EventName, ev.S3.Object.Key) }) if err != nil { - s.writeErrorResponse(w, err) - return + return grid.NewRemoteErr(err) } - keepAliveTicker := time.NewTicker(500 * time.Millisecond) - defer keepAliveTicker.Stop() - enc := gob.NewEncoder(w) + // Process until remote disconnects. + // Blocks on upstream (out) congestion. + // We have however a dynamic downstream buffer (ch). + buf := bytes.NewBuffer(grid.GetByteBuffer()) + enc := json.NewEncoder(buf) + tmpEvt := struct{ Records []event.Event }{[]event.Event{{}}} for { select { + case <-ctx.Done(): + grid.PutByteBuffer(buf.Bytes()) + return nil case ev := <-ch: - if err := enc.Encode(ev); err != nil { - return + buf.Reset() + tmpEvt.Records[0] = ev + if err := enc.Encode(tmpEvt); err != nil { + logger.LogOnceIf(ctx, err, "event: Encode failed") + continue } - if len(ch) == 0 { - // Flush if nothing is queued - w.(http.Flusher).Flush() - } - case <-r.Context().Done(): - return - case <-keepAliveTicker.C: - if err := enc.Encode(&event.Event{}); err != nil { - return - } - w.(http.Flusher).Flush() + out <- grid.NewBytesWith(append(grid.GetByteBuffer()[:0], buf.Bytes()...)) } } } // TraceHandler sends http trace messages back to peer rest client -func (s *peerRESTServer) TraceHandler(w http.ResponseWriter, r *http.Request) { - if !s.IsValid(w, r) { - s.writeErrorResponse(w, errors.New("Invalid request")) - return - } - +func (s *peerRESTServer) TraceHandler(ctx context.Context, payload []byte, _ <-chan []byte, out chan<- []byte) *grid.RemoteErr { var traceOpts madmin.ServiceTraceOpts - err := traceOpts.ParseParams(r) + err := json.Unmarshal(payload, &traceOpts) if err != nil { - s.writeErrorResponse(w, errors.New("Invalid request")) - return + return grid.NewRemoteErr(err) } // Trace Publisher uses nonblocking publish and hence does not wait for slow subscribers. // Use buffered channel to take care of burst sends or slow w.Write() - ch := make(chan madmin.TraceInfo, 100000) - err = globalTrace.Subscribe(traceOpts.TraceTypes(), ch, r.Context().Done(), func(entry madmin.TraceInfo) bool { + err = globalTrace.SubscribeJSON(traceOpts.TraceTypes(), out, ctx.Done(), func(entry madmin.TraceInfo) bool { return shouldTrace(entry, traceOpts) }) if err != nil { - s.writeErrorResponse(w, err) - return + return grid.NewRemoteErr(err) } // Publish bootstrap events that have already occurred before client could subscribe. if traceOpts.TraceTypes().Contains(madmin.TraceBootstrap) { - go globalBootstrapTracer.Publish(r.Context(), globalTrace) - } - - keepAliveTicker := time.NewTicker(500 * time.Millisecond) - defer keepAliveTicker.Stop() - - enc := gob.NewEncoder(w) - for { - select { - case entry := <-ch: - if err := enc.Encode(entry); err != nil { - return - } - case <-r.Context().Done(): - return - case <-keepAliveTicker.C: - if err := enc.Encode(&madmin.TraceInfo{}); err != nil { - return - } - w.(http.Flusher).Flush() - } + go globalBootstrapTracer.Publish(ctx, globalTrace) } + // Wait for remote to cancel. + <-ctx.Done() + return nil } func (s *peerRESTServer) BackgroundHealStatusHandler(w http.ResponseWriter, r *http.Request) { @@ -1522,7 +1489,7 @@ func (s *peerRESTServer) NetSpeedTestHandler(w http.ResponseWriter, r *http.Requ } // registerPeerRESTHandlers - register peer rest router. -func registerPeerRESTHandlers(router *mux.Router) { +func registerPeerRESTHandlers(router *mux.Router, gm *grid.Manager) { h := func(f http.HandlerFunc) http.HandlerFunc { return collectInternodeStats(httpTraceHdrs(f)) } @@ -1564,8 +1531,6 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodStartProfiling).HandlerFunc(h(server.StartProfilingHandler)).Queries(restQueries(peerRESTProfiler)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDownloadProfilingData).HandlerFunc(h(server.DownloadProfilingDataHandler)) - subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodTrace).HandlerFunc(server.TraceHandler) - subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodListen).HandlerFunc(h(server.ListenHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundHealStatus).HandlerFunc(server.BackgroundHealStatusHandler) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLog).HandlerFunc(server.ConsoleLogHandler) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLocalDiskIDs).HandlerFunc(h(server.GetLocalDiskIDs)) @@ -1584,4 +1549,11 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadRebalanceMeta).HandlerFunc(h(server.LoadRebalanceMetaHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodStopRebalance).HandlerFunc(h(server.StopRebalanceHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLastDayTierStats).HandlerFunc(h(server.GetLastDayTierStatsHandler)) + logger.FatalIf(listenHandler.RegisterNoInput(gm, server.ListenHandler), "unable to register handler") + logger.FatalIf(gm.RegisterStreamingHandler(grid.HandlerTrace, grid.StreamHandler{ + Handle: server.TraceHandler, + Subroute: "", + OutCapacity: 100000, + InCapacity: 0, + }), "unable to register handler") } diff --git a/cmd/routers.go b/cmd/routers.go index 7ee5446bf..d786dc3ad 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -30,7 +30,7 @@ func registerDistErasureRouters(router *mux.Router, endpointServerPools Endpoint registerStorageRESTHandlers(router, endpointServerPools, globalGrid.Load()) // Register peer REST router only if its a distributed setup. - registerPeerRESTHandlers(router) + registerPeerRESTHandlers(router, globalGrid.Load()) // Register peer S3 router only if its a distributed setup. registerPeerS3Handlers(router) diff --git a/internal/grid/handlers.go b/internal/grid/handlers.go index 2143baa23..36ff172d5 100644 --- a/internal/grid/handlers.go +++ b/internal/grid/handlers.go @@ -59,8 +59,10 @@ const ( HandlerRenameData HandlerRenameFile HandlerReadAll - HandlerServerVerify + HandlerTrace + HandlerListen + // Add more above here ^^^ // If all handlers are used, the type of Handler can be changed. // Handlers have no versioning, so non-compatible handler changes must result in new IDs. @@ -542,6 +544,20 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) Register(m *Manager, handle func return h.register(m, handle, subroute...) } +// WithOutCapacity adjusts the output capacity from the handler perspective. +// This must be done prior to registering the handler. +func (h *StreamTypeHandler[Payload, Req, Resp]) WithOutCapacity(out int) *StreamTypeHandler[Payload, Req, Resp] { + h.OutCapacity = out + return h +} + +// WithInCapacity adjusts the input capacity from the handler perspective. +// This must be done prior to registering the handler. +func (h *StreamTypeHandler[Payload, Req, Resp]) WithInCapacity(in int) *StreamTypeHandler[Payload, Req, Resp] { + h.InCapacity = in + return h +} + // RegisterNoInput a handler for one-way streaming with payload and output stream. // An optional subroute can be given. Multiple entries are joined with '/'. func (h *StreamTypeHandler[Payload, Req, Resp]) RegisterNoInput(m *Manager, handle func(ctx context.Context, p Payload, out chan<- Resp) *RemoteErr, subroute ...string) error { diff --git a/internal/grid/handlers_string.go b/internal/grid/handlers_string.go index af26b4a31..b260ea72d 100644 --- a/internal/grid/handlers_string.go +++ b/internal/grid/handlers_string.go @@ -30,14 +30,16 @@ func _() { _ = x[HandlerRenameFile-19] _ = x[HandlerReadAll-20] _ = x[HandlerServerVerify-21] - _ = x[handlerTest-22] - _ = x[handlerTest2-23] - _ = x[handlerLast-24] + _ = x[HandlerTrace-22] + _ = x[HandlerListen-23] + _ = x[handlerTest-24] + _ = x[handlerTest2-25] + _ = x[handlerLast-26] } -const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataRenameFileReadAllServerVerifyhandlerTesthandlerTest2handlerLast" +const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataRenameFileReadAllServerVerifyTraceListenhandlerTesthandlerTest2handlerLast" -var _HandlerID_index = [...]uint16{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 206, 213, 225, 236, 248, 259} +var _HandlerID_index = [...]uint16{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 206, 213, 225, 230, 236, 247, 259, 270} func (i HandlerID) String() string { if i >= HandlerID(len(_HandlerID_index)-1) { diff --git a/internal/grid/types.go b/internal/grid/types.go index 35851f286..8d900115d 100644 --- a/internal/grid/types.go +++ b/internal/grid/types.go @@ -22,6 +22,7 @@ import ( "net/url" "sort" "strings" + "sync" "github.com/tinylib/msgp/msgp" ) @@ -198,3 +199,133 @@ func (b *Bytes) Msgsize() int { } return msgp.ArrayHeaderSize + len(*b) } + +// Recycle puts the Bytes back into the pool. +func (b *Bytes) Recycle() { + if *b != nil { + PutByteBuffer(*b) + *b = nil + } +} + +// URLValues can be used for url.Values. +type URLValues map[string][]string + +var urlValuesPool = sync.Pool{ + New: func() interface{} { + return make(map[string][]string, 10) + }, +} + +// NewURLValues returns a new URLValues. +func NewURLValues() *URLValues { + u := URLValues(urlValuesPool.Get().(map[string][]string)) + return &u +} + +// NewURLValuesWith returns a new URLValues with the provided content. +func NewURLValuesWith(values map[string][]string) *URLValues { + u := URLValues(values) + return &u +} + +// Values returns the url.Values. +// If u is nil, an empty url.Values is returned. +// The values are a shallow copy of the underlying map. +func (u *URLValues) Values() url.Values { + if u == nil { + return url.Values{} + } + return url.Values(*u) +} + +// Recycle the underlying map. +func (u *URLValues) Recycle() { + if *u != nil { + for key := range *u { + delete(*u, key) + } + val := map[string][]string(*u) + urlValuesPool.Put(val) + *u = nil + } +} + +// MarshalMsg implements msgp.Marshaler +func (u URLValues) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, u.Msgsize()) + o = msgp.AppendMapHeader(o, uint32(len(u))) + for zb0006, zb0007 := range u { + o = msgp.AppendString(o, zb0006) + o = msgp.AppendArrayHeader(o, uint32(len(zb0007))) + for zb0008 := range zb0007 { + o = msgp.AppendString(o, zb0007[zb0008]) + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (u *URLValues) UnmarshalMsg(bts []byte) (o []byte, err error) { + var zb0004 uint32 + zb0004, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + if *u == nil { + *u = urlValuesPool.Get().(map[string][]string) + } + if len(*u) > 0 { + for key := range *u { + delete(*u, key) + } + } + + for zb0004 > 0 { + var zb0001 string + var zb0002 []string + zb0004-- + zb0001, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + var zb0005 uint32 + zb0005, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + if cap(zb0002) >= int(zb0005) { + zb0002 = zb0002[:zb0005] + } else { + zb0002 = make([]string, zb0005) + } + for zb0003 := range zb0002 { + zb0002[zb0003], bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, zb0001, zb0003) + return + } + } + (*u)[zb0001] = zb0002 + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (u URLValues) Msgsize() (s int) { + s = msgp.MapHeaderSize + if u != nil { + for zb0006, zb0007 := range u { + _ = zb0007 + s += msgp.StringPrefixSize + len(zb0006) + msgp.ArrayHeaderSize + for zb0008 := range zb0007 { + s += msgp.StringPrefixSize + len(zb0007[zb0008]) + } + } + } + return +} diff --git a/internal/pubsub/pubsub.go b/internal/pubsub/pubsub.go index 294976f15..74131437c 100644 --- a/internal/pubsub/pubsub.go +++ b/internal/pubsub/pubsub.go @@ -18,11 +18,18 @@ package pubsub import ( + "bytes" + "encoding/json" "fmt" "sync" "sync/atomic" ) +// GetByteBuffer returns a byte buffer from the pool. +var GetByteBuffer = func() []byte { + return make([]byte, 0, 4096) +} + // Sub - subscriber entity. type Sub[T Maskable] struct { ch chan T @@ -96,6 +103,62 @@ func (ps *PubSub[T, M]) Subscribe(mask M, subCh chan T, doneCh <-chan struct{}, return nil } +// SubscribeJSON - Adds a subscriber to pubsub system and returns results with JSON encoding. +func (ps *PubSub[T, M]) SubscribeJSON(mask M, subCh chan<- []byte, doneCh <-chan struct{}, filter func(entry T) bool) error { + totalSubs := atomic.AddInt32(&ps.numSubscribers, 1) + if ps.maxSubscribers > 0 && totalSubs > ps.maxSubscribers { + atomic.AddInt32(&ps.numSubscribers, -1) + return fmt.Errorf("the limit of `%d` subscribers is reached", ps.maxSubscribers) + } + ps.Lock() + defer ps.Unlock() + subChT := make(chan T, 10000) + sub := &Sub[T]{ch: subChT, types: Mask(mask.Mask()), filter: filter} + ps.subs = append(ps.subs, sub) + + // We hold a lock, so we are safe to update + combined := Mask(atomic.LoadUint64(&ps.types)) + combined.Merge(Mask(mask.Mask())) + atomic.StoreUint64(&ps.types, uint64(combined)) + + go func() { + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + for { + select { + case <-doneCh: + case v, ok := <-subChT: + if !ok { + break + } + buf.Reset() + err := enc.Encode(v) + if err != nil { + break + } + subCh <- append(GetByteBuffer()[:0], buf.Bytes()...) + continue + } + break + } + + ps.Lock() + defer ps.Unlock() + var remainTypes Mask + for i, s := range ps.subs { + if s == sub { + ps.subs = append(ps.subs[:i], ps.subs[i+1:]...) + } else { + remainTypes.Merge(s.types) + } + } + atomic.StoreUint64(&ps.types, uint64(remainTypes)) + atomic.AddInt32(&ps.numSubscribers, -1) + }() + + return nil +} + // NumSubscribers returns the number of current subscribers, // The mask is checked against the active subscribed types, // and 0 will be returned if nobody is subscribed for the type(s).