From fb268add7a64ded104a8714f75bcaccdb74afbf3 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 18 Nov 2021 17:19:58 -0800 Subject: [PATCH] do not flush if Write() failed (#13597) - Go might reset the internal http.ResponseWriter() to `nil` after Write() failure if the go-routine has returned, do not flush() such scenarios and avoid spurious flushes() as returning handlers always flush. - fix some racy tests with the console - avoid ticker leaks in certain situations --- cmd/admin-handlers.go | 36 ++++++++++++++------- cmd/api-router.go | 13 ++++++++ cmd/gateway-main.go | 20 +++++++++--- cmd/object-handlers.go | 9 ++++-- cmd/peer-rest-server.go | 12 ------- cmd/server-main.go | 6 ++-- cmd/signals.go | 4 +-- cmd/storage-rest-server.go | 65 ++++++++++++++++++++++++++++---------- internal/http/server.go | 1 - 9 files changed, 112 insertions(+), 54 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 159ff16f7..9d98fd345 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -756,13 +756,17 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) } // Send whitespace and keep connection open - w.Write([]byte(" ")) + if _, err := w.Write([]byte(" ")); err != nil { + return + } w.(http.Flusher).Flush() case hr := <-respCh: switch hr.apiErr { case noError: if started { - w.Write(hr.respBytes) + if _, err := w.Write(hr.respBytes); err != nil { + return + } w.(http.Flusher).Flush() } else { writeSuccessResponseJSON(w, hr.respBytes) @@ -787,7 +791,9 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set(xhttp.ContentType, string(mimeJSON)) w.WriteHeader(hr.apiErr.HTTPStatusCode) } - w.Write(errorRespJSON) + if _, err := w.Write(errorRespJSON); err != nil { + return + } w.(http.Flusher).Flush() } break forLoop @@ -1194,10 +1200,10 @@ func (a adminAPIHandlers) ConsoleLogHandler(w http.ResponseWriter, r *http.Reque if err := enc.Encode(log); err != nil { return } - } - if len(logCh) == 0 { - // Flush if nothing is queued - w.(http.Flusher).Flush() + if len(logCh) == 0 { + // Flush if nothing is queued + w.(http.Flusher).Flush() + } } case <-keepAliveTicker.C: if len(logCh) > 0 { @@ -1863,15 +1869,19 @@ func (a adminAPIHandlers) HealthInfoHandler(w http.ResponseWriter, r *http.Reque if !ok { return } - logger.LogIf(ctx, enc.Encode(oinfo)) - w.(http.Flusher).Flush() + if err := enc.Encode(oinfo); err != nil { + return + } + if len(healthInfoCh) == 0 { + // Flush if nothing is queued + w.(http.Flusher).Flush() + } case <-ticker.C: if _, err := w.Write([]byte(" ")); err != nil { return } w.(http.Flusher).Flush() case <-deadlinedCtx.Done(): - w.(http.Flusher).Flush() return } } @@ -1940,10 +1950,12 @@ func (a adminAPIHandlers) BandwidthMonitorHandler(w http.ResponseWriter, r *http return } if err := enc.Encode(report); err != nil { - writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) return } - w.(http.Flusher).Flush() + if len(reportCh) == 0 { + // Flush if nothing is queued + w.(http.Flusher).Flush() + } case <-keepAliveTicker.C: if _, err := w.Write([]byte(" ")); err != nil { return diff --git a/cmd/api-router.go b/cmd/api-router.go index 5f51a5dee..f19ba7b52 100644 --- a/cmd/api-router.go +++ b/cmd/api-router.go @@ -24,6 +24,7 @@ import ( "github.com/gorilla/mux" "github.com/klauspost/compress/gzhttp" + "github.com/minio/console/restapi" xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/wildcard" @@ -42,6 +43,18 @@ func setHTTPServer(h *xhttp.Server) { globalObjLayerMutex.Unlock() } +func newConsoleServerFn() *restapi.Server { + globalObjLayerMutex.RLock() + defer globalObjLayerMutex.RUnlock() + return globalConsoleSrv +} + +func setConsoleSrv(srv *restapi.Server) { + globalObjLayerMutex.Lock() + globalConsoleSrv = srv + globalObjLayerMutex.Unlock() +} + func newObjectLayerFn() ObjectLayer { globalObjLayerMutex.RLock() defer globalObjLayerMutex.RUnlock() diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go index dd56751c8..eecef9cd7 100644 --- a/cmd/gateway-main.go +++ b/cmd/gateway-main.go @@ -277,9 +277,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) { globalHTTPServerErrorCh <- httpServer.Start(GlobalContext) }() - globalObjLayerMutex.Lock() - globalHTTPServer = httpServer - globalObjLayerMutex.Unlock() + setHTTPServer(httpServer) newObject, err := gw.NewGatewayLayer(madmin.Credentials{ AccessKey: globalActiveCred.AccessKey, @@ -345,14 +343,26 @@ func StartGateway(ctx *cli.Context, gw Gateway) { } if globalBrowserEnabled { - globalConsoleSrv, err = initConsoleServer() + srv, err := initConsoleServer() if err != nil { logger.FatalIf(err, "Unable to initialize console service") } + setConsoleSrv(srv) + go func() { - logger.FatalIf(globalConsoleSrv.Serve(), "Unable to initialize console server") + logger.FatalIf(newConsoleServerFn().Serve(), "Unable to initialize console server") }() } + + if serverDebugLog { + logger.Info("== DEBUG Mode enabled ==") + logger.Info("Currently set environment settings:") + for _, v := range os.Environ() { + logger.Info(v) + } + logger.Info("======") + } + <-globalOSSignalCh } diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 8350d4f29..08a29a35e 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -3060,14 +3060,17 @@ func sendWhiteSpace(w http.ResponseWriter) <-chan bool { case <-ticker.C: // Write header if not written yet. if !headerWritten { - w.Write([]byte(xml.Header)) - headerWritten = true + _, err := w.Write([]byte(xml.Header)) + headerWritten = err == nil } // Once header is written keep writing empty spaces // which are ignored by client SDK XML parsers. // This occurs when server takes long time to completeMultiPartUpload() - w.Write([]byte(" ")) + _, err := w.Write([]byte(" ")) + if err != nil { + return + } w.(http.Flusher).Flush() case doneCh <- headerWritten: ticker.Stop() diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index c53f1f0e2..48df0eb8c 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -821,7 +821,6 @@ func (s *peerRESTServer) SignalServiceHandler(w http.ResponseWriter, r *http.Req return } signal := serviceSignal(si) - defer w.(http.Flusher).Flush() switch signal { case serviceRestart: globalServiceSignalCh <- signal @@ -902,9 +901,6 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) { rulesMap := event.NewRulesMap(eventNames, pattern, event.TargetID{ID: mustGetUUID()}) - w.WriteHeader(http.StatusOK) - w.(http.Flusher).Flush() - doneCh := make(chan struct{}) defer close(doneCh) @@ -975,9 +971,6 @@ func (s *peerRESTServer) TraceHandler(w http.ResponseWriter, r *http.Request) { return } - w.WriteHeader(http.StatusOK) - w.(http.Flusher).Flush() - doneCh := make(chan struct{}) defer close(doneCh) @@ -1047,7 +1040,6 @@ func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Reques w.Header().Set("Connection", "close") w.WriteHeader(http.StatusOK) - w.(http.Flusher).Flush() doneCh := make(chan struct{}) defer close(doneCh) @@ -1091,8 +1083,6 @@ func (s *peerRESTServer) GetBandwidth(w http.ResponseWriter, r *http.Request) { } bucketsString := r.Form.Get("buckets") - w.WriteHeader(http.StatusOK) - w.(http.Flusher).Flush() doneCh := make(chan struct{}) defer close(doneCh) @@ -1112,8 +1102,6 @@ func (s *peerRESTServer) GetPeerMetrics(w http.ResponseWriter, r *http.Request) if !s.IsValid(w, r) { s.writeErrorResponse(w, errors.New("invalid request")) } - w.WriteHeader(http.StatusOK) - w.(http.Flusher).Flush() doneCh := make(chan struct{}) defer close(doneCh) diff --git a/cmd/server-main.go b/cmd/server-main.go index 2d94458c1..df052ffd8 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -609,13 +609,15 @@ func serverMain(ctx *cli.Context) { } if globalBrowserEnabled { - globalConsoleSrv, err = initConsoleServer() + srv, err := initConsoleServer() if err != nil { logger.FatalIf(err, "Unable to initialize console service") } + setConsoleSrv(srv) + go func() { - logger.FatalIf(globalConsoleSrv.Serve(), "Unable to initialize console server") + logger.FatalIf(newConsoleServerFn().Serve(), "Unable to initialize console server") }() } diff --git a/cmd/signals.go b/cmd/signals.go index e3c03d151..8e43dae5d 100644 --- a/cmd/signals.go +++ b/cmd/signals.go @@ -66,8 +66,8 @@ func handleSignals() { logger.LogIf(context.Background(), oerr) } - if globalConsoleSrv != nil { - logger.LogIf(context.Background(), globalConsoleSrv.Shutdown()) + if srv := newConsoleServerFn(); srv != nil { + logger.LogIf(context.Background(), srv.Shutdown()) } return (err == nil && oerr == nil) diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 09b1da024..089c5248d 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -760,37 +760,51 @@ func keepHTTPReqResponseAlive(w http.ResponseWriter, r *http.Request) (resp func doneCh := make(chan error) ctx := r.Context() go func() { + defer close(doneCh) // Wait for body to be read. select { case <-ctx.Done(): + return case <-bodyDoneCh: - case err := <-doneCh: + case err, ok := <-doneCh: + if !ok { + return + } if err != nil { - w.Write([]byte{1}) + _, werr := w.Write([]byte{1}) + if werr != nil { + return + } w.Write([]byte(err.Error())) } else { w.Write([]byte{0}) } - close(doneCh) return } - defer close(doneCh) // Initiate ticker after body has been read. ticker := time.NewTicker(time.Second * 10) + defer ticker.Stop() for { select { case <-ticker.C: // Response not ready, write a filler byte. - w.Write([]byte{32}) + if _, err := w.Write([]byte{32}); err != nil { + return + } w.(http.Flusher).Flush() - case err := <-doneCh: + case err, ok := <-doneCh: + if !ok { + return + } if err != nil { - w.Write([]byte{1}) + _, werr := w.Write([]byte{1}) + if werr != nil { + return + } w.Write([]byte(err.Error())) } else { w.Write([]byte{0}) } - ticker.Stop() return } } @@ -825,20 +839,25 @@ func keepHTTPResponseAlive(w http.ResponseWriter) func(error) { go func() { defer close(doneCh) ticker := time.NewTicker(time.Second * 10) + defer ticker.Stop() for { select { case <-ticker.C: // Response not ready, write a filler byte. - w.Write([]byte{32}) + if _, err := w.Write([]byte{32}); err != nil { + return + } w.(http.Flusher).Flush() case err := <-doneCh: if err != nil { - w.Write([]byte{1}) + _, werr := w.Write([]byte{1}) + if werr != nil { + return + } w.Write([]byte(err.Error())) } else { w.Write([]byte{0}) } - ticker.Stop() return } } @@ -932,18 +951,24 @@ func streamHTTPResponse(w http.ResponseWriter) *httpStreamResponse { blockCh := make(chan []byte) h := httpStreamResponse{done: doneCh, block: blockCh} go func() { + defer close(doneCh) ticker := time.NewTicker(time.Second * 10) + defer ticker.Stop() for { select { case <-ticker.C: // Response not ready, write a filler byte. - w.Write([]byte{32}) + _, err := w.Write([]byte{32}) + if err != nil { + return + } w.(http.Flusher).Flush() case err := <-doneCh: - ticker.Stop() - defer close(doneCh) if err != nil { - w.Write([]byte{1}) + _, werr := w.Write([]byte{1}) + if werr != nil { + return + } w.Write([]byte(err.Error())) } else { w.Write([]byte{0}) @@ -953,8 +978,14 @@ func streamHTTPResponse(w http.ResponseWriter) *httpStreamResponse { var tmp [5]byte tmp[0] = 2 binary.LittleEndian.PutUint32(tmp[1:], uint32(len(block))) - w.Write(tmp[:]) - w.Write(block) + _, err := w.Write(tmp[:]) + if err != nil { + return + } + _, err = w.Write(block) + if err != nil { + return + } w.(http.Flusher).Flush() } } diff --git a/internal/http/server.go b/internal/http/server.go index 1315031ea..1b1e2f5dd 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -92,7 +92,6 @@ func (srv *Server) Start(ctx context.Context) (err error) { w.Header().Set("Connection", "close") w.WriteHeader(http.StatusServiceUnavailable) w.Write([]byte(http.ErrServerClosed.Error())) - w.(http.Flusher).Flush() return }