From 68dd74c5abaa1ae7059c39c2fc365a3fec30aa0c Mon Sep 17 00:00:00 2001 From: Anis Eleuch Date: Thu, 7 Mar 2024 19:58:22 +0100 Subject: [PATCH] batch: Separate batch job request and batch job stats (#19205) Currently, the progress of the batch job is saved in inside the job request object, which is normally not supported by MinIO. Though there is no apparent bug, it is better to fix this now. Batch progress is saved in .minio.sys/batch-jobs/reports/ Co-authored-by: Anis Eleuch --- cmd/batch-handlers.go | 63 ++++++++++++++++++++------------------- cmd/batch-handlers_gen.go | 35 ++++------------------ 2 files changed, 37 insertions(+), 61 deletions(-) diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index cbc3f3a18..4a932b8e3 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -63,7 +63,6 @@ type BatchJobRequest struct { ID string `yaml:"-" json:"name"` User string `yaml:"-" json:"user"` Started time.Time `yaml:"-" json:"started"` - Location string `yaml:"-" json:"location"` Replicate *BatchJobReplicateV1 `yaml:"replicate" json:"replicate"` KeyRotate *BatchJobKeyRotateV1 `yaml:"keyrotate" json:"keyrotate"` Expire *BatchJobExpire `yaml:"expire" json:"expire"` @@ -710,38 +709,52 @@ type batchJobInfo struct { } const ( - batchReplName = "batch-replicate.bin" - batchReplFormat = 1 - batchReplVersionV1 = 1 - batchReplVersion = batchReplVersionV1 - batchJobName = "job.bin" - batchJobPrefix = "batch-jobs" + batchReplName = "batch-replicate.bin" + batchReplFormat = 1 + batchReplVersionV1 = 1 + batchReplVersion = batchReplVersionV1 + batchJobName = "job.bin" + batchJobPrefix = "batch-jobs" + batchJobReportsPrefix = batchJobPrefix + "/reports" batchReplJobAPIVersion = "v1" batchReplJobDefaultRetries = 3 batchReplJobDefaultRetryDelay = 250 * time.Millisecond ) -func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobRequest) error { +func getJobReportPath(job BatchJobRequest) string { var fileName string - var format, version uint16 switch { case job.Replicate != nil: fileName = batchReplName + case job.KeyRotate != nil: + fileName = batchKeyRotationName + case job.Expire != nil: + fileName = batchExpireName + } + return pathJoin(batchJobReportsPrefix, job.ID, fileName) +} + +func getJobPath(job BatchJobRequest) string { + return pathJoin(batchJobPrefix, job.ID) +} + +func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobRequest) error { + var format, version uint16 + switch { + case job.Replicate != nil: version = batchReplVersionV1 format = batchReplFormat case job.KeyRotate != nil: - fileName = batchKeyRotationName version = batchKeyRotateVersionV1 format = batchKeyRotationFormat case job.Expire != nil: - fileName = batchExpireName version = batchExpireVersionV1 format = batchExpireFormat default: return errors.New("no supported batch job request specified") } - data, err := readConfig(ctx, api, pathJoin(job.Location, fileName)) + data, err := readConfig(ctx, api, getJobReportPath(job)) if err != nil { if errors.Is(err, errConfigNotFound) || isErrObjectNotFound(err) { ri.Version = int(version) @@ -852,8 +865,8 @@ func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, durati now := UTCNow() ri.mu.Lock() var ( - format, version uint16 - jobTyp, fileName string + format, version uint16 + jobTyp string ) if now.Sub(ri.LastUpdate) >= duration { @@ -862,19 +875,16 @@ func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, durati format = batchReplFormat version = batchReplVersion jobTyp = string(job.Type()) - fileName = batchReplName ri.Version = batchReplVersionV1 case madmin.BatchJobKeyRotate: format = batchKeyRotationFormat version = batchKeyRotateVersion jobTyp = string(job.Type()) - fileName = batchKeyRotationName ri.Version = batchKeyRotateVersionV1 case madmin.BatchJobExpire: format = batchExpireFormat version = batchExpireVersion jobTyp = string(job.Type()) - fileName = batchExpireName ri.Version = batchExpireVersionV1 default: return errInvalidArgument @@ -895,7 +905,7 @@ func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, durati if err != nil { return err } - return saveConfig(ctx, api, pathJoin(job.Location, fileName), buf) + return saveConfig(ctx, api, getJobReportPath(job), buf) } ri.mu.Unlock() return nil @@ -1417,15 +1427,8 @@ func (j BatchJobRequest) Validate(ctx context.Context, o ObjectLayer) error { } func (j BatchJobRequest) delete(ctx context.Context, api ObjectLayer) { - switch { - case j.Replicate != nil: - deleteConfig(ctx, api, pathJoin(j.Location, batchReplName)) - case j.KeyRotate != nil: - deleteConfig(ctx, api, pathJoin(j.Location, batchKeyRotationName)) - case j.Expire != nil: - deleteConfig(ctx, api, pathJoin(j.Location, batchExpireName)) - } - deleteConfig(ctx, api, j.Location) + deleteConfig(ctx, api, getJobReportPath(j)) + deleteConfig(ctx, api, getJobPath(j)) } func (j *BatchJobRequest) save(ctx context.Context, api ObjectLayer) error { @@ -1437,13 +1440,12 @@ func (j *BatchJobRequest) save(ctx context.Context, api ObjectLayer) error { return err } - j.Location = pathJoin(batchJobPrefix, j.ID) job, err := j.MarshalMsg(nil) if err != nil { return err } - return saveConfig(ctx, api, j.Location, job) + return saveConfig(ctx, api, getJobPath(*j), job) } func (j *BatchJobRequest) load(ctx context.Context, api ObjectLayer, name string) error { @@ -1671,8 +1673,7 @@ func (a adminAPIHandlers) CancelBatchJob(w http.ResponseWriter, r *http.Request) } j := BatchJobRequest{ - ID: jobID, - Location: pathJoin(batchJobPrefix, jobID), + ID: jobID, } j.delete(ctx, objectAPI) diff --git a/cmd/batch-handlers_gen.go b/cmd/batch-handlers_gen.go index 40b476a27..09d3cec12 100644 --- a/cmd/batch-handlers_gen.go +++ b/cmd/batch-handlers_gen.go @@ -42,12 +42,6 @@ func (z *BatchJobRequest) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Started") return } - case "Location": - z.Location, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "Location") - return - } case "Replicate": if dc.IsNil() { err = dc.ReadNil() @@ -115,9 +109,9 @@ func (z *BatchJobRequest) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *BatchJobRequest) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 7 + // map header, size 6 // write "ID" - err = en.Append(0x87, 0xa2, 0x49, 0x44) + err = en.Append(0x86, 0xa2, 0x49, 0x44) if err != nil { return } @@ -146,16 +140,6 @@ func (z *BatchJobRequest) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Started") return } - // write "Location" - err = en.Append(0xa8, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e) - if err != nil { - return - } - err = en.WriteString(z.Location) - if err != nil { - err = msgp.WrapError(err, "Location") - return - } // write "Replicate" err = en.Append(0xa9, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65) if err != nil { @@ -213,9 +197,9 @@ func (z *BatchJobRequest) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *BatchJobRequest) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 7 + // map header, size 6 // string "ID" - o = append(o, 0x87, 0xa2, 0x49, 0x44) + o = append(o, 0x86, 0xa2, 0x49, 0x44) o = msgp.AppendString(o, z.ID) // string "User" o = append(o, 0xa4, 0x55, 0x73, 0x65, 0x72) @@ -223,9 +207,6 @@ func (z *BatchJobRequest) MarshalMsg(b []byte) (o []byte, err error) { // string "Started" o = append(o, 0xa7, 0x53, 0x74, 0x61, 0x72, 0x74, 0x65, 0x64) o = msgp.AppendTime(o, z.Started) - // string "Location" - o = append(o, 0xa8, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e) - o = msgp.AppendString(o, z.Location) // string "Replicate" o = append(o, 0xa9, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65) if z.Replicate == nil { @@ -298,12 +279,6 @@ func (z *BatchJobRequest) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Started") return } - case "Location": - z.Location, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Location") - return - } case "Replicate": if msgp.IsNil(bts) { bts, err = msgp.ReadNilBytes(bts) @@ -369,7 +344,7 @@ func (z *BatchJobRequest) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *BatchJobRequest) Msgsize() (s int) { - s = 1 + 3 + msgp.StringPrefixSize + len(z.ID) + 5 + msgp.StringPrefixSize + len(z.User) + 8 + msgp.TimeSize + 9 + msgp.StringPrefixSize + len(z.Location) + 10 + s = 1 + 3 + msgp.StringPrefixSize + len(z.ID) + 5 + msgp.StringPrefixSize + len(z.User) + 8 + msgp.TimeSize + 10 if z.Replicate == nil { s += msgp.NilSize } else {