diff --git a/cmd/admin-handlers-site-replication.go b/cmd/admin-handlers-site-replication.go index a1039725e..a4a674545 100644 --- a/cmd/admin-handlers-site-replication.go +++ b/cmd/admin-handlers-site-replication.go @@ -31,6 +31,7 @@ import ( "github.com/dustin/go-humanize" "github.com/minio/madmin-go/v3" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/mux" "github.com/minio/pkg/v2/policy" @@ -537,7 +538,7 @@ func (a adminAPIHandlers) SiteReplicationDevNull(w http.ResponseWriter, r *http. connectTime := time.Now() for { - n, err := io.CopyN(io.Discard, r.Body, 128*humanize.KiByte) + n, err := io.CopyN(xioutil.Discard, r.Body, 128*humanize.KiByte) atomic.AddUint64(&globalSiteNetPerfRX.RX, uint64(n)) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { // If there is a disconnection before globalNetPerfMinDuration (we give a margin of error of 1 sec) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 60d02dfbb..452fa275e 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -52,6 +52,7 @@ import ( "github.com/minio/minio/internal/dsync" "github.com/minio/minio/internal/handlers" xhttp "github.com/minio/minio/internal/http" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/kms" "github.com/minio/minio/internal/logger" "github.com/minio/mux" @@ -756,11 +757,8 @@ func (a adminAPIHandlers) ProfileHandler(w http.ResponseWriter, r *http.Request) return } } - // read request body - io.CopyN(io.Discard, r.Body, 1) globalProfilerMu.Lock() - if globalProfiler == nil { globalProfiler = make(map[string]minioProfiler, 10) } @@ -1220,7 +1218,7 @@ func (a adminAPIHandlers) ClientDevNull(w http.ResponseWriter, r *http.Request) totalRx := int64(0) connectTime := time.Now() for { - n, err := io.CopyN(io.Discard, r.Body, 128*humanize.KiByte) + n, err := io.CopyN(xioutil.Discard, r.Body, 128*humanize.KiByte) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { // would mean the network is not stable. Logging here will help in debugging network issues. if time.Since(connectTime) < (globalNetPerfMinDuration - time.Second) { diff --git a/cmd/erasure-utils.go b/cmd/erasure-utils.go index 0a7e073e8..c73e0901e 100644 --- a/cmd/erasure-utils.go +++ b/cmd/erasure-utils.go @@ -18,7 +18,6 @@ package cmd import ( - "bytes" "context" "encoding/base64" "fmt" @@ -26,7 +25,6 @@ import ( "strings" "github.com/klauspost/reedsolomon" - xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" ) @@ -85,7 +83,7 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data // We have written all the blocks, write the last remaining block. if write < int64(len(block)) { - n, err := xioutil.Copy(dst, bytes.NewReader(block[:write])) + n, err := dst.Write(block[:write]) if err != nil { // The writer will be closed incase of range queries, which will emit ErrClosedPipe. // The reader pipe might be closed at ListObjects io.EOF ignore it. @@ -94,12 +92,12 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data } return 0, err } - totalWritten += n + totalWritten += int64(n) break } // Copy the block. - n, err := xioutil.Copy(dst, bytes.NewReader(block)) + n, err := dst.Write(block) if err != nil { // The writer will be closed incase of range queries, which will emit ErrClosedPipe. // The reader pipe might be closed at ListObjects io.EOF ignore it. @@ -110,10 +108,10 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data } // Decrement output size. - write -= n + write -= int64(n) // Increment written. - totalWritten += n + totalWritten += int64(n) } // Success. diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 2b047374b..28e60b284 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -33,6 +33,7 @@ import ( "github.com/minio/madmin-go/v3" b "github.com/minio/minio/internal/bucket/bandwidth" "github.com/minio/minio/internal/event" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/pubsub" "github.com/minio/mux" @@ -1423,7 +1424,7 @@ func (s *peerRESTServer) DevNull(w http.ResponseWriter, r *http.Request) { connectTime := time.Now() ctx := newContext(r, w, "DevNull") for { - n, err := io.CopyN(io.Discard, r.Body, 128*humanize.KiByte) + n, err := io.CopyN(xioutil.Discard, r.Body, 128*humanize.KiByte) atomic.AddUint64(&globalNetPerfRX.RX, uint64(n)) if err != nil && err != io.EOF { // If there is a disconnection before globalNetPerfMinDuration (we give a margin of error of 1 sec) diff --git a/cmd/perf-tests.go b/cmd/perf-tests.go index 0b229e6a1..8452de523 100644 --- a/cmd/perf-tests.go +++ b/cmd/perf-tests.go @@ -34,6 +34,7 @@ import ( "github.com/minio/madmin-go/v3" "github.com/minio/minio-go/v7" xhttp "github.com/minio/minio/internal/http" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/pkg/v2/randreader" ) @@ -148,6 +149,8 @@ func selfSpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, er var downloadTimes madmin.TimeDurations var downloadTTFB madmin.TimeDurations wg.Add(opts.concurrency) + + c := minio.Core{Client: globalMinioClient} for i := 0; i < opts.concurrency; i++ { go func(i int) { defer wg.Done() @@ -161,7 +164,8 @@ func selfSpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, er } tmpObjName := pathJoin(objNamePrefix, fmt.Sprintf("%d/%d", i, j)) t := time.Now() - r, err := globalMinioClient.GetObject(downloadsCtx, opts.bucketName, tmpObjName, gopts) + + r, _, _, err := c.GetObject(downloadsCtx, opts.bucketName, tmpObjName, gopts) if err != nil { errResp, ok := err.(minio.ErrorResponse) if ok && errResp.StatusCode == http.StatusNotFound { @@ -178,7 +182,7 @@ func selfSpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, er fbr := firstByteRecorder{ r: r, } - n, err := io.Copy(io.Discard, &fbr) + n, err := xioutil.Copy(xioutil.Discard, &fbr) r.Close() if err == nil { response := time.Since(t) diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index ac1645bda..a59e936fe 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -856,10 +856,7 @@ func (client *storageRESTClient) CleanAbandonedData(ctx context.Context, volume return err } defer xhttp.DrainBody(respBody) - respReader, err := waitForHTTPResponse(respBody) - if err == nil { - io.Copy(io.Discard, respReader) - } + _, err = waitForHTTPResponse(respBody) return err } diff --git a/go.mod b/go.mod index e460ecbba..4473478de 100644 --- a/go.mod +++ b/go.mod @@ -45,7 +45,7 @@ require ( github.com/minio/console v0.41.0 github.com/minio/csvparser v1.0.0 github.com/minio/dnscache v0.1.1 - github.com/minio/dperf v0.5.1 + github.com/minio/dperf v0.5.2 github.com/minio/highwayhash v1.0.2 github.com/minio/kes-go v0.2.0 github.com/minio/madmin-go/v3 v3.0.29 diff --git a/go.sum b/go.sum index db189306a..bbe5364d0 100644 --- a/go.sum +++ b/go.sum @@ -470,8 +470,8 @@ github.com/minio/csvparser v1.0.0 h1:xJEHcYK8ZAjeW4hNV9Zu30u+/2o4UyPnYgyjWp8b7ZU github.com/minio/csvparser v1.0.0/go.mod h1:lKXskSLzPgC5WQyzP7maKH7Sl1cqvANXo9YCto8zbtM= github.com/minio/dnscache v0.1.1 h1:AMYLqomzskpORiUA1ciN9k7bZT1oB3YZN4cEIi88W5o= github.com/minio/dnscache v0.1.1/go.mod h1:WCumm6offO4rQ/82oTCSoHnlhTmc81+vXgKpBtSYRbg= -github.com/minio/dperf v0.5.1 h1:oXX1og7sajPlcnr0c/cJ9qBeoPFNlbCqt0WAd5t3TPY= -github.com/minio/dperf v0.5.1/go.mod h1:Y8HlbQ90LbtybYXEyMfsoVyOWndB1kGUo707uCwEuxo= +github.com/minio/dperf v0.5.2 h1:ZTCyWE9jnngK55w6eU9u4VvQktJGYsUP6kaaUYORDyE= +github.com/minio/dperf v0.5.2/go.mod h1:Y8HlbQ90LbtybYXEyMfsoVyOWndB1kGUo707uCwEuxo= github.com/minio/filepath v1.0.0 h1:fvkJu1+6X+ECRA6G3+JJETj4QeAYO9sV43I79H8ubDY= github.com/minio/filepath v1.0.0/go.mod h1:/nRZA2ldl5z6jT9/KQuvZcQlxZIMQoFFQPvEXx9T/Bw= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= diff --git a/internal/config/lambda/target/webhook.go b/internal/config/lambda/target/webhook.go index 9708abe72..e3a5e5576 100644 --- a/internal/config/lambda/target/webhook.go +++ b/internal/config/lambda/target/webhook.go @@ -23,7 +23,6 @@ import ( "crypto/tls" "encoding/json" "errors" - "io" "net/http" "strings" "sync/atomic" @@ -31,6 +30,7 @@ import ( "time" "github.com/minio/minio/internal/config/lambda/event" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/v2/certs" xnet "github.com/minio/pkg/v2/net" @@ -133,8 +133,7 @@ func (target *WebhookTarget) isActive() (bool, error) { } return false, err } - io.Copy(io.Discard, resp.Body) - resp.Body.Close() + xioutil.DiscardReader(resp.Body) // No network failure i.e response from the target means its up return true, nil } diff --git a/internal/event/target/elasticsearch.go b/internal/event/target/elasticsearch.go index 7ef39e7a8..ff721e639 100644 --- a/internal/event/target/elasticsearch.go +++ b/internal/event/target/elasticsearch.go @@ -34,6 +34,7 @@ import ( elasticsearch7 "github.com/elastic/go-elasticsearch/v7" "github.com/minio/highwayhash" "github.com/minio/minio/internal/event" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/once" "github.com/minio/minio/internal/store" @@ -474,7 +475,7 @@ func (c *esClientV7) createIndex(args ElasticsearchArgs) error { if err != nil { return err } - defer DrainBody(resp.Body) + defer xioutil.DiscardReader(resp.Body) if resp.IsError() { return fmt.Errorf("Create index err: %v", res) } @@ -490,7 +491,7 @@ func (c *esClientV7) ping(ctx context.Context, _ ElasticsearchArgs) (bool, error if err != nil { return false, store.ErrNotConnected } - DrainBody(resp.Body) + xioutil.DiscardReader(resp.Body) return !resp.IsError(), nil } @@ -503,7 +504,7 @@ func (c *esClientV7) entryExists(ctx context.Context, index string, key string) if err != nil { return false, err } - DrainBody(res.Body) + xioutil.DiscardReader(res.Body) return !res.IsError(), nil } @@ -518,7 +519,7 @@ func (c *esClientV7) removeEntry(ctx context.Context, index string, key string) if err != nil { return err } - defer DrainBody(res.Body) + defer xioutil.DiscardReader(res.Body) if res.IsError() { return fmt.Errorf("Delete err: %s", res.String()) } @@ -546,7 +547,7 @@ func (c *esClientV7) updateEntry(ctx context.Context, index string, key string, if err != nil { return err } - defer DrainBody(res.Body) + defer xioutil.DiscardReader(res.Body) if res.IsError() { return fmt.Errorf("Update err: %s", res.String()) } @@ -572,7 +573,7 @@ func (c *esClientV7) addEntry(ctx context.Context, index string, eventData event if err != nil { return err } - defer DrainBody(res.Body) + defer xioutil.DiscardReader(res.Body) if res.IsError() { return fmt.Errorf("Add err: %s", res.String()) } diff --git a/internal/event/target/webhook.go b/internal/event/target/webhook.go index 46cd56ff7..d0670e98e 100644 --- a/internal/event/target/webhook.go +++ b/internal/event/target/webhook.go @@ -24,7 +24,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "net" "net/http" "net/url" @@ -34,8 +33,8 @@ import ( "syscall" "time" - "github.com/dustin/go-humanize" "github.com/minio/minio/internal/event" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/once" "github.com/minio/minio/internal/store" @@ -197,7 +196,7 @@ func (target *WebhookTarget) send(eventData event.Event) error { if err != nil { return err } - defer DrainBody(resp.Body) + defer xioutil.DiscardReader(resp.Body) if resp.StatusCode < 200 || resp.StatusCode > 299 { return fmt.Errorf("sending event failed with %v", resp.Status) @@ -312,23 +311,3 @@ func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOn return target, nil } - -// DrainBody close non nil response with any response Body. -// convenient wrapper to drain any remaining data on response body. -// -// Subsequently this allows golang http RoundTripper -// to re-use the same connection for future requests. -func DrainBody(respBody io.ReadCloser) { - // Callers should close resp.Body when done reading from it. - // If resp.Body is not closed, the Client's underlying RoundTripper - // (typically Transport) may not be able to re-use a persistent TCP - // connection to the server for a subsequent "keep-alive" request. - if respBody != nil { - // Drain any remaining Body and then close the connection. - // Without this closing connection would disallow re-using - // the same connection for future uses. - // - http://stackoverflow.com/a/17961593/4465767 - defer respBody.Close() - io.CopyN(io.Discard, respBody, 1*humanize.MiByte) - } -} diff --git a/internal/http/close.go b/internal/http/close.go index 04df4b465..1b3b8c8e1 100644 --- a/internal/http/close.go +++ b/internal/http/close.go @@ -19,6 +19,8 @@ package http import ( "io" + + xioutil "github.com/minio/minio/internal/ioutil" ) // DrainBody close non nil response with any response Body. @@ -37,6 +39,6 @@ func DrainBody(respBody io.ReadCloser) { // the same connection for future uses. // - http://stackoverflow.com/a/17961593/4465767 defer respBody.Close() - io.Copy(io.Discard, respBody) + xioutil.DiscardReader(respBody) } } diff --git a/internal/ioutil/discard.go b/internal/ioutil/discard.go new file mode 100644 index 000000000..cfd5b8c6f --- /dev/null +++ b/internal/ioutil/discard.go @@ -0,0 +1,40 @@ +// Copyright (c) 2015-2023 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package ioutil + +import ( + "io" +) + +// Discard is just like io.Discard without the io.ReaderFrom compatible +// implementation which is buggy on NUMA systems, we have to use a simpler +// io.Writer implementation alone avoids also unnecessary buffer copies, +// and as such incurred latencies. +var Discard io.Writer = discard{} + +// discard is /dev/null for Golang. +type discard struct{} + +func (discard) Write(p []byte) (int, error) { + return len(p), nil +} + +// DiscardReader discarded reader +func DiscardReader(r io.Reader) { + Copy(Discard, r) +} diff --git a/internal/s3select/select.go b/internal/s3select/select.go index e16da7a29..404022d61 100644 --- a/internal/s3select/select.go +++ b/internal/s3select/select.go @@ -33,6 +33,7 @@ import ( "github.com/klauspost/compress/zstd" gzip "github.com/klauspost/pgzip" "github.com/minio/minio/internal/config" + xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/s3select/csv" "github.com/minio/minio/internal/s3select/json" "github.com/minio/minio/internal/s3select/parquet" @@ -91,7 +92,7 @@ var bufioWriterPool = sync.Pool{ New: func() interface{} { // io.Discard is just used to create the writer. Actual destination // writer is set later by Reset() before using it. - return bufio.NewWriter(io.Discard) + return bufio.NewWriter(xioutil.Discard) }, } @@ -467,7 +468,7 @@ func (s3Select *S3Select) marshal(buf *bytes.Buffer, record sql.Record) error { // Use bufio Writer to prevent csv.Writer from allocating a new buffer. bufioWriter := bufioWriterPool.Get().(*bufio.Writer) defer func() { - bufioWriter.Reset(io.Discard) + bufioWriter.Reset(xioutil.Discard) bufioWriterPool.Put(bufioWriter) }()