Correct to remove null version while ILM rule application (#16971)

Signed-off-by: Shubhendu Ram Tripathi <shubhendu@minio.io>
Co-authored-by: Harshavardhana <harsha@minio.io>
This commit is contained in:
Shubhendu 2023-04-07 02:40:01 +05:30 committed by GitHub
parent 8fd6be0827
commit 4c204707fd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 25 deletions

View file

@ -991,27 +991,35 @@ func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, oi O
// applyNewerNoncurrentVersionLimit removes noncurrent versions older than the most recent NewerNoncurrentVersions configured.
// Note: This function doesn't update sizeSummary since it always removes versions that it doesn't return.
func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ ObjectLayer, fivs []FileInfo) ([]FileInfo, error) {
if i.lifeCycle == nil {
return fivs, nil
}
func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ ObjectLayer, fivs []FileInfo) ([]ObjectInfo, error) {
done := globalScannerMetrics.time(scannerMetricApplyNonCurrent)
defer done()
_, days, lim := i.lifeCycle.NoncurrentVersionsExpirationLimit(lifecycle.ObjectOpts{Name: i.objectPath()})
if lim == 0 || len(fivs) <= lim+1 { // fewer than lim _noncurrent_ versions
return fivs, nil
}
overflowVersions := fivs[lim+1:]
// current version + most recent lim noncurrent versions
fivs = fivs[:lim+1]
rcfg, _ := globalBucketObjectLockSys.Get(i.bucket)
vcfg, _ := globalBucketVersioningSys.Get(i.bucket)
versioned := vcfg != nil && vcfg.Versioned(i.objectPath())
// current version + most recent lim noncurrent versions
objectInfos := make([]ObjectInfo, 0, len(fivs))
if i.lifeCycle == nil {
for _, fi := range fivs {
objectInfos = append(objectInfos, fi.ToObjectInfo(i.bucket, i.objectPath(), versioned))
}
return objectInfos, nil
}
_, days, lim := i.lifeCycle.NoncurrentVersionsExpirationLimit(lifecycle.ObjectOpts{Name: i.objectPath()})
if lim == 0 || len(fivs) <= lim+1 { // fewer than lim _noncurrent_ versions
for _, fi := range fivs {
objectInfos = append(objectInfos, fi.ToObjectInfo(i.bucket, i.objectPath(), versioned))
}
return objectInfos, nil
}
overflowVersions := fivs[lim+1:]
toDel := make([]ObjectToDelete, 0, len(overflowVersions))
for _, fi := range overflowVersions {
obj := fi.ToObjectInfo(i.bucket, i.objectPath(), versioned)
@ -1026,7 +1034,7 @@ func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ Ob
}
// add this version back to remaining versions for
// subsequent lifecycle policy applications
fivs = append(fivs, fi)
objectInfos = append(objectInfos, obj)
continue
}
@ -1040,19 +1048,19 @@ func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ Ob
toDel = append(toDel, ObjectToDelete{
ObjectV: ObjectV{
ObjectName: fi.Name,
VersionID: fi.VersionID,
ObjectName: obj.Name,
VersionID: obj.VersionID,
},
})
}
globalExpiryState.enqueueByNewerNoncurrent(i.bucket, toDel)
return fivs, nil
return objectInfos, nil
}
// applyVersionActions will apply lifecycle checks on all versions of a scanned item. Returns versions that remain
// after applying lifecycle checks configured.
func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fivs []FileInfo) ([]FileInfo, error) {
func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fivs []FileInfo) ([]ObjectInfo, error) {
if i.heal.enabled {
if healDeleteDangling {
done := globalScannerMetrics.time(scannerMetricCleanAbandoned)
@ -1063,13 +1071,14 @@ func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fi
}
}
}
fivs, err := i.applyNewerNoncurrentVersionLimit(ctx, o, fivs)
objInfos, err := i.applyNewerNoncurrentVersionLimit(ctx, o, fivs)
if err != nil {
return fivs, err
return nil, err
}
// Check if we have many versions after applyNewerNoncurrentVersionLimit.
if len(fivs) > dataScannerExcessiveVersionsThreshold {
if len(objInfos) > dataScannerExcessiveVersionsThreshold {
// Notify object accessed via a GET request.
sendEvent(eventArgs{
EventName: event.ObjectManyVersions,
@ -1083,7 +1092,7 @@ func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fi
})
}
return fivs, nil
return objInfos, nil
}
// applyActions will apply lifecycle checks on to a scanned item.

View file

@ -514,7 +514,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
}
done := globalScannerMetrics.time(scannerMetricApplyAll)
fivs.Versions, err = item.applyVersionActions(ctx, objAPI, fivs.Versions)
objInfos, err := item.applyVersionActions(ctx, objAPI, fivs.Versions)
done()
if err != nil {
@ -524,8 +524,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
versioned := vcfg != nil && vcfg.Versioned(item.objectPath())
for _, version := range fivs.Versions {
oi := version.ToObjectInfo(item.bucket, item.objectPath(), versioned)
for _, oi := range objInfos {
done = globalScannerMetrics.time(scannerMetricApplyVersion)
sz := item.applyActions(ctx, objAPI, oi, &sizeS)
done()