heal: Pass scan mode to HealObjects to deep scan full quorum objects (#9159)

As an optimization of the healing, HealObjects() avoid sending an
object to the background healing subsystem when the object is
present in all disks.

However, HealObjects() should have checked the scan type, if this
deep, always pass the object to the healing subsystem.
This commit is contained in:
Anis Elleuch 2020-03-19 01:50:00 +01:00 committed by GitHub
parent 09d35d3b4c
commit db2155551a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 37 additions and 37 deletions

View file

@ -666,7 +666,7 @@ func (h *healSequence) healMinioSysMeta(metaPrefix string) func() error {
// NOTE: Healing on meta is run regardless
// of any bucket being selected, this is to ensure that
// meta are always upto date and correct.
return objectAPI.HealObjects(h.ctx, minioMetaBucket, metaPrefix, func(bucket string, object string) error {
return objectAPI.HealObjects(h.ctx, minioMetaBucket, metaPrefix, h.settings, func(bucket string, object string) error {
if h.isQuitting() {
return errHealStopSignalled
}
@ -760,7 +760,7 @@ func (h *healSequence) healBucket(bucket string, bucketsOnly bool) error {
return nil
}
if err := objectAPI.HealObjects(h.ctx, bucket, h.objPrefix, h.healObject); err != nil {
if err := objectAPI.HealObjects(h.ctx, bucket, h.objPrefix, h.settings, h.healObject); err != nil {
return errFnHealFromAPIErr(h.ctx, err)
}
return nil

View file

@ -186,5 +186,5 @@ func bgHealObject(ctx context.Context, bucket, object string, opts madmin.HealOp
if objectAPI == nil {
return madmin.HealResultItem{}, errServerNotInitialized
}
return objectAPI.HealObject(ctx, bucket, object, opts.DryRun, opts.Remove, opts.ScanMode)
return objectAPI.HealObject(ctx, bucket, object, opts)
}

View file

@ -1257,7 +1257,7 @@ func (fs *FSObjects) HealFormat(ctx context.Context, dryRun bool) (madmin.HealRe
}
// HealObject - no-op for fs. Valid only for XL.
func (fs *FSObjects) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool, scanMode madmin.HealScanMode) (
func (fs *FSObjects) HealObject(ctx context.Context, bucket, object string, opts madmin.HealOpts) (
res madmin.HealResultItem, err error) {
logger.LogIf(ctx, NotImplemented{})
return res, NotImplemented{}
@ -1280,7 +1280,7 @@ func (fs *FSObjects) Walk(ctx context.Context, bucket, prefix string, results ch
}
// HealObjects - no-op for fs. Valid only for XL.
func (fs *FSObjects) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) (e error) {
func (fs *FSObjects) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, fn healObjectFn) (e error) {
logger.LogIf(ctx, NotImplemented{})
return NotImplemented{}
}

View file

@ -390,7 +390,7 @@ func TestFSHealObject(t *testing.T) {
defer os.RemoveAll(disk)
obj := initFSObjects(disk, t)
_, err := obj.HealObject(context.Background(), "bucket", "object", false, false, madmin.HealDeepScan)
_, err := obj.HealObject(context.Background(), "bucket", "object", madmin.HealOpts{})
if err == nil || !isSameType(err, NotImplemented{}) {
t.Fatalf("Heal Object should return NotImplemented error ")
}
@ -402,7 +402,7 @@ func TestFSHealObjects(t *testing.T) {
defer os.RemoveAll(disk)
obj := initFSObjects(disk, t)
err := obj.HealObjects(context.Background(), "bucket", "prefix", nil)
err := obj.HealObjects(context.Background(), "bucket", "prefix", madmin.HealOpts{}, nil)
if err == nil || !isSameType(err, NotImplemented{}) {
t.Fatalf("Heal Object should return NotImplemented error ")
}

View file

@ -167,7 +167,7 @@ func (a GatewayUnsupported) ListBucketsHeal(ctx context.Context) (buckets []Buck
}
// HealObject - Not implemented stub
func (a GatewayUnsupported) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool, scanMode madmin.HealScanMode) (h madmin.HealResultItem, e error) {
func (a GatewayUnsupported) HealObject(ctx context.Context, bucket, object string, opts madmin.HealOpts) (h madmin.HealResultItem, e error) {
return h, NotImplemented{}
}
@ -182,7 +182,7 @@ func (a GatewayUnsupported) Walk(ctx context.Context, bucket, prefix string, res
}
// HealObjects - Not implemented stub
func (a GatewayUnsupported) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) (e error) {
func (a GatewayUnsupported) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, fn healObjectFn) (e error) {
return NotImplemented{}
}

View file

@ -101,9 +101,8 @@ type ObjectLayer interface {
ReloadFormat(ctx context.Context, dryRun bool) error
HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error)
HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error)
HealObject(ctx context.Context, bucket, object string, dryRun, remove bool, scanMode madmin.HealScanMode) (madmin.HealResultItem, error)
HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) error
HealObject(ctx context.Context, bucket, object string, opts madmin.HealOpts) (madmin.HealResultItem, error)
HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, fn healObjectFn) error
ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error)

View file

@ -1593,8 +1593,8 @@ func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun, remove b
}
// HealObject - heals inconsistent object on a hashedSet based on object name.
func (s *xlSets) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool, scanMode madmin.HealScanMode) (madmin.HealResultItem, error) {
return s.getHashedSet(object).HealObject(ctx, bucket, object, dryRun, remove, scanMode)
func (s *xlSets) HealObject(ctx context.Context, bucket, object string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
return s.getHashedSet(object).HealObject(ctx, bucket, object, opts)
}
// Lists all buckets which need healing.
@ -1655,7 +1655,7 @@ func (s *xlSets) Walk(ctx context.Context, bucket, prefix string, results chan<-
// HealObjects - Heal all objects recursively at a specified prefix, any
// dangling objects deleted as well automatically.
func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObject healObjectFn) error {
func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObject healObjectFn) error {
endWalkCh := make(chan struct{})
defer close(endWalkCh)
@ -1669,7 +1669,7 @@ func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObj
break
}
if quorumCount == s.drivesPerSet {
if quorumCount == s.drivesPerSet && opts.ScanMode == madmin.HealNormalScan {
// Skip good entries.
continue
}

View file

@ -695,7 +695,7 @@ func isObjectDangling(metaArr []xlMetaV1, errs []error, dataErrs []error) (valid
}
// HealObject - heal the given object, automatically deletes the object if stale/corrupted if `remove` is true.
func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRun bool, remove bool, scanMode madmin.HealScanMode) (hr madmin.HealResultItem, err error) {
func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, opts madmin.HealOpts) (hr madmin.HealResultItem, err error) {
// Create context that also contains information about the object and bucket.
// The top level handler might not have this information.
reqInfo := logger.GetReqInfo(ctx)
@ -709,7 +709,7 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRu
// Healing directories handle it separately.
if HasSuffix(object, SlashSeparator) {
return xl.healObjectDir(healCtx, bucket, object, dryRun)
return xl.healObjectDir(healCtx, bucket, object, opts.DryRun)
}
storageDisks := xl.getDisks()
@ -724,7 +724,7 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRu
if m.Erasure.DataBlocks == 0 {
writeQuorum = len(xl.getDisks())/2 + 1
}
if !dryRun && remove {
if !opts.DryRun && opts.Remove {
xl.deleteObject(healCtx, bucket, object, writeQuorum, false)
}
err = reduceReadQuorumErrs(ctx, errs, nil, writeQuorum-1)
@ -753,7 +753,7 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRu
if m.Erasure.DataBlocks == 0 {
writeQuorum = len(storageDisks)/2 + 1
}
if !dryRun && remove {
if !opts.DryRun && opts.Remove {
xl.deleteObject(ctx, bucket, object, writeQuorum, false)
}
}
@ -762,5 +762,5 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRu
}
// Heal the object.
return xl.healObject(healCtx, bucket, object, partsMetadata, errs, latestXLMeta, dryRun, remove, scanMode)
return xl.healObject(healCtx, bucket, object, partsMetadata, errs, latestXLMeta, opts.DryRun, opts.Remove, opts.ScanMode)
}

View file

@ -121,7 +121,7 @@ func TestHealObjectCorrupted(t *testing.T) {
t.Fatalf("Failed to delete a file - %v", err)
}
_, err = objLayer.HealObject(context.Background(), bucket, object, false, false, madmin.HealNormalScan)
_, err = objLayer.HealObject(context.Background(), bucket, object, madmin.HealOpts{ScanMode: madmin.HealNormalScan})
if err != nil {
t.Fatalf("Failed to heal object - %v", err)
}
@ -144,7 +144,7 @@ func TestHealObjectCorrupted(t *testing.T) {
if err != nil {
t.Errorf("Failure during creating part.1 - %v", err)
}
_, err = objLayer.HealObject(context.Background(), bucket, object, false, true, madmin.HealDeepScan)
_, err = objLayer.HealObject(context.Background(), bucket, object, madmin.HealOpts{DryRun: false, Remove: true, ScanMode: madmin.HealDeepScan})
if err != nil {
t.Errorf("Expected nil but received %v", err)
}
@ -170,7 +170,7 @@ func TestHealObjectCorrupted(t *testing.T) {
if err != nil {
t.Errorf("Failure during creating part.1 - %v", err)
}
_, err = objLayer.HealObject(context.Background(), bucket, object, false, true, madmin.HealDeepScan)
_, err = objLayer.HealObject(context.Background(), bucket, object, madmin.HealOpts{DryRun: false, Remove: true, ScanMode: madmin.HealDeepScan})
if err != nil {
t.Errorf("Expected nil but received %v", err)
}
@ -190,7 +190,7 @@ func TestHealObjectCorrupted(t *testing.T) {
}
// Try healing now, expect to receive errFileNotFound.
_, err = objLayer.HealObject(context.Background(), bucket, object, false, true, madmin.HealDeepScan)
_, err = objLayer.HealObject(context.Background(), bucket, object, madmin.HealOpts{DryRun: false, Remove: true, ScanMode: madmin.HealDeepScan})
if err != nil {
if _, ok := err.(ObjectNotFound); !ok {
t.Errorf("Expect %v but received %v", ObjectNotFound{Bucket: bucket, Object: object}, err)
@ -263,7 +263,7 @@ func TestHealObjectXL(t *testing.T) {
t.Fatalf("Failed to delete a file - %v", err)
}
_, err = obj.HealObject(context.Background(), bucket, object, false, false, madmin.HealNormalScan)
_, err = obj.HealObject(context.Background(), bucket, object, madmin.HealOpts{ScanMode: madmin.HealNormalScan})
if err != nil {
t.Fatalf("Failed to heal object - %v", err)
}
@ -285,7 +285,7 @@ func TestHealObjectXL(t *testing.T) {
z.zones[0].xlDisksMu.Unlock()
// Try healing now, expect to receive errDiskNotFound.
_, err = obj.HealObject(context.Background(), bucket, object, false, false, madmin.HealDeepScan)
_, err = obj.HealObject(context.Background(), bucket, object, madmin.HealOpts{ScanMode: madmin.HealDeepScan})
// since majority of xl.jsons are not available, object quorum can't be read properly and error will be errXLReadQuorum
if _, ok := err.(InsufficientReadQuorum); !ok {
t.Errorf("Expected %v but received %v", InsufficientReadQuorum{}, err)
@ -333,7 +333,7 @@ func TestHealEmptyDirectoryXL(t *testing.T) {
}
// Heal the object
hr, err := obj.HealObject(context.Background(), bucket, object, false, false, madmin.HealNormalScan)
hr, err := obj.HealObject(context.Background(), bucket, object, madmin.HealOpts{ScanMode: madmin.HealNormalScan})
if err != nil {
t.Fatalf("Failed to heal object - %v", err)
}
@ -357,7 +357,7 @@ func TestHealEmptyDirectoryXL(t *testing.T) {
}
// Heal the same object again
hr, err = obj.HealObject(context.Background(), bucket, object, false, false, madmin.HealNormalScan)
hr, err = obj.HealObject(context.Background(), bucket, object, madmin.HealOpts{ScanMode: madmin.HealNormalScan})
if err != nil {
t.Fatalf("Failed to heal object - %v", err)
}

View file

@ -20,6 +20,7 @@ import (
"context"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/madmin"
)
// This is not implemented/needed anymore, look for xl-sets.ListBucketHeal()
@ -29,7 +30,7 @@ func (xl xlObjects) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) {
}
// This is not implemented/needed anymore, look for xl-sets.HealObjects()
func (xl xlObjects) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) error {
func (xl xlObjects) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, fn healObjectFn) error {
logger.LogIf(ctx, NotImplemented{})
return NotImplemented{}
}

View file

@ -414,7 +414,7 @@ func TestHealing(t *testing.T) {
t.Fatal(err)
}
_, err = xl.HealObject(context.Background(), bucket, object, false, false, madmin.HealNormalScan)
_, err = xl.HealObject(context.Background(), bucket, object, madmin.HealOpts{ScanMode: madmin.HealNormalScan})
if err != nil {
t.Fatal(err)
}
@ -443,7 +443,7 @@ func TestHealing(t *testing.T) {
t.Fatal(err)
}
_, err = xl.HealObject(context.Background(), bucket, object, false, false, madmin.HealDeepScan)
_, err = xl.HealObject(context.Background(), bucket, object, madmin.HealOpts{ScanMode: madmin.HealDeepScan})
if err != nil {
t.Fatal(err)
}

View file

@ -1407,7 +1407,7 @@ func (z *xlZones) Walk(ctx context.Context, bucket, prefix string, results chan<
type healObjectFn func(string, string) error
func (z *xlZones) HealObjects(ctx context.Context, bucket, prefix string, healObject healObjectFn) error {
func (z *xlZones) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObject healObjectFn) error {
var zonesEntryChs [][]FileInfoCh
endWalkCh := make(chan struct{})
@ -1436,7 +1436,7 @@ func (z *xlZones) HealObjects(ctx context.Context, bucket, prefix string, healOb
break
}
if quorumCount == zoneDrivesPerSet[zoneIndex] {
if quorumCount == zoneDrivesPerSet[zoneIndex] && opts.ScanMode == madmin.HealNormalScan {
// Skip good entries.
continue
}
@ -1452,7 +1452,7 @@ func (z *xlZones) HealObjects(ctx context.Context, bucket, prefix string, healOb
return nil
}
func (z *xlZones) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool, scanMode madmin.HealScanMode) (madmin.HealResultItem, error) {
func (z *xlZones) HealObject(ctx context.Context, bucket, object string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
// Lock the object before healing. Use read lock since healing
// will only regenerate parts & xl.json of outdated disks.
objectLock := z.NewNSLock(ctx, bucket, object)
@ -1462,10 +1462,10 @@ func (z *xlZones) HealObject(ctx context.Context, bucket, object string, dryRun,
defer objectLock.RUnlock()
if z.SingleZone() {
return z.zones[0].HealObject(ctx, bucket, object, dryRun, remove, scanMode)
return z.zones[0].HealObject(ctx, bucket, object, opts)
}
for _, zone := range z.zones {
result, err := zone.HealObject(ctx, bucket, object, dryRun, remove, scanMode)
result, err := zone.HealObject(ctx, bucket, object, opts)
if err != nil {
if isErrObjectNotFound(err) {
continue