batch: Separate batch job request and batch job stats (#19205)

Currently, the progress of the batch job is saved in inside the job
request object, which is normally not supported by MinIO. Though there
is no apparent bug, it is better to fix this now.

Batch progress is saved in .minio.sys/batch-jobs/reports/

Co-authored-by: Anis Eleuch <anis@min.io>
This commit is contained in:
Anis Eleuch 2024-03-07 19:58:22 +01:00 committed by GitHub
parent 48b590e14b
commit 68dd74c5ab
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 37 additions and 61 deletions

View file

@ -63,7 +63,6 @@ type BatchJobRequest struct {
ID string `yaml:"-" json:"name"` ID string `yaml:"-" json:"name"`
User string `yaml:"-" json:"user"` User string `yaml:"-" json:"user"`
Started time.Time `yaml:"-" json:"started"` Started time.Time `yaml:"-" json:"started"`
Location string `yaml:"-" json:"location"`
Replicate *BatchJobReplicateV1 `yaml:"replicate" json:"replicate"` Replicate *BatchJobReplicateV1 `yaml:"replicate" json:"replicate"`
KeyRotate *BatchJobKeyRotateV1 `yaml:"keyrotate" json:"keyrotate"` KeyRotate *BatchJobKeyRotateV1 `yaml:"keyrotate" json:"keyrotate"`
Expire *BatchJobExpire `yaml:"expire" json:"expire"` Expire *BatchJobExpire `yaml:"expire" json:"expire"`
@ -716,32 +715,46 @@ const (
batchReplVersion = batchReplVersionV1 batchReplVersion = batchReplVersionV1
batchJobName = "job.bin" batchJobName = "job.bin"
batchJobPrefix = "batch-jobs" batchJobPrefix = "batch-jobs"
batchJobReportsPrefix = batchJobPrefix + "/reports"
batchReplJobAPIVersion = "v1" batchReplJobAPIVersion = "v1"
batchReplJobDefaultRetries = 3 batchReplJobDefaultRetries = 3
batchReplJobDefaultRetryDelay = 250 * time.Millisecond batchReplJobDefaultRetryDelay = 250 * time.Millisecond
) )
func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobRequest) error { func getJobReportPath(job BatchJobRequest) string {
var fileName string var fileName string
var format, version uint16
switch { switch {
case job.Replicate != nil: case job.Replicate != nil:
fileName = batchReplName fileName = batchReplName
case job.KeyRotate != nil:
fileName = batchKeyRotationName
case job.Expire != nil:
fileName = batchExpireName
}
return pathJoin(batchJobReportsPrefix, job.ID, fileName)
}
func getJobPath(job BatchJobRequest) string {
return pathJoin(batchJobPrefix, job.ID)
}
func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobRequest) error {
var format, version uint16
switch {
case job.Replicate != nil:
version = batchReplVersionV1 version = batchReplVersionV1
format = batchReplFormat format = batchReplFormat
case job.KeyRotate != nil: case job.KeyRotate != nil:
fileName = batchKeyRotationName
version = batchKeyRotateVersionV1 version = batchKeyRotateVersionV1
format = batchKeyRotationFormat format = batchKeyRotationFormat
case job.Expire != nil: case job.Expire != nil:
fileName = batchExpireName
version = batchExpireVersionV1 version = batchExpireVersionV1
format = batchExpireFormat format = batchExpireFormat
default: default:
return errors.New("no supported batch job request specified") return errors.New("no supported batch job request specified")
} }
data, err := readConfig(ctx, api, pathJoin(job.Location, fileName)) data, err := readConfig(ctx, api, getJobReportPath(job))
if err != nil { if err != nil {
if errors.Is(err, errConfigNotFound) || isErrObjectNotFound(err) { if errors.Is(err, errConfigNotFound) || isErrObjectNotFound(err) {
ri.Version = int(version) ri.Version = int(version)
@ -853,7 +866,7 @@ func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, durati
ri.mu.Lock() ri.mu.Lock()
var ( var (
format, version uint16 format, version uint16
jobTyp, fileName string jobTyp string
) )
if now.Sub(ri.LastUpdate) >= duration { if now.Sub(ri.LastUpdate) >= duration {
@ -862,19 +875,16 @@ func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, durati
format = batchReplFormat format = batchReplFormat
version = batchReplVersion version = batchReplVersion
jobTyp = string(job.Type()) jobTyp = string(job.Type())
fileName = batchReplName
ri.Version = batchReplVersionV1 ri.Version = batchReplVersionV1
case madmin.BatchJobKeyRotate: case madmin.BatchJobKeyRotate:
format = batchKeyRotationFormat format = batchKeyRotationFormat
version = batchKeyRotateVersion version = batchKeyRotateVersion
jobTyp = string(job.Type()) jobTyp = string(job.Type())
fileName = batchKeyRotationName
ri.Version = batchKeyRotateVersionV1 ri.Version = batchKeyRotateVersionV1
case madmin.BatchJobExpire: case madmin.BatchJobExpire:
format = batchExpireFormat format = batchExpireFormat
version = batchExpireVersion version = batchExpireVersion
jobTyp = string(job.Type()) jobTyp = string(job.Type())
fileName = batchExpireName
ri.Version = batchExpireVersionV1 ri.Version = batchExpireVersionV1
default: default:
return errInvalidArgument return errInvalidArgument
@ -895,7 +905,7 @@ func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, durati
if err != nil { if err != nil {
return err return err
} }
return saveConfig(ctx, api, pathJoin(job.Location, fileName), buf) return saveConfig(ctx, api, getJobReportPath(job), buf)
} }
ri.mu.Unlock() ri.mu.Unlock()
return nil return nil
@ -1417,15 +1427,8 @@ func (j BatchJobRequest) Validate(ctx context.Context, o ObjectLayer) error {
} }
func (j BatchJobRequest) delete(ctx context.Context, api ObjectLayer) { func (j BatchJobRequest) delete(ctx context.Context, api ObjectLayer) {
switch { deleteConfig(ctx, api, getJobReportPath(j))
case j.Replicate != nil: deleteConfig(ctx, api, getJobPath(j))
deleteConfig(ctx, api, pathJoin(j.Location, batchReplName))
case j.KeyRotate != nil:
deleteConfig(ctx, api, pathJoin(j.Location, batchKeyRotationName))
case j.Expire != nil:
deleteConfig(ctx, api, pathJoin(j.Location, batchExpireName))
}
deleteConfig(ctx, api, j.Location)
} }
func (j *BatchJobRequest) save(ctx context.Context, api ObjectLayer) error { func (j *BatchJobRequest) save(ctx context.Context, api ObjectLayer) error {
@ -1437,13 +1440,12 @@ func (j *BatchJobRequest) save(ctx context.Context, api ObjectLayer) error {
return err return err
} }
j.Location = pathJoin(batchJobPrefix, j.ID)
job, err := j.MarshalMsg(nil) job, err := j.MarshalMsg(nil)
if err != nil { if err != nil {
return err return err
} }
return saveConfig(ctx, api, j.Location, job) return saveConfig(ctx, api, getJobPath(*j), job)
} }
func (j *BatchJobRequest) load(ctx context.Context, api ObjectLayer, name string) error { func (j *BatchJobRequest) load(ctx context.Context, api ObjectLayer, name string) error {
@ -1672,7 +1674,6 @@ func (a adminAPIHandlers) CancelBatchJob(w http.ResponseWriter, r *http.Request)
j := BatchJobRequest{ j := BatchJobRequest{
ID: jobID, ID: jobID,
Location: pathJoin(batchJobPrefix, jobID),
} }
j.delete(ctx, objectAPI) j.delete(ctx, objectAPI)

View file

@ -42,12 +42,6 @@ func (z *BatchJobRequest) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "Started") err = msgp.WrapError(err, "Started")
return return
} }
case "Location":
z.Location, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Location")
return
}
case "Replicate": case "Replicate":
if dc.IsNil() { if dc.IsNil() {
err = dc.ReadNil() err = dc.ReadNil()
@ -115,9 +109,9 @@ func (z *BatchJobRequest) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable // EncodeMsg implements msgp.Encodable
func (z *BatchJobRequest) EncodeMsg(en *msgp.Writer) (err error) { func (z *BatchJobRequest) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 7 // map header, size 6
// write "ID" // write "ID"
err = en.Append(0x87, 0xa2, 0x49, 0x44) err = en.Append(0x86, 0xa2, 0x49, 0x44)
if err != nil { if err != nil {
return return
} }
@ -146,16 +140,6 @@ func (z *BatchJobRequest) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "Started") err = msgp.WrapError(err, "Started")
return return
} }
// write "Location"
err = en.Append(0xa8, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e)
if err != nil {
return
}
err = en.WriteString(z.Location)
if err != nil {
err = msgp.WrapError(err, "Location")
return
}
// write "Replicate" // write "Replicate"
err = en.Append(0xa9, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65) err = en.Append(0xa9, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65)
if err != nil { if err != nil {
@ -213,9 +197,9 @@ func (z *BatchJobRequest) EncodeMsg(en *msgp.Writer) (err error) {
// MarshalMsg implements msgp.Marshaler // MarshalMsg implements msgp.Marshaler
func (z *BatchJobRequest) MarshalMsg(b []byte) (o []byte, err error) { func (z *BatchJobRequest) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize()) o = msgp.Require(b, z.Msgsize())
// map header, size 7 // map header, size 6
// string "ID" // string "ID"
o = append(o, 0x87, 0xa2, 0x49, 0x44) o = append(o, 0x86, 0xa2, 0x49, 0x44)
o = msgp.AppendString(o, z.ID) o = msgp.AppendString(o, z.ID)
// string "User" // string "User"
o = append(o, 0xa4, 0x55, 0x73, 0x65, 0x72) o = append(o, 0xa4, 0x55, 0x73, 0x65, 0x72)
@ -223,9 +207,6 @@ func (z *BatchJobRequest) MarshalMsg(b []byte) (o []byte, err error) {
// string "Started" // string "Started"
o = append(o, 0xa7, 0x53, 0x74, 0x61, 0x72, 0x74, 0x65, 0x64) o = append(o, 0xa7, 0x53, 0x74, 0x61, 0x72, 0x74, 0x65, 0x64)
o = msgp.AppendTime(o, z.Started) o = msgp.AppendTime(o, z.Started)
// string "Location"
o = append(o, 0xa8, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e)
o = msgp.AppendString(o, z.Location)
// string "Replicate" // string "Replicate"
o = append(o, 0xa9, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65) o = append(o, 0xa9, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65)
if z.Replicate == nil { if z.Replicate == nil {
@ -298,12 +279,6 @@ func (z *BatchJobRequest) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "Started") err = msgp.WrapError(err, "Started")
return return
} }
case "Location":
z.Location, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Location")
return
}
case "Replicate": case "Replicate":
if msgp.IsNil(bts) { if msgp.IsNil(bts) {
bts, err = msgp.ReadNilBytes(bts) bts, err = msgp.ReadNilBytes(bts)
@ -369,7 +344,7 @@ func (z *BatchJobRequest) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *BatchJobRequest) Msgsize() (s int) { func (z *BatchJobRequest) Msgsize() (s int) {
s = 1 + 3 + msgp.StringPrefixSize + len(z.ID) + 5 + msgp.StringPrefixSize + len(z.User) + 8 + msgp.TimeSize + 9 + msgp.StringPrefixSize + len(z.Location) + 10 s = 1 + 3 + msgp.StringPrefixSize + len(z.ID) + 5 + msgp.StringPrefixSize + len(z.User) + 8 + msgp.TimeSize + 10
if z.Replicate == nil { if z.Replicate == nil {
s += msgp.NilSize s += msgp.NilSize
} else { } else {