1
0
mirror of https://github.com/minio/minio synced 2024-07-08 19:56:05 +00:00

do not flush if Write() failed (#13597)

- Go might reset the internal http.ResponseWriter() to `nil`
  after Write() failure if the go-routine has returned, do not
  flush() such scenarios and avoid spurious flushes() as
  returning handlers always flush.
- fix some racy tests with the console 
- avoid ticker leaks in certain situations
This commit is contained in:
Harshavardhana 2021-11-18 17:19:58 -08:00 committed by GitHub
parent 7700973538
commit fb268add7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 112 additions and 54 deletions

View File

@ -756,13 +756,17 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200) w.WriteHeader(200)
} }
// Send whitespace and keep connection open // Send whitespace and keep connection open
w.Write([]byte(" ")) if _, err := w.Write([]byte(" ")); err != nil {
return
}
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
case hr := <-respCh: case hr := <-respCh:
switch hr.apiErr { switch hr.apiErr {
case noError: case noError:
if started { if started {
w.Write(hr.respBytes) if _, err := w.Write(hr.respBytes); err != nil {
return
}
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
} else { } else {
writeSuccessResponseJSON(w, hr.respBytes) writeSuccessResponseJSON(w, hr.respBytes)
@ -787,7 +791,9 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set(xhttp.ContentType, string(mimeJSON)) w.Header().Set(xhttp.ContentType, string(mimeJSON))
w.WriteHeader(hr.apiErr.HTTPStatusCode) w.WriteHeader(hr.apiErr.HTTPStatusCode)
} }
w.Write(errorRespJSON) if _, err := w.Write(errorRespJSON); err != nil {
return
}
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
} }
break forLoop break forLoop
@ -1194,10 +1200,10 @@ func (a adminAPIHandlers) ConsoleLogHandler(w http.ResponseWriter, r *http.Reque
if err := enc.Encode(log); err != nil { if err := enc.Encode(log); err != nil {
return return
} }
} if len(logCh) == 0 {
if len(logCh) == 0 { // Flush if nothing is queued
// Flush if nothing is queued w.(http.Flusher).Flush()
w.(http.Flusher).Flush() }
} }
case <-keepAliveTicker.C: case <-keepAliveTicker.C:
if len(logCh) > 0 { if len(logCh) > 0 {
@ -1863,15 +1869,19 @@ func (a adminAPIHandlers) HealthInfoHandler(w http.ResponseWriter, r *http.Reque
if !ok { if !ok {
return return
} }
logger.LogIf(ctx, enc.Encode(oinfo)) if err := enc.Encode(oinfo); err != nil {
w.(http.Flusher).Flush() return
}
if len(healthInfoCh) == 0 {
// Flush if nothing is queued
w.(http.Flusher).Flush()
}
case <-ticker.C: case <-ticker.C:
if _, err := w.Write([]byte(" ")); err != nil { if _, err := w.Write([]byte(" ")); err != nil {
return return
} }
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
case <-deadlinedCtx.Done(): case <-deadlinedCtx.Done():
w.(http.Flusher).Flush()
return return
} }
} }
@ -1940,10 +1950,12 @@ func (a adminAPIHandlers) BandwidthMonitorHandler(w http.ResponseWriter, r *http
return return
} }
if err := enc.Encode(report); err != nil { if err := enc.Encode(report); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return return
} }
w.(http.Flusher).Flush() if len(reportCh) == 0 {
// Flush if nothing is queued
w.(http.Flusher).Flush()
}
case <-keepAliveTicker.C: case <-keepAliveTicker.C:
if _, err := w.Write([]byte(" ")); err != nil { if _, err := w.Write([]byte(" ")); err != nil {
return return

View File

@ -24,6 +24,7 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/klauspost/compress/gzhttp" "github.com/klauspost/compress/gzhttp"
"github.com/minio/console/restapi"
xhttp "github.com/minio/minio/internal/http" xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
"github.com/minio/pkg/wildcard" "github.com/minio/pkg/wildcard"
@ -42,6 +43,18 @@ func setHTTPServer(h *xhttp.Server) {
globalObjLayerMutex.Unlock() globalObjLayerMutex.Unlock()
} }
func newConsoleServerFn() *restapi.Server {
globalObjLayerMutex.RLock()
defer globalObjLayerMutex.RUnlock()
return globalConsoleSrv
}
func setConsoleSrv(srv *restapi.Server) {
globalObjLayerMutex.Lock()
globalConsoleSrv = srv
globalObjLayerMutex.Unlock()
}
func newObjectLayerFn() ObjectLayer { func newObjectLayerFn() ObjectLayer {
globalObjLayerMutex.RLock() globalObjLayerMutex.RLock()
defer globalObjLayerMutex.RUnlock() defer globalObjLayerMutex.RUnlock()

View File

@ -277,9 +277,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
globalHTTPServerErrorCh <- httpServer.Start(GlobalContext) globalHTTPServerErrorCh <- httpServer.Start(GlobalContext)
}() }()
globalObjLayerMutex.Lock() setHTTPServer(httpServer)
globalHTTPServer = httpServer
globalObjLayerMutex.Unlock()
newObject, err := gw.NewGatewayLayer(madmin.Credentials{ newObject, err := gw.NewGatewayLayer(madmin.Credentials{
AccessKey: globalActiveCred.AccessKey, AccessKey: globalActiveCred.AccessKey,
@ -345,14 +343,26 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
} }
if globalBrowserEnabled { if globalBrowserEnabled {
globalConsoleSrv, err = initConsoleServer() srv, err := initConsoleServer()
if err != nil { if err != nil {
logger.FatalIf(err, "Unable to initialize console service") logger.FatalIf(err, "Unable to initialize console service")
} }
setConsoleSrv(srv)
go func() { go func() {
logger.FatalIf(globalConsoleSrv.Serve(), "Unable to initialize console server") logger.FatalIf(newConsoleServerFn().Serve(), "Unable to initialize console server")
}() }()
} }
if serverDebugLog {
logger.Info("== DEBUG Mode enabled ==")
logger.Info("Currently set environment settings:")
for _, v := range os.Environ() {
logger.Info(v)
}
logger.Info("======")
}
<-globalOSSignalCh <-globalOSSignalCh
} }

View File

@ -3060,14 +3060,17 @@ func sendWhiteSpace(w http.ResponseWriter) <-chan bool {
case <-ticker.C: case <-ticker.C:
// Write header if not written yet. // Write header if not written yet.
if !headerWritten { if !headerWritten {
w.Write([]byte(xml.Header)) _, err := w.Write([]byte(xml.Header))
headerWritten = true headerWritten = err == nil
} }
// Once header is written keep writing empty spaces // Once header is written keep writing empty spaces
// which are ignored by client SDK XML parsers. // which are ignored by client SDK XML parsers.
// This occurs when server takes long time to completeMultiPartUpload() // This occurs when server takes long time to completeMultiPartUpload()
w.Write([]byte(" ")) _, err := w.Write([]byte(" "))
if err != nil {
return
}
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
case doneCh <- headerWritten: case doneCh <- headerWritten:
ticker.Stop() ticker.Stop()

View File

@ -821,7 +821,6 @@ func (s *peerRESTServer) SignalServiceHandler(w http.ResponseWriter, r *http.Req
return return
} }
signal := serviceSignal(si) signal := serviceSignal(si)
defer w.(http.Flusher).Flush()
switch signal { switch signal {
case serviceRestart: case serviceRestart:
globalServiceSignalCh <- signal globalServiceSignalCh <- signal
@ -902,9 +901,6 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
rulesMap := event.NewRulesMap(eventNames, pattern, event.TargetID{ID: mustGetUUID()}) rulesMap := event.NewRulesMap(eventNames, pattern, event.TargetID{ID: mustGetUUID()})
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
doneCh := make(chan struct{}) doneCh := make(chan struct{})
defer close(doneCh) defer close(doneCh)
@ -975,9 +971,6 @@ func (s *peerRESTServer) TraceHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
doneCh := make(chan struct{}) doneCh := make(chan struct{})
defer close(doneCh) defer close(doneCh)
@ -1047,7 +1040,6 @@ func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Reques
w.Header().Set("Connection", "close") w.Header().Set("Connection", "close")
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
doneCh := make(chan struct{}) doneCh := make(chan struct{})
defer close(doneCh) defer close(doneCh)
@ -1091,8 +1083,6 @@ func (s *peerRESTServer) GetBandwidth(w http.ResponseWriter, r *http.Request) {
} }
bucketsString := r.Form.Get("buckets") bucketsString := r.Form.Get("buckets")
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
doneCh := make(chan struct{}) doneCh := make(chan struct{})
defer close(doneCh) defer close(doneCh)
@ -1112,8 +1102,6 @@ func (s *peerRESTServer) GetPeerMetrics(w http.ResponseWriter, r *http.Request)
if !s.IsValid(w, r) { if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("invalid request")) s.writeErrorResponse(w, errors.New("invalid request"))
} }
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
doneCh := make(chan struct{}) doneCh := make(chan struct{})
defer close(doneCh) defer close(doneCh)

View File

@ -609,13 +609,15 @@ func serverMain(ctx *cli.Context) {
} }
if globalBrowserEnabled { if globalBrowserEnabled {
globalConsoleSrv, err = initConsoleServer() srv, err := initConsoleServer()
if err != nil { if err != nil {
logger.FatalIf(err, "Unable to initialize console service") logger.FatalIf(err, "Unable to initialize console service")
} }
setConsoleSrv(srv)
go func() { go func() {
logger.FatalIf(globalConsoleSrv.Serve(), "Unable to initialize console server") logger.FatalIf(newConsoleServerFn().Serve(), "Unable to initialize console server")
}() }()
} }

View File

@ -66,8 +66,8 @@ func handleSignals() {
logger.LogIf(context.Background(), oerr) logger.LogIf(context.Background(), oerr)
} }
if globalConsoleSrv != nil { if srv := newConsoleServerFn(); srv != nil {
logger.LogIf(context.Background(), globalConsoleSrv.Shutdown()) logger.LogIf(context.Background(), srv.Shutdown())
} }
return (err == nil && oerr == nil) return (err == nil && oerr == nil)

View File

@ -760,37 +760,51 @@ func keepHTTPReqResponseAlive(w http.ResponseWriter, r *http.Request) (resp func
doneCh := make(chan error) doneCh := make(chan error)
ctx := r.Context() ctx := r.Context()
go func() { go func() {
defer close(doneCh)
// Wait for body to be read. // Wait for body to be read.
select { select {
case <-ctx.Done(): case <-ctx.Done():
return
case <-bodyDoneCh: case <-bodyDoneCh:
case err := <-doneCh: case err, ok := <-doneCh:
if !ok {
return
}
if err != nil { if err != nil {
w.Write([]byte{1}) _, werr := w.Write([]byte{1})
if werr != nil {
return
}
w.Write([]byte(err.Error())) w.Write([]byte(err.Error()))
} else { } else {
w.Write([]byte{0}) w.Write([]byte{0})
} }
close(doneCh)
return return
} }
defer close(doneCh)
// Initiate ticker after body has been read. // Initiate ticker after body has been read.
ticker := time.NewTicker(time.Second * 10) ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
// Response not ready, write a filler byte. // Response not ready, write a filler byte.
w.Write([]byte{32}) if _, err := w.Write([]byte{32}); err != nil {
return
}
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
case err := <-doneCh: case err, ok := <-doneCh:
if !ok {
return
}
if err != nil { if err != nil {
w.Write([]byte{1}) _, werr := w.Write([]byte{1})
if werr != nil {
return
}
w.Write([]byte(err.Error())) w.Write([]byte(err.Error()))
} else { } else {
w.Write([]byte{0}) w.Write([]byte{0})
} }
ticker.Stop()
return return
} }
} }
@ -825,20 +839,25 @@ func keepHTTPResponseAlive(w http.ResponseWriter) func(error) {
go func() { go func() {
defer close(doneCh) defer close(doneCh)
ticker := time.NewTicker(time.Second * 10) ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
// Response not ready, write a filler byte. // Response not ready, write a filler byte.
w.Write([]byte{32}) if _, err := w.Write([]byte{32}); err != nil {
return
}
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
case err := <-doneCh: case err := <-doneCh:
if err != nil { if err != nil {
w.Write([]byte{1}) _, werr := w.Write([]byte{1})
if werr != nil {
return
}
w.Write([]byte(err.Error())) w.Write([]byte(err.Error()))
} else { } else {
w.Write([]byte{0}) w.Write([]byte{0})
} }
ticker.Stop()
return return
} }
} }
@ -932,18 +951,24 @@ func streamHTTPResponse(w http.ResponseWriter) *httpStreamResponse {
blockCh := make(chan []byte) blockCh := make(chan []byte)
h := httpStreamResponse{done: doneCh, block: blockCh} h := httpStreamResponse{done: doneCh, block: blockCh}
go func() { go func() {
defer close(doneCh)
ticker := time.NewTicker(time.Second * 10) ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
// Response not ready, write a filler byte. // Response not ready, write a filler byte.
w.Write([]byte{32}) _, err := w.Write([]byte{32})
if err != nil {
return
}
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
case err := <-doneCh: case err := <-doneCh:
ticker.Stop()
defer close(doneCh)
if err != nil { if err != nil {
w.Write([]byte{1}) _, werr := w.Write([]byte{1})
if werr != nil {
return
}
w.Write([]byte(err.Error())) w.Write([]byte(err.Error()))
} else { } else {
w.Write([]byte{0}) w.Write([]byte{0})
@ -953,8 +978,14 @@ func streamHTTPResponse(w http.ResponseWriter) *httpStreamResponse {
var tmp [5]byte var tmp [5]byte
tmp[0] = 2 tmp[0] = 2
binary.LittleEndian.PutUint32(tmp[1:], uint32(len(block))) binary.LittleEndian.PutUint32(tmp[1:], uint32(len(block)))
w.Write(tmp[:]) _, err := w.Write(tmp[:])
w.Write(block) if err != nil {
return
}
_, err = w.Write(block)
if err != nil {
return
}
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
} }
} }

View File

@ -92,7 +92,6 @@ func (srv *Server) Start(ctx context.Context) (err error) {
w.Header().Set("Connection", "close") w.Header().Set("Connection", "close")
w.WriteHeader(http.StatusServiceUnavailable) w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte(http.ErrServerClosed.Error())) w.Write([]byte(http.ErrServerClosed.Error()))
w.(http.Flusher).Flush()
return return
} }