compress binary while sending it to all the nodes (#18837)

Also limit the amount of concurrency when sending
binary updates to peers, to avoid high network TX
that can cause disconnection events for the node
sending the updates.
Harshavardhana 2024-01-22 12:16:36 -08:00 committed by GitHub
parent feeeef71f1
commit 961f7dea82
4 changed files with 69 additions and 13 deletions
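
The change pairs two ideas: compress the binary once with zstd before any peer sees it, and bound how many peers are pushed to at a time. Below is a minimal standalone sketch of that shape, not the MinIO code itself: pushToPeer, the peer list, and the worker cap of 2 are illustrative placeholders, and a plain channel semaphore stands in for the minio/pkg/v2/workers pool used in the diff below.

// Sketch only: compress a payload once with zstd, then fan it out to peers
// with a bounded number of concurrent senders.
package main

import (
	"bytes"
	"fmt"
	"sync"

	"github.com/klauspost/compress/zstd"
)

// pushToPeer is a hypothetical stand-in for the real peer REST call.
func pushToPeer(peer string, payload []byte) error {
	fmt.Printf("sent %d bytes to %s\n", len(payload), peer)
	return nil
}

func main() {
	binary := bytes.Repeat([]byte("minio"), 1<<20) // pretend this is the downloaded binary

	// Compress once, before fanning out, so every peer receives the smaller payload.
	var compressed bytes.Buffer
	enc, err := zstd.NewWriter(&compressed)
	if err != nil {
		panic(err)
	}
	if _, err := enc.Write(binary); err != nil {
		panic(err)
	}
	enc.Close()

	peers := []string{"node2", "node3", "node4", "node5"}
	maxWorkers := 2 // cap concurrent sends to keep TX bandwidth in check
	sem := make(chan struct{}, maxWorkers)

	var wg sync.WaitGroup
	for _, peer := range peers {
		wg.Add(1)
		sem <- struct{}{} // blocks when maxWorkers sends are already in flight
		go func(peer string) {
			defer wg.Done()
			defer func() { <-sem }()
			_ = pushToPeer(peer, compressed.Bytes())
		}(peer)
	}
	wg.Wait()
}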


@@ -149,7 +149,7 @@ func (a adminAPIHandlers) ServerUpdateHandler(w http.ResponseWriter, r *http.Req
     }

     // Download Binary Once
-    bin, err := downloadBinary(u, mode)
+    binC, bin, err := downloadBinary(u, mode)
     if err != nil {
         logger.LogIf(ctx, fmt.Errorf("server update failed with %w", err))
         writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
@@ -157,7 +157,7 @@ func (a adminAPIHandlers) ServerUpdateHandler(w http.ResponseWriter, r *http.Req
     }

     // Push binary to other servers
-    for _, nerr := range globalNotificationSys.VerifyBinary(ctx, u, sha256Sum, releaseInfo, bin) {
+    for _, nerr := range globalNotificationSys.VerifyBinary(ctx, u, sha256Sum, releaseInfo, binC) {
         if nerr.Err != nil {
             err := AdminError{
                 Code: AdminUpdateApplyFailure,


@@ -26,6 +26,7 @@ import (
     "math/rand"
     "net/http"
     "net/url"
+    "runtime"
     "sync"
     "time"
@@ -34,6 +35,7 @@ import (
     "github.com/minio/madmin-go/v3"
     xnet "github.com/minio/pkg/v2/net"
     "github.com/minio/pkg/v2/sync/errgroup"
+    "github.com/minio/pkg/v2/workers"

     "github.com/minio/minio/internal/bucket/bandwidth"
     "github.com/minio/minio/internal/logger"
@@ -59,7 +61,7 @@ type NotificationPeerErr struct {
 //
 // A zero NotificationGroup is valid and does not cancel on error.
 type NotificationGroup struct {
-    wg         sync.WaitGroup
+    workers    *workers.Workers
     errs       []NotificationPeerErr
     retryCount int
 }
@@ -67,7 +69,11 @@ type NotificationGroup struct {
 // WithNPeers returns a new NotificationGroup with length of errs slice upto nerrs,
 // upon Wait() errors are returned collected from all tasks.
 func WithNPeers(nerrs int) *NotificationGroup {
-    return &NotificationGroup{errs: make([]NotificationPeerErr, nerrs), retryCount: 3}
+    if nerrs <= 0 {
+        nerrs = 1
+    }
+    wk, _ := workers.New(nerrs)
+    return &NotificationGroup{errs: make([]NotificationPeerErr, nerrs), workers: wk, retryCount: 3}
 }

 // WithRetries sets the retry count for all function calls from the Go method.
@@ -81,7 +87,7 @@ func (g *NotificationGroup) WithRetries(retryCount int) *NotificationGroup {
 // Wait blocks until all function calls from the Go method have returned, then
 // returns the slice of errors from all function calls.
 func (g *NotificationGroup) Wait() []NotificationPeerErr {
-    g.wg.Wait()
+    g.workers.Wait()
     return g.errs
 }
@@ -92,10 +98,11 @@ func (g *NotificationGroup) Wait() []NotificationPeerErr {
 func (g *NotificationGroup) Go(ctx context.Context, f func() error, index int, addr xnet.Host) {
     r := rand.New(rand.NewSource(time.Now().UnixNano()))
-    g.wg.Add(1)
+    g.workers.Take()
     go func() {
-        defer g.wg.Done()
+        defer g.workers.Give()
         g.errs[index] = NotificationPeerErr{
             Host: addr,
         }
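
The hunks above replace the unbounded sync.WaitGroup with a fixed-size pool from github.com/minio/pkg/v2/workers. Below is a small sketch of the Take/Give/Wait pattern as the diff uses it, assuming Take blocks once all slots are busy, which is what throttles the fan-out.

// Sketch of a bounded pool: Take() blocks when n tasks are in flight,
// Give() frees a slot, Wait() blocks until all started tasks have finished.
package main

import (
	"fmt"
	"time"

	"github.com/minio/pkg/v2/workers"
)

func main() {
	wk, err := workers.New(2) // at most 2 concurrent tasks
	if err != nil {
		panic(err)
	}
	for i := 0; i < 5; i++ {
		wk.Take() // blocks here while 2 tasks are already running
		go func(i int) {
			defer wk.Give()
			time.Sleep(200 * time.Millisecond) // pretend to push a binary to a peer
			fmt.Println("task", i, "done")
		}(i)
	}
	wk.Wait() // plays the role of the old sync.WaitGroup.Wait()
}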
@@ -340,7 +347,26 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io
 // VerifyBinary - asks remote peers to verify the checksum
 func (sys *NotificationSys) VerifyBinary(ctx context.Context, u *url.URL, sha256Sum []byte, releaseInfo string, bin []byte) []NotificationPeerErr {
-    ng := WithNPeers(len(sys.peerClients))
+    // FIXME: network calls made in this manner such as one goroutine per node,
+    // can easily eat into the internode bandwidth. This function would be mostly
+    // TX saturating, however there are situations where a RX might also saturate.
+    // To avoid these problems we must split the work at scale. With 1000 node
+    // setup becoming a reality we must try to shard the work properly such as
+    // pick 10 nodes that precisely can send those 100 requests the first node
+    // in the 10 node shard would coordinate between other 9 shards to get the
+    // rest of the `99*9` requests.
+    //
+    // This essentially splits the workload properly and also allows for network
+    // utilization to be optimal, instead of blindly throttling the way we are
+    // doing below. However the changes that are needed here are a bit involved,
+    // further discussion advised. Remove this comment and remove the worker model
+    // for this function in future.
+    maxWorkers := runtime.GOMAXPROCS(0) / 2
+    if maxWorkers > len(sys.peerClients) {
+        maxWorkers = len(sys.peerClients)
+    }
+    ng := WithNPeers(maxWorkers)
     for idx, client := range sys.peerClients {
         if client == nil {
             continue
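
Illustrative arithmetic for the throttle chosen above, not part of the commit: the pool is half of GOMAXPROCS, capped at the number of peers, and WithNPeers raises a non-positive value to 1. The helper name and the figures below are hypothetical.

// Worked example of the worker-count formula used in VerifyBinary.
package main

import (
	"fmt"
	"runtime"
)

func updatePushWorkers(gomaxprocs, peers int) int {
	n := gomaxprocs / 2
	if n > peers {
		n = peers
	}
	if n <= 0 { // mirrors the nerrs <= 0 guard in WithNPeers
		n = 1
	}
	return n
}

func main() {
	fmt.Println(updatePushWorkers(16, 31)) // 8: limited by CPU
	fmt.Println(updatePushWorkers(32, 3))  // 3: capped by the peer count
	fmt.Println(updatePushWorkers(1, 7))   // 1: floor of one worker
	fmt.Println(updatePushWorkers(runtime.GOMAXPROCS(0), 7))
}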


@@ -33,6 +33,7 @@ import (
     "time"

     "github.com/dustin/go-humanize"
+    "github.com/klauspost/compress/zstd"
     "github.com/minio/madmin-go/v3"
     b "github.com/minio/minio/internal/bucket/bandwidth"
     "github.com/minio/minio/internal/event"
@@ -850,7 +851,14 @@ func (s *peerRESTServer) VerifyBinaryHandler(w http.ResponseWriter, r *http.Requ
     }

     releaseInfo := r.Form.Get(peerRESTReleaseInfo)
-    if err = verifyBinary(u, sha256Sum, releaseInfo, getMinioMode(), r.Body); err != nil {
+    zr, err := zstd.NewReader(r.Body)
+    if err != nil {
+        s.writeErrorResponse(w, err)
+        return
+    }
+    defer zr.Close()
+    if err = verifyBinary(u, sha256Sum, releaseInfo, getMinioMode(), zr); err != nil {
         s.writeErrorResponse(w, err)
         return
     }
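
On the receiving end the handler now stream-decompresses the request body instead of reading it raw, so the binary never has to sit compressed and uncompressed in memory at once. Below is a minimal sketch of that shape, assuming a hypothetical verifyExpectedSum helper and an X-Expected-Sha256 header in place of MinIO's actual peer REST parameters.

// Sketch: decompress a zstd request body on the fly and hash the
// decompressed stream, mirroring what VerifyBinaryHandler now does.
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"

	"github.com/klauspost/compress/zstd"
)

// verifyExpectedSum hashes the (already decompressed) stream and compares it
// to the expected hex-encoded sha256.
func verifyExpectedSum(r io.Reader, expectedHex string) error {
	h := sha256.New()
	if _, err := io.Copy(h, r); err != nil {
		return err
	}
	if hex.EncodeToString(h.Sum(nil)) != expectedHex {
		return fmt.Errorf("sha256 mismatch")
	}
	return nil
}

func verifyHandler(w http.ResponseWriter, r *http.Request) {
	zr, err := zstd.NewReader(r.Body) // stream-decompress the body
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	defer zr.Close()

	if err := verifyExpectedSum(zr, r.Header.Get("X-Expected-Sha256")); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	w.WriteHeader(http.StatusOK)
}

func main() {
	http.HandleFunc("/verify", verifyHandler)
	_ = http.ListenAndServe(":8080", nil)
}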


@@ -35,12 +35,14 @@ import (
     "sync/atomic"
     "time"

+    "github.com/klauspost/compress/zstd"
     xhttp "github.com/minio/minio/internal/http"
     "github.com/minio/minio/internal/logger"
     "github.com/minio/pkg/v2/env"
     xnet "github.com/minio/pkg/v2/net"
     "github.com/minio/selfupdate"
     gopsutilcpu "github.com/shirou/gopsutil/v3/cpu"
+    "github.com/valyala/bytebufferpool"
 )

 const (
@@ -510,20 +512,40 @@ func getUpdateReaderFromURL(u *url.URL, transport http.RoundTripper, mode string
 var updateInProgress atomic.Uint32

 // Function to get the reader from an architecture
-func downloadBinary(u *url.URL, mode string) (bin []byte, err error) {
+func downloadBinary(u *url.URL, mode string) (binCompressed []byte, bin []byte, err error) {
     transport := getUpdateTransport(30 * time.Second)
     var reader io.ReadCloser
     if u.Scheme == "https" || u.Scheme == "http" {
         reader, err = getUpdateReaderFromURL(u, transport, mode)
         if err != nil {
-            return nil, err
+            return nil, nil, err
         }
     } else {
-        return nil, fmt.Errorf("unsupported protocol scheme: %s", u.Scheme)
+        return nil, nil, fmt.Errorf("unsupported protocol scheme: %s", u.Scheme)
     }
     defer xhttp.DrainBody(reader)

-    return io.ReadAll(reader)
+    b := bytebufferpool.Get()
+    bc := bytebufferpool.Get()
+    defer func() {
+        b.Reset()
+        bc.Reset()
+        bytebufferpool.Put(b)
+        bytebufferpool.Put(bc)
+    }()
+
+    w, err := zstd.NewWriter(bc)
+    if err != nil {
+        return nil, nil, err
+    }
+    if _, err = io.Copy(w, io.TeeReader(reader, b)); err != nil {
+        return nil, nil, err
+    }
+    w.Close()
+    return bc.Bytes(), b.Bytes(), nil
 }

 const (
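
The io.TeeReader in downloadBinary is what keeps the download to a single pass: the raw bytes land in one pooled buffer (verified and applied locally) while the zstd writer fills the other (pushed to peers). Below is a self-contained sketch of that pattern with the HTTP fetch replaced by an in-memory reader; the round-trip check at the end is only for illustration.

// Sketch: one read of the source stream fills the raw buffer via TeeReader
// while the zstd encoder fills the compressed buffer.
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"

	"github.com/klauspost/compress/zstd"
)

func main() {
	download := strings.NewReader(strings.Repeat("minio-binary-bytes ", 1000))

	var raw, compressed bytes.Buffer
	zw, err := zstd.NewWriter(&compressed)
	if err != nil {
		panic(err)
	}

	// Everything read from the tee is copied into raw before zw compresses it.
	if _, err := io.Copy(zw, io.TeeReader(download, &raw)); err != nil {
		panic(err)
	}
	zw.Close() // flush the zstd frame

	fmt.Printf("raw: %d bytes, compressed: %d bytes\n", raw.Len(), compressed.Len())

	// Round-trip check: decompressing must give back the raw bytes.
	zr, err := zstd.NewReader(bytes.NewReader(compressed.Bytes()))
	if err != nil {
		panic(err)
	}
	defer zr.Close()
	back, err := io.ReadAll(zr)
	if err != nil {
		panic(err)
	}
	fmt.Println("round-trip ok:", bytes.Equal(back, raw.Bytes()))
}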