diff --git a/cmd/admin-router.go b/cmd/admin-router.go index b522d449e..52b463b4c 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -237,6 +237,8 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) { adminRouter.Methods(http.MethodGet).Path(adminVersion + "/describe-job").HandlerFunc( gz(httpTraceHdrs(adminAPI.DescribeBatchJob))) + adminRouter.Methods(http.MethodDelete).Path(adminVersion + "/cancel-job").HandlerFunc( + gz(httpTraceHdrs(adminAPI.CancelBatchJob))) // Bucket migration operations // ExportBucketMetaHandler diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index f65708b42..940c63f49 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -234,6 +234,7 @@ type BatchJobRequest struct { Started time.Time `yaml:"-" json:"started"` Location string `yaml:"-" json:"location"` Replicate *BatchJobReplicateV1 `yaml:"replicate" json:"replicate"` + ctx context.Context `msg:"-"` } // Notify notifies notification endpoint if configured regarding job failure or success. @@ -859,9 +860,7 @@ func (j BatchJobRequest) Validate(ctx context.Context, o ObjectLayer) error { } func (j BatchJobRequest) delete(ctx context.Context, api ObjectLayer) { - if j.Replicate != nil { - deleteConfig(ctx, api, pathJoin(j.Location, batchReplName)) - } + deleteConfig(ctx, api, pathJoin(j.Location, batchReplName)) globalBatchJobsMetrics.delete(j.ID) deleteConfig(ctx, api, j.Location) } @@ -1061,6 +1060,34 @@ func (a adminAPIHandlers) StartBatchJob(w http.ResponseWriter, r *http.Request) writeSuccessResponseJSON(w, buf) } +// CancelBatchJob cancels a job in progress +func (a adminAPIHandlers) CancelBatchJob(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "CancelBatchJob") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.CancelBatchJobAction) + if objectAPI == nil { + return + } + jobID := r.Form.Get("id") + if jobID == "" { + writeErrorResponseJSON(ctx, w, toAPIError(ctx, errInvalidArgument), r.URL) + return + } + if err := globalBatchJobPool.canceler(jobID, true); err != nil { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrInvalidRequest, err), r.URL) + return + } + j := BatchJobRequest{ + ID: jobID, + Location: pathJoin(batchJobPrefix, jobID), + } + j.delete(ctx, objectAPI) + + writeSuccessNoContent(w) +} + //msgp:ignore BatchJobPool // BatchJobPool batch job pool @@ -1070,6 +1097,8 @@ type BatchJobPool struct { once sync.Once mu sync.Mutex jobCh chan *BatchJobRequest + jmu sync.Mutex // protects jobCancelers + jobCancelers map[string]context.CancelFunc workerKillCh chan struct{} workerSize int } @@ -1083,6 +1112,7 @@ func newBatchJobPool(ctx context.Context, o ObjectLayer, workers int) *BatchJobP objLayer: o, jobCh: make(chan *BatchJobRequest, 10000), workerKillCh: make(chan struct{}, workers), + jobCancelers: make(map[string]context.CancelFunc), } jpool.ResizeWorkers(workers) jpool.resume() @@ -1124,15 +1154,17 @@ func (j *BatchJobPool) AddWorker() { return } if job.Replicate != nil { - if err := job.Replicate.Start(j.ctx, j.objLayer, *job); err != nil { + if err := job.Replicate.Start(job.ctx, j.objLayer, *job); err != nil { if !isErrBucketNotFound(err) { logger.LogIf(j.ctx, err) + j.canceler(job.ID, false) continue } // Bucket not found proceed to delete such a job. } } job.delete(j.ctx, j.objLayer) + j.canceler(job.ID, false) case <-j.workerKillCh: return } @@ -1162,6 +1194,12 @@ func (j *BatchJobPool) queueJob(req *BatchJobRequest) error { if j == nil { return errInvalidArgument } + jctx, jcancel := context.WithCancel(j.ctx) + j.jmu.Lock() + j.jobCancelers[req.ID] = jcancel + j.jmu.Unlock() + req.ctx = jctx + select { case <-j.ctx.Done(): j.once.Do(func() { @@ -1174,6 +1212,23 @@ func (j *BatchJobPool) queueJob(req *BatchJobRequest) error { return nil } +// delete canceler from the map, cancel job if requested +func (j *BatchJobPool) canceler(jobID string, cancel bool) error { + if j == nil { + return errInvalidArgument + } + j.jmu.Lock() + defer j.jmu.Unlock() + if canceler, ok := j.jobCancelers[jobID]; ok { + if cancel { + canceler() + + } + } + delete(j.jobCancelers, jobID) + return nil +} + //msgp:ignore batchJobMetrics type batchJobMetrics struct { sync.RWMutex diff --git a/go.mod b/go.mod index e324d655b..adf1c0273 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/minio/madmin-go/v2 v2.0.16 github.com/minio/minio-go/v7 v7.0.49 github.com/minio/mux v1.9.0 - github.com/minio/pkg v1.6.4 + github.com/minio/pkg v1.6.5-0.20230318001333-39b6e90c1c88 github.com/minio/selfupdate v0.6.0 github.com/minio/sha256-simd v1.0.0 github.com/minio/simdjson-go v0.4.5 diff --git a/go.sum b/go.sum index e21fbf6cc..d038504df 100644 --- a/go.sum +++ b/go.sum @@ -787,6 +787,8 @@ github.com/minio/mux v1.9.0/go.mod h1:1pAare17ZRL5GpmNL+9YmqHoWnLmMZF9C/ioUCfy0B github.com/minio/pkg v1.5.4/go.mod h1:2MOaRFdmFKULD+uOLc3qHLGTQTuxCNPKNPfLBTxC8CA= github.com/minio/pkg v1.6.4 h1:k6XlhyJ8zOn90PI4csuMeePx7BQrcX1jorKriR5J2fo= github.com/minio/pkg v1.6.4/go.mod h1:0iX1IuJGSCnMvIvrEJauk1GgQSX9JdU6Kh0P3EQRGkI= +github.com/minio/pkg v1.6.5-0.20230318001333-39b6e90c1c88 h1:xASCs0/flZ65B+wyOX1XnCe7qS+fWElVG8N3VGq1BLA= +github.com/minio/pkg v1.6.5-0.20230318001333-39b6e90c1c88/go.mod h1:0iX1IuJGSCnMvIvrEJauk1GgQSX9JdU6Kh0P3EQRGkI= github.com/minio/selfupdate v0.6.0 h1:i76PgT0K5xO9+hjzKcacQtO7+MjJ4JKA8Ak8XQ9DDwU= github.com/minio/selfupdate v0.6.0/go.mod h1:bO02GTIPCMQFTEvE5h4DjYB58bCoZ35XLeBf0buTDdM= github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g=