From b760137e1d2d2c179e9125fe245e09e0d824c0ab Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 11 Aug 2023 13:12:35 -0700 Subject: [PATCH] fix: add proxyByNode for batch jobs as part of their jobId (#17844) --- cmd/admin-handlers.go | 2 +- cmd/admin-heal-ops.go | 4 ++-- cmd/batch-handlers.go | 24 +++++++++++++++++++----- cmd/bucket-listobjects-handlers.go | 2 +- 4 files changed, 23 insertions(+), 9 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index d8b3153cc..eee5ab6b1 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -1025,7 +1025,7 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) { if exists && !nh.hasEnded() && len(nh.currentStatus.Items) > 0 { clientToken := nh.clientToken if globalIsDistErasure { - clientToken = fmt.Sprintf("%s@%d", nh.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints)) + clientToken = fmt.Sprintf("%s:%d", nh.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints)) } b, err := json.Marshal(madmin.HealStartSuccess{ ClientToken: clientToken, diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index 4ae3ad61e..b61e2c780 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -253,7 +253,7 @@ func (ahs *allHealState) stopHealSequence(path string) ([]byte, APIError) { } else { clientToken := he.clientToken if globalIsDistErasure { - clientToken = fmt.Sprintf("%s@%d", he.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints)) + clientToken = fmt.Sprintf("%s:%d", he.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints)) } hsp = madmin.HealStopSuccess{ @@ -327,7 +327,7 @@ func (ahs *allHealState) LaunchNewHealSequence(h *healSequence, objAPI ObjectLay clientToken := h.clientToken if globalIsDistErasure { - clientToken = fmt.Sprintf("%s@%d", h.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints)) + clientToken = fmt.Sprintf("%s:%d", h.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints)) } b, err := json.Marshal(madmin.HealStartSuccess{ diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index aec50dd09..c9e012ff1 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -46,6 +46,7 @@ import ( "github.com/minio/minio/internal/crypto" "github.com/minio/minio/internal/hash" xhttp "github.com/minio/minio/internal/http" + "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/console" "github.com/minio/pkg/env" @@ -1526,14 +1527,14 @@ func (a adminAPIHandlers) DescribeBatchJob(w http.ResponseWriter, r *http.Reques return } - id := r.Form.Get("jobId") - if id == "" { + jobID := r.Form.Get("jobId") + if jobID == "" { writeErrorResponseJSON(ctx, w, toAPIError(ctx, errInvalidArgument), r.URL) return } req := &BatchJobRequest{} - if err := req.load(ctx, objectAPI, pathJoin(batchJobPrefix, id)); err != nil { + if err := req.load(ctx, objectAPI, pathJoin(batchJobPrefix, jobID)); err != nil { if !errors.Is(err, errNoSuchJob) { logger.LogIf(ctx, err) } @@ -1561,7 +1562,7 @@ func (a adminAPIHandlers) StartBatchJob(w http.ResponseWriter, r *http.Request) return } - buf, err := io.ReadAll(r.Body) + buf, err := io.ReadAll(ioutil.HardLimitReader(r.Body, humanize.MiByte*4)) if err != nil { writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) return @@ -1578,7 +1579,7 @@ func (a adminAPIHandlers) StartBatchJob(w http.ResponseWriter, r *http.Request) return } - job.ID = shortuuid.New() + job.ID = fmt.Sprintf("%s:%d", shortuuid.New(), GetProxyEndpointLocalIndex(globalProxyEndpoints)) job.User = user job.Started = time.Now() @@ -1614,19 +1615,27 @@ func (a adminAPIHandlers) CancelBatchJob(w http.ResponseWriter, r *http.Request) if objectAPI == nil { return } + jobID := r.Form.Get("id") if jobID == "" { writeErrorResponseJSON(ctx, w, toAPIError(ctx, errInvalidArgument), r.URL) return } + + if _, success := proxyRequestByToken(ctx, w, r, jobID); success { + return + } + if err := globalBatchJobPool.canceler(jobID, true); err != nil { writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrInvalidRequest, err), r.URL) return } + j := BatchJobRequest{ ID: jobID, Location: pathJoin(batchJobPrefix, jobID), } + j.delete(ctx, objectAPI) writeSuccessNoContent(w) @@ -1681,6 +1690,11 @@ func (j *BatchJobPool) resume() { logger.LogIf(ctx, err) continue } + _, nodeIdx := parseRequestToken(req.ID) + if nodeIdx > -1 && GetProxyEndpointLocalIndex(globalProxyEndpoints) != nodeIdx { + // This job doesn't belong on this node. + continue + } if err := j.queueJob(req); err != nil { logger.LogIf(ctx, err) continue diff --git a/cmd/bucket-listobjects-handlers.go b/cmd/bucket-listobjects-handlers.go index b1c91e098..a898dee58 100644 --- a/cmd/bucket-listobjects-handlers.go +++ b/cmd/bucket-listobjects-handlers.go @@ -231,7 +231,7 @@ func parseRequestToken(token string) (subToken string, nodeIndex int) { if token == "" { return token, -1 } - i := strings.Index(token, "@") + i := strings.Index(token, ":") if i < 0 { return token, -1 }