From 3da9ee15d316808c59bd6e7d374e388a01a734dc Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Fri, 19 Nov 2021 17:54:10 -0800 Subject: [PATCH] Add MaxNoncurrentVersions to NoncurrentExpiration action (#13580) This unit allows users to limit the maximum number of noncurrent versions of an object. To enable this rule you need the following *ilm.json* ``` cat >> ilm.json <= APIErrorCode(len(_APIErrorCode_index)-1) { diff --git a/cmd/bucket-lifecycle-handlers.go b/cmd/bucket-lifecycle-handlers.go index a1673324b..bcc32cc96 100644 --- a/cmd/bucket-lifecycle-handlers.go +++ b/cmd/bucket-lifecycle-handlers.go @@ -24,6 +24,7 @@ import ( "github.com/gorilla/mux" "github.com/minio/minio/internal/bucket/lifecycle" + "github.com/minio/minio/internal/bucket/object/lock" xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/bucket/policy" @@ -79,6 +80,17 @@ func (api objectAPIHandlers) PutBucketLifecycleHandler(w http.ResponseWriter, r return } + // Disallow MaxNoncurrentVersions if bucket has object locking enabled + var rCfg lock.Retention + if rCfg, err = globalBucketObjectLockSys.Get(bucket); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + if rCfg.LockEnabled && bucketLifecycle.HasMaxNoncurrentVersions() { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidLifecycleWithObjectLock), r.URL) + return + } + // Validate the transition storage ARNs if err = validateTransitionTier(bucketLifecycle); err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index 3744c7da4..4e4e13c8a 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -81,40 +81,58 @@ type expiryTask struct { } type expiryState struct { - once sync.Once - expiryCh chan expiryTask + once sync.Once + byDaysCh chan expiryTask + byMaxNoncurrentCh chan maxNoncurrentTask } // PendingTasks returns the number of pending ILM expiry tasks. func (es *expiryState) PendingTasks() int { - return len(es.expiryCh) + return len(es.byDaysCh) + len(es.byMaxNoncurrentCh) } -func (es *expiryState) queueExpiryTask(oi ObjectInfo, restoredObject bool, rmVersion bool) { +// close closes work channels exactly once. +func (es *expiryState) close() { + es.once.Do(func() { + close(es.byDaysCh) + close(es.byMaxNoncurrentCh) + }) +} + +// enqueueByDays enqueues object versions expired by days for expiry. +func (es *expiryState) enqueueByDays(oi ObjectInfo, restoredObject bool, rmVersion bool) { select { case <-GlobalContext.Done(): - es.once.Do(func() { - close(es.expiryCh) - }) - case es.expiryCh <- expiryTask{objInfo: oi, versionExpiry: rmVersion, restoredObject: restoredObject}: + es.close() + case es.byDaysCh <- expiryTask{objInfo: oi, versionExpiry: rmVersion, restoredObject: restoredObject}: default: } } -var ( - globalExpiryState *expiryState -) +// enqueueByMaxNoncurrent enqueues object versions expired by +// MaxNoncurrentVersions limit for expiry. +func (es *expiryState) enqueueByMaxNoncurrent(bucket string, versions []ObjectToDelete) { + select { + case <-GlobalContext.Done(): + es.close() + case es.byMaxNoncurrentCh <- maxNoncurrentTask{bucket: bucket, versions: versions}: + default: + } +} + +var globalExpiryState *expiryState func newExpiryState() *expiryState { return &expiryState{ - expiryCh: make(chan expiryTask, 10000), + byDaysCh: make(chan expiryTask, 10000), + byMaxNoncurrentCh: make(chan maxNoncurrentTask, 10000), } } func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) { globalExpiryState = newExpiryState() go func() { - for t := range globalExpiryState.expiryCh { + for t := range globalExpiryState.byDaysCh { if t.objInfo.TransitionedObject.Status != "" { applyExpiryOnTransitionedObject(ctx, objectAPI, t.objInfo, t.restoredObject) } else { @@ -122,6 +140,18 @@ func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) { } } }() + go func() { + for t := range globalExpiryState.byMaxNoncurrentCh { + deleteObjectVersions(ctx, objectAPI, t.bucket, t.versions) + } + }() +} + +// maxNoncurrentTask encapsulates arguments required by worker to expire objects +// by MaxNoncurrentVersions +type maxNoncurrentTask struct { + bucket string + versions []ObjectToDelete } type transitionState struct { diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index dbb94d65e..3ca149870 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -865,6 +865,7 @@ func (i *scannerItem) transformMetaDir() { } var applyActionsLogPrefix = color.Green("applyActions:") +var applyVersionActionsLogPrefix = color.Green("applyVersionActions:") func (i *scannerItem) applyHealing(ctx context.Context, o ObjectLayer, oi ObjectInfo) (size int64) { if i.debug { @@ -970,13 +971,57 @@ func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, oi O } +// applyMaxNoncurrentVersionLimit removes noncurrent versions older than the most recent MaxNoncurrentVersions configured. +// Note: This function doesn't update sizeSummary since it always removes versions that it doesn't return. +func (i *scannerItem) applyMaxNoncurrentVersionLimit(ctx context.Context, o ObjectLayer, fivs []FileInfo) ([]FileInfo, error) { + if i.lifeCycle == nil { + return fivs, nil + } + + lim := i.lifeCycle.NoncurrentVersionsExpirationLimit(lifecycle.ObjectOpts{Name: i.objectPath()}) + if lim == 0 || len(fivs) <= lim+1 { // fewer than lim _noncurrent_ versions + return fivs, nil + } + + overflowVersions := fivs[lim+1:] + // current version + most recent lim noncurrent versions + fivs = fivs[:lim+1] + + rcfg, _ := globalBucketObjectLockSys.Get(i.bucket) + toDel := make([]ObjectToDelete, 0, len(overflowVersions)) + for _, fi := range overflowVersions { + obj := fi.ToObjectInfo(i.bucket, i.objectPath()) + if rcfg.LockEnabled && enforceRetentionForDeletion(ctx, obj) { + if i.debug { + if obj.VersionID != "" { + console.Debugf(applyVersionActionsLogPrefix+" lifecycle: %s v(%s) is locked, not deleting\n", obj.Name, obj.VersionID) + } else { + console.Debugf(applyVersionActionsLogPrefix+" lifecycle: %s is locked, not deleting\n", obj.Name) + } + } + continue + } + toDel = append(toDel, ObjectToDelete{ + ObjectName: fi.Name, + VersionID: fi.VersionID, + }) + } + + globalExpiryState.enqueueByMaxNoncurrent(i.bucket, toDel) + return fivs, nil +} + +// applyVersionActions will apply lifecycle checks on all versions of a scanned item. Returns versions that remain +// after applying lifecycle checks configured. +func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fivs []FileInfo) ([]FileInfo, error) { + return i.applyMaxNoncurrentVersionLimit(ctx, o, fivs) +} + // applyActions will apply lifecycle checks on to a scanned item. // The resulting size on disk will always be returned. // The metadata will be compared to consensus on the object layer before any changes are applied. // If no metadata is supplied, -1 is returned if no action is taken. func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) int64 { - i.applyTierObjSweep(ctx, o, oi) - applied, size := i.applyLifecycle(ctx, o, oi) // For instance, an applied lifecycle means we remove/transitioned an object // from the current deployment, which means we don't have to call healing @@ -1093,7 +1138,7 @@ func applyExpiryOnNonTransitionedObjects(ctx context.Context, objLayer ObjectLay // Apply object, object version, restored object or restored object version action on the given object func applyExpiryRule(obj ObjectInfo, restoredObject, applyOnVersion bool) bool { - globalExpiryState.queueExpiryTask(obj, restoredObject, applyOnVersion) + globalExpiryState.enqueueByDays(obj, restoredObject, applyOnVersion) return true } diff --git a/cmd/object-handlers-common.go b/cmd/object-handlers-common.go index ac3fb3091..251a0ec51 100644 --- a/cmd/object-handlers-common.go +++ b/cmd/object-handlers-common.go @@ -24,7 +24,9 @@ import ( "strconv" "time" + "github.com/minio/minio/internal/event" xhttp "github.com/minio/minio/internal/http" + "github.com/minio/minio/internal/logger" ) var ( @@ -264,3 +266,41 @@ func setPutObjHeaders(w http.ResponseWriter, objInfo ObjectInfo, delete bool) { } } } + +func deleteObjectVersions(ctx context.Context, o ObjectLayer, bucket string, toDel []ObjectToDelete) { + versioned := globalBucketVersioningSys.Enabled(bucket) + versionSuspended := globalBucketVersioningSys.Suspended(bucket) + for remaining := toDel; len(remaining) > 0; toDel = remaining { + if len(toDel) > maxDeleteList { + remaining = toDel[maxDeleteList:] + toDel = toDel[:maxDeleteList] + } else { + remaining = nil + } + deletedObjs, errs := o.DeleteObjects(ctx, bucket, toDel, ObjectOptions{ + Versioned: versioned, + VersionSuspended: versionSuspended, + }) + var logged bool + for i, err := range errs { + if err != nil { + if !logged { + // log the first error + logger.LogIf(ctx, err) + logged = true + } + continue + } + dobj := deletedObjs[i] + sendEvent(eventArgs{ + EventName: event.ObjectRemovedDelete, + BucketName: bucket, + Object: ObjectInfo{ + Name: dobj.ObjectName, + VersionID: dobj.VersionID, + }, + Host: "Internal: [ILM-EXPIRY]", + }) + } + } +} diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index b4e686a75..a568f6ea1 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -93,7 +93,8 @@ type FileInfoVersions struct { // latest version. LatestModTime time.Time `msg:"lm"` - Versions []FileInfo `msg:"vs"` + Versions []FileInfo `msg:"vs"` + FreeVersions []FileInfo `msg:"fvs"` } // findVersionIndex will return the version index where the version diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go index 8d0d9ac10..5b9f4a3bc 100644 --- a/cmd/storage-datatypes_gen.go +++ b/cmd/storage-datatypes_gen.go @@ -1128,8 +1128,8 @@ func (z *FileInfoVersions) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err) return } - if zb0001 != 4 { - err = msgp.ArrayError{Wanted: 4, Got: zb0001} + if zb0001 != 5 { + err = msgp.ArrayError{Wanted: 5, Got: zb0001} return } z.Volume, err = dc.ReadString() @@ -1165,13 +1165,31 @@ func (z *FileInfoVersions) DecodeMsg(dc *msgp.Reader) (err error) { return } } + var zb0003 uint32 + zb0003, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "FreeVersions") + return + } + if cap(z.FreeVersions) >= int(zb0003) { + z.FreeVersions = (z.FreeVersions)[:zb0003] + } else { + z.FreeVersions = make([]FileInfo, zb0003) + } + for za0002 := range z.FreeVersions { + err = z.FreeVersions[za0002].DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "FreeVersions", za0002) + return + } + } return } // EncodeMsg implements msgp.Encodable func (z *FileInfoVersions) EncodeMsg(en *msgp.Writer) (err error) { - // array header, size 4 - err = en.Append(0x94) + // array header, size 5 + err = en.Append(0x95) if err != nil { return } @@ -1202,14 +1220,26 @@ func (z *FileInfoVersions) EncodeMsg(en *msgp.Writer) (err error) { return } } + err = en.WriteArrayHeader(uint32(len(z.FreeVersions))) + if err != nil { + err = msgp.WrapError(err, "FreeVersions") + return + } + for za0002 := range z.FreeVersions { + err = z.FreeVersions[za0002].EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "FreeVersions", za0002) + return + } + } return } // MarshalMsg implements msgp.Marshaler func (z *FileInfoVersions) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // array header, size 4 - o = append(o, 0x94) + // array header, size 5 + o = append(o, 0x95) o = msgp.AppendString(o, z.Volume) o = msgp.AppendString(o, z.Name) o = msgp.AppendTime(o, z.LatestModTime) @@ -1221,6 +1251,14 @@ func (z *FileInfoVersions) MarshalMsg(b []byte) (o []byte, err error) { return } } + o = msgp.AppendArrayHeader(o, uint32(len(z.FreeVersions))) + for za0002 := range z.FreeVersions { + o, err = z.FreeVersions[za0002].MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "FreeVersions", za0002) + return + } + } return } @@ -1232,8 +1270,8 @@ func (z *FileInfoVersions) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err) return } - if zb0001 != 4 { - err = msgp.ArrayError{Wanted: 4, Got: zb0001} + if zb0001 != 5 { + err = msgp.ArrayError{Wanted: 5, Got: zb0001} return } z.Volume, bts, err = msgp.ReadStringBytes(bts) @@ -1269,6 +1307,24 @@ func (z *FileInfoVersions) UnmarshalMsg(bts []byte) (o []byte, err error) { return } } + var zb0003 uint32 + zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "FreeVersions") + return + } + if cap(z.FreeVersions) >= int(zb0003) { + z.FreeVersions = (z.FreeVersions)[:zb0003] + } else { + z.FreeVersions = make([]FileInfo, zb0003) + } + for za0002 := range z.FreeVersions { + bts, err = z.FreeVersions[za0002].UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "FreeVersions", za0002) + return + } + } o = bts return } @@ -1279,6 +1335,10 @@ func (z *FileInfoVersions) Msgsize() (s int) { for za0001 := range z.Versions { s += z.Versions[za0001].Msgsize() } + s += msgp.ArrayHeaderSize + for za0002 := range z.FreeVersions { + s += z.FreeVersions[za0002].Msgsize() + } return } diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 6229a5eed..2bd6e803d 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -18,7 +18,7 @@ package cmd const ( - storageRESTVersion = "v41" // Optimized DeleteVersions API + storageRESTVersion = "v42" // Added FreeVersions to FileInfoVersions storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) diff --git a/cmd/xl-storage-format-utils.go b/cmd/xl-storage-format-utils.go index fba0dadbf..fe861201a 100644 --- a/cmd/xl-storage-format-utils.go +++ b/cmd/xl-storage-format-utils.go @@ -26,27 +26,6 @@ import ( ) func getFileInfoVersions(xlMetaBuf []byte, volume, path string) (FileInfoVersions, error) { - fivs, err := getAllFileInfoVersions(xlMetaBuf, volume, path) - if err != nil { - return fivs, err - } - n := 0 - for _, fi := range fivs.Versions { - // Filter our tier object delete marker - if !fi.TierFreeVersion() { - fivs.Versions[n] = fi - n++ - } - } - fivs.Versions = fivs.Versions[:n] - // Update numversions - for i := range fivs.Versions { - fivs.Versions[i].NumVersions = n - } - return fivs, nil -} - -func getAllFileInfoVersions(xlMetaBuf []byte, volume, path string) (FileInfoVersions, error) { if isXL2V1Format(xlMetaBuf) { var versions []FileInfo var err error @@ -63,10 +42,25 @@ func getAllFileInfoVersions(xlMetaBuf []byte, volume, path string) (FileInfoVers return FileInfoVersions{}, err } + var freeVersions []FileInfo + n := 0 + for _, fi := range versions { + if fi.TierFreeVersion() { + freeVersions = append(freeVersions, fi) + continue + } + versions[n] = fi + n++ + } + versions = versions[:n] + for _, ver := range versions { + ver.NumVersions = n + } return FileInfoVersions{ Volume: volume, Name: path, Versions: versions, + FreeVersions: freeVersions, LatestModTime: versions[0].ModTime, }, nil } @@ -83,7 +77,7 @@ func getAllFileInfoVersions(xlMetaBuf []byte, volume, path string) (FileInfoVers } fi.IsLatest = true // No versions so current version is latest. - fi.XLV1 = true // indicates older version + fi.NumVersions = 1 // just this version return FileInfoVersions{ Volume: volume, Name: path, diff --git a/cmd/xl-storage-format-utils_test.go b/cmd/xl-storage-format-utils_test.go index ee7d48fbf..0310bde4f 100644 --- a/cmd/xl-storage-format-utils_test.go +++ b/cmd/xl-storage-format-utils_test.go @@ -18,8 +18,11 @@ package cmd import ( + "sort" "testing" + "time" + "github.com/minio/minio/internal/bucket/lifecycle" xhttp "github.com/minio/minio/internal/http" ) @@ -108,3 +111,95 @@ func Test_hashDeterministicString(t *testing.T) { }) } } + +func TestGetFileInfoVersions(t *testing.T) { + basefi := FileInfo{ + Volume: "volume", + Name: "object-name", + VersionID: "756100c6-b393-4981-928a-d49bbc164741", + IsLatest: true, + Deleted: false, + TransitionStatus: "", + DataDir: "bffea160-ca7f-465f-98bc-9b4f1c3ba1ef", + XLV1: false, + ModTime: time.Now().UTC(), + Size: 0, + Mode: 0, + Metadata: nil, + Parts: nil, + Erasure: ErasureInfo{ + Algorithm: ReedSolomon.String(), + DataBlocks: 4, + ParityBlocks: 2, + BlockSize: 10000, + Index: 1, + Distribution: []int{1, 2, 3, 4, 5, 6, 7, 8}, + Checksums: []ChecksumInfo{{ + PartNumber: 1, + Algorithm: HighwayHash256S, + Hash: nil, + }}, + }, + MarkDeleted: false, + NumVersions: 1, + SuccessorModTime: time.Time{}, + } + xl := xlMetaV2{} + var versions []FileInfo + var freeVersionIDs []string + for i := 0; i < 5; i++ { + fi := basefi + fi.VersionID = mustGetUUID() + fi.DataDir = mustGetUUID() + fi.ModTime = basefi.ModTime.Add(time.Duration(i) * time.Second) + if err := xl.AddVersion(fi); err != nil { + t.Fatalf("%d: Failed to add version %v", i+1, err) + } + + if i > 3 { + // Simulate transition of a version + transfi := fi + transfi.TransitionStatus = lifecycle.TransitionComplete + transfi.TransitionTier = "MINIO-TIER" + transfi.TransitionedObjName = mustGetUUID() + xl.DeleteVersion(transfi) + + fi.SetTierFreeVersionID(mustGetUUID()) + // delete this version leading to a free version + xl.DeleteVersion(fi) + freeVersionIDs = append(freeVersionIDs, fi.TierFreeVersionID()) + } else { + versions = append(versions, fi) + } + } + buf, err := xl.AppendTo(nil) + if err != nil { + t.Fatalf("Failed to serialize xlmeta %v", err) + } + fivs, err := getFileInfoVersions(buf, basefi.Volume, basefi.Name) + if err != nil { + t.Fatalf("getFileInfoVersions failed: %v", err) + } + + sort.Slice(versions, func(i, j int) bool { + if versions[i].IsLatest { + return true + } + if versions[j].IsLatest { + return false + } + return versions[i].ModTime.After(versions[j].ModTime) + }) + + for i, fi := range fivs.Versions { + if fi.VersionID != versions[i].VersionID { + t.Fatalf("getFileInfoVersions: versions don't match at %d, version id expected %s but got %s", i, fi.VersionID, versions[i].VersionID) + } + } + + for i, free := range fivs.FreeVersions { + if free.VersionID != freeVersionIDs[i] { + t.Fatalf("getFileInfoVersions: free versions don't match at %d, version id expected %s but got %s", i, free.VersionID, freeVersionIDs[i]) + } + } +} diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 9f6c9b93d..ec2595a74 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -455,7 +455,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates // Remove filename which is the meta file. item.transformMetaDir() - fivs, err := getAllFileInfoVersions(buf, item.bucket, item.objectPath()) + fivs, err := getFileInfoVersions(buf, item.bucket, item.objectPath()) if err != nil { if intDataUpdateTracker.debug { console.Debugf(color.Green("scannerBucket:")+" reading xl.meta failed: %v: %w\n", item.Path, err) @@ -468,6 +468,13 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates sizeS.tiers = make(map[string]tierStats) } atomic.AddUint64(&globalScannerStats.accTotalObjects, 1) + fivs.Versions, err = item.applyVersionActions(ctx, objAPI, fivs.Versions) + if err != nil { + if intDataUpdateTracker.debug { + console.Debugf(color.Green("scannerBucket:")+" applying version actions failed: %v: %w\n", item.Path, err) + } + return sizeSummary{}, errSkipFile + } for _, version := range fivs.Versions { atomic.AddUint64(&globalScannerStats.accTotalVersions, 1) oi := version.ToObjectInfo(item.bucket, item.objectPath()) @@ -492,6 +499,12 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates } sizeS.tiers[tier] = sizeS.tiers[tier].add(oi.tierStats()) } + + // apply tier sweep action on free versions + for _, freeVersion := range fivs.FreeVersions { + oi := freeVersion.ToObjectInfo(item.bucket, item.objectPath()) + item.applyTierObjSweep(ctx, objAPI, oi) + } return sizeS, nil }) diff --git a/docs/bucket/lifecycle/README.md b/docs/bucket/lifecycle/README.md index 4446717ba..492f43dfd 100644 --- a/docs/bucket/lifecycle/README.md +++ b/docs/bucket/lifecycle/README.md @@ -81,7 +81,29 @@ e.g., To scan objects stored under `user-uploads/` prefix and remove versions ol } ``` -### 3.2 Automatic removal of delete markers with no other versions + +### 3.2 Automatic removal of noncurrent versions older than most recent + +It is possible to configure automatic removal of noncurrent versions older than the most recent `N` using MinIO specific lifecycle policy extension `MaxNoncurrentVersions`. + +e.g, To remove noncurrent versions of all objects older than most recent 5 noncurrent versions under the prefix `user-uploads/`, +``` +{ + "Rules": [ + { + "ID": "Remove noncurrent versions older than", + "Status": "Enabled", + "Filter": { + "Prefix": "users-uploads/" + }, + "NoncurrentVersionExpiration": { + "MaxNoncurrentVersions": 5 + } + } + ] +} +``` +### 3.3 Automatic removal of delete markers with no other versions When an object has only one version as a delete marker, the latter can be automatically removed after a certain number of days using the following configuration: diff --git a/internal/bucket/lifecycle/lifecycle.go b/internal/bucket/lifecycle/lifecycle.go index 1ad24d2bf..c622430f3 100644 --- a/internal/bucket/lifecycle/lifecycle.go +++ b/internal/bucket/lifecycle/lifecycle.go @@ -29,10 +29,11 @@ import ( ) var ( - errLifecycleTooManyRules = Errorf("Lifecycle configuration allows a maximum of 1000 rules") - errLifecycleNoRule = Errorf("Lifecycle configuration should have at least one rule") - errLifecycleDuplicateID = Errorf("Lifecycle configuration has rule with the same ID. Rule ID must be unique.") - errXMLNotWellFormed = Errorf("The XML you provided was not well-formed or did not validate against our published schema") + errLifecycleTooManyRules = Errorf("Lifecycle configuration allows a maximum of 1000 rules") + errLifecycleNoRule = Errorf("Lifecycle configuration should have at least one rule") + errLifecycleDuplicateID = Errorf("Lifecycle configuration has rule with the same ID. Rule ID must be unique.") + errXMLNotWellFormed = Errorf("The XML you provided was not well-formed or did not validate against our published schema") + errLifecycleInvalidNoncurrentExpiration = Errorf("Exactly one of NoncurrentDays (positive integer) or MaxNoncurrentVersions should be specified in a NoncurrentExpiration rule.") ) const ( @@ -140,6 +141,9 @@ func (lc Lifecycle) HasActiveRules(prefix string, recursive bool) bool { if rule.NoncurrentVersionExpiration.NoncurrentDays > 0 { return true } + if rule.NoncurrentVersionExpiration.MaxNoncurrentVersions > 0 { + return true + } if !rule.NoncurrentVersionTransition.IsNull() { return true } @@ -234,6 +238,10 @@ func (lc Lifecycle) FilterActionableRules(obj ObjectOpts) []Rule { rules = append(rules, rule) continue } + if rule.NoncurrentVersionExpiration.MaxNoncurrentVersions > 0 { + rules = append(rules, rule) + continue + } // The NoncurrentVersionTransition action requests MinIO to transition // noncurrent versions of objects x days after the objects become // noncurrent. @@ -468,3 +476,32 @@ func (lc Lifecycle) TransitionTier(obj ObjectOpts) string { } return "" } + +// NoncurrentVersionsExpirationLimit returns the minimum limit on number of +// noncurrent versions across rules. +func (lc Lifecycle) NoncurrentVersionsExpirationLimit(obj ObjectOpts) int { + var lim int + for _, rule := range lc.FilterActionableRules(obj) { + if rule.NoncurrentVersionExpiration.MaxNoncurrentVersions == 0 { + continue + } + if lim == 0 || lim > rule.NoncurrentVersionExpiration.MaxNoncurrentVersions { + lim = rule.NoncurrentVersionExpiration.MaxNoncurrentVersions + } + } + return lim +} + +// HasMaxNoncurrentVersions returns true if there exists a rule with +// MaxNoncurrentVersions limit set. +func (lc Lifecycle) HasMaxNoncurrentVersions() bool { + for _, rule := range lc.Rules { + if rule.Status == Disabled { + continue + } + if rule.NoncurrentVersionExpiration.MaxNoncurrentVersions > 0 { + return true + } + } + return false +} diff --git a/internal/bucket/lifecycle/lifecycle_test.go b/internal/bucket/lifecycle/lifecycle_test.go index 5d53c1e0c..6b4133ff3 100644 --- a/internal/bucket/lifecycle/lifecycle_test.go +++ b/internal/bucket/lifecycle/lifecycle_test.go @@ -23,6 +23,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "strconv" "strings" "testing" "time" @@ -111,6 +112,12 @@ func TestParseAndValidateLifecycleConfig(t *testing.T) { expectedParsingErr: nil, expectedValidationErr: nil, }, + // Lifecycle with max noncurrent versions + { + inputConfig: `rule>Enabled5`, + expectedParsingErr: nil, + expectedValidationErr: nil, + }, } for i, tc := range testCases { @@ -619,3 +626,24 @@ func TestTransitionTier(t *testing.T) { t.Fatalf("Expected TIER-2 but got %s", got) } } + +func TestNoncurrentVersionsLimit(t *testing.T) { + // test that the lowest max noncurrent versions limit is returned among + // matching rules + var rules []Rule + for i := 1; i <= 10; i++ { + rules = append(rules, Rule{ + ID: strconv.Itoa(i), + Status: "Enabled", + NoncurrentVersionExpiration: NoncurrentVersionExpiration{ + MaxNoncurrentVersions: i, + }, + }) + } + lc := Lifecycle{ + Rules: rules, + } + if lim := lc.NoncurrentVersionsExpirationLimit(ObjectOpts{Name: "obj"}); lim != 1 { + t.Fatalf("Expected max noncurrent versions limit to be 1 but got %d", lim) + } +} diff --git a/internal/bucket/lifecycle/noncurrentversion.go b/internal/bucket/lifecycle/noncurrentversion.go index 787df0763..cf9ff6a04 100644 --- a/internal/bucket/lifecycle/noncurrentversion.go +++ b/internal/bucket/lifecycle/noncurrentversion.go @@ -24,14 +24,15 @@ import ( // NoncurrentVersionExpiration - an action for lifecycle configuration rule. type NoncurrentVersionExpiration struct { - XMLName xml.Name `xml:"NoncurrentVersionExpiration"` - NoncurrentDays ExpirationDays `xml:"NoncurrentDays,omitempty"` - set bool + XMLName xml.Name `xml:"NoncurrentVersionExpiration"` + NoncurrentDays ExpirationDays `xml:"NoncurrentDays,omitempty"` + MaxNoncurrentVersions int `xml:"MaxNoncurrentVersions,omitempty"` + set bool } // MarshalXML if non-current days not set to non zero value func (n NoncurrentVersionExpiration) MarshalXML(e *xml.Encoder, start xml.StartElement) error { - if n.IsDaysNull() { + if n.IsNull() { return nil } type noncurrentVersionExpirationWrapper NoncurrentVersionExpiration @@ -51,6 +52,11 @@ func (n *NoncurrentVersionExpiration) UnmarshalXML(d *xml.Decoder, startElement return nil } +// IsNull returns if both NoncurrentDays and NoncurrentVersions are empty +func (n NoncurrentVersionExpiration) IsNull() bool { + return n.IsDaysNull() && n.MaxNoncurrentVersions == 0 +} + // IsDaysNull returns true if days field is null func (n NoncurrentVersionExpiration) IsDaysNull() bool { return n.NoncurrentDays == ExpirationDays(0) @@ -62,8 +68,17 @@ func (n NoncurrentVersionExpiration) Validate() error { return nil } val := int(n.NoncurrentDays) - if val <= 0 { + switch { + case val == 0 && n.MaxNoncurrentVersions == 0: + // both fields can't be zero return errXMLNotWellFormed + + case val > 0 && n.MaxNoncurrentVersions > 0: + // both tags can't be non-zero simultaneously + return errLifecycleInvalidNoncurrentExpiration + + case val < 0, n.MaxNoncurrentVersions < 0: + // negative values are not supported } return nil }