diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 9374557ff..38fdc0ce3 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -1211,13 +1211,17 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str } // Rename the multipart object to final location. - if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, - partsMetadata, bucket, object, writeQuorum); err != nil { + onlineDisks, versionsDisparity, err := renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, + partsMetadata, bucket, object, writeQuorum) + if err != nil { return oi, toObjectErr(err, bucket, object) } - defer NSUpdated(bucket, object) + if !opts.Speedtest && versionsDisparity { + listAndHeal(ctx, bucket, object, &er, healObjectVersionsDisparity) + } + // Check if there is any offline disk and add it to the MRF list for _, disk := range onlineDisks { if disk != nil && disk.IsOnline() { diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 3cf0626a3..b09f3ad72 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -729,7 +729,7 @@ func (er erasureObjects) getObjectInfoAndQuorum(ctx context.Context, bucket, obj } // Similar to rename but renames data from srcEntry to dstEntry at dataDir -func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry string, metadata []FileInfo, dstBucket, dstEntry string, writeQuorum int) ([]StorageAPI, error) { +func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry string, metadata []FileInfo, dstBucket, dstEntry string, writeQuorum int) ([]StorageAPI, bool, error) { g := errgroup.WithNErrs(len(disks)) fvID := mustGetUUID() @@ -768,20 +768,22 @@ func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry str // Wait for all renames to finish. errs := g.Wait() + var versionsDisparity bool + err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) if err == nil { versions := reduceCommonVersions(diskVersions, writeQuorum) - for index, dversions := range diskVersions { + for _, dversions := range diskVersions { if versions != dversions { - errs[index] = errFileNeedsHealing + versionsDisparity = true + break } } - err = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) } // We can safely allow RenameData errors up to len(er.getDisks()) - writeQuorum // otherwise return failure. - return evalDisks(disks, errs), err + return evalDisks(disks, errs), versionsDisparity, err } func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { @@ -934,6 +936,47 @@ func (er erasureObjects) PutObject(ctx context.Context, bucket string, object st return er.putObject(ctx, bucket, object, data, opts) } +// Heal up to two versions of one object when there is disparity between disks +func healObjectVersionsDisparity(bucket string, entry metaCacheEntry) error { + if entry.isDir() { + return nil + } + // We might land at .metacache, .trash, .multipart + // no need to heal them skip, only when bucket + // is '.minio.sys' + if bucket == minioMetaBucket { + if wildcard.Match("buckets/*/.metacache/*", entry.name) { + return nil + } + if wildcard.Match("tmp/*", entry.name) { + return nil + } + if wildcard.Match("multipart/*", entry.name) { + return nil + } + if wildcard.Match("tmp-old/*", entry.name) { + return nil + } + } + + fivs, err := entry.fileInfoVersions(bucket) + if err != nil { + healObject(bucket, entry.name, "", madmin.HealNormalScan) + return err + } + + if len(fivs.Versions) > 2 { + return fmt.Errorf("bucket(%s)/object(%s) object with %d versions needs healing, allowing lazy healing via scanner instead here for versions greater than 2", + bucket, entry.name, len(fivs.Versions)) + } + + for _, version := range fivs.Versions { + healObject(bucket, entry.name, version.VersionID, madmin.HealNormalScan) + } + + return nil +} + // putObject wrapper for erasureObjects PutObject func (er erasureObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { auditObjectErasureSet(ctx, object, &er) @@ -1205,7 +1248,17 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st } // Rename the successfully written temporary object to final location. - onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, bucket, object, writeQuorum) + onlineDisks, versionsDisparity, err := renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, bucket, object, writeQuorum) + if err != nil { + if errors.Is(err, errFileNotFound) { + return ObjectInfo{}, toObjectErr(errErasureWriteQuorum, bucket, object) + } + logger.LogIf(ctx, err) + return ObjectInfo{}, toObjectErr(err, bucket, object) + } + + defer NSUpdated(bucket, object) + for i := 0; i < len(onlineDisks); i++ { if onlineDisks[i] != nil && onlineDisks[i].IsOnline() { // Object info is the same in all disks, so we can pick @@ -1227,63 +1280,12 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st er.addPartial(bucket, object, fi.VersionID, fi.Size) break } - } - healEntry := func(entry metaCacheEntry) error { - if entry.isDir() { - return nil - } - // We might land at .metacache, .trash, .multipart - // no need to heal them skip, only when bucket - // is '.minio.sys' - if bucket == minioMetaBucket { - if wildcard.Match("buckets/*/.metacache/*", entry.name) { - return nil - } - if wildcard.Match("tmp/*", entry.name) { - return nil - } - if wildcard.Match("multipart/*", entry.name) { - return nil - } - if wildcard.Match("tmp-old/*", entry.name) { - return nil - } - } - - fivs, err := entry.fileInfoVersions(bucket) - if err != nil { - healObject(bucket, entry.name, "", madmin.HealNormalScan) - return err - } - - if len(fivs.Versions) > 2 { - return fmt.Errorf("bucket(%s)/object(%s) object with %d versions needs healing, allowing lazy healing via scanner instead here for versions greater than 2", - bucket, entry.name, - len(fivs.Versions)) - } - - for _, version := range fivs.Versions { - healObject(bucket, entry.name, version.VersionID, madmin.HealNormalScan) - } - - return nil - } - - if err != nil { - if errors.Is(err, errFileNotFound) { - return ObjectInfo{}, toObjectErr(errErasureWriteQuorum, bucket, object) - } - if errors.Is(err, errFileNeedsHealing) && !opts.Speedtest { - listAndHeal(ctx, bucket, object, &er, healEntry) - } else { - logger.LogIf(ctx, err) - return ObjectInfo{}, toObjectErr(err, bucket, object) + if versionsDisparity { + listAndHeal(ctx, bucket, object, &er, healObjectVersionsDisparity) } } - defer NSUpdated(bucket, object) - fi.ReplicationState = opts.PutReplicationState() online = countOnlineDisks(onlineDisks) diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index eb36a49a9..c8dde437e 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -1954,7 +1954,7 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re // HealObjectFn closure function heals the object. type HealObjectFn func(bucket, object, versionID string) error -func listAndHeal(ctx context.Context, bucket, prefix string, set *erasureObjects, healEntry func(metaCacheEntry) error) error { +func listAndHeal(ctx context.Context, bucket, prefix string, set *erasureObjects, healEntry func(string, metaCacheEntry) error) error { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -1987,7 +1987,7 @@ func listAndHeal(ctx context.Context, bucket, prefix string, set *erasureObjects minDisks: 1, reportNotFound: false, agreed: func(entry metaCacheEntry) { - if err := healEntry(entry); err != nil { + if err := healEntry(bucket, entry); err != nil { logger.LogIf(ctx, err) cancel() } @@ -2000,7 +2000,7 @@ func listAndHeal(ctx context.Context, bucket, prefix string, set *erasureObjects entry, _ = entries.firstFound() } - if err := healEntry(*entry); err != nil { + if err := healEntry(bucket, *entry); err != nil { logger.LogIf(ctx, err) cancel() return @@ -2017,7 +2017,7 @@ func listAndHeal(ctx context.Context, bucket, prefix string, set *erasureObjects } func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObjectFn HealObjectFn) error { - healEntry := func(entry metaCacheEntry) error { + healEntry := func(bucket string, entry metaCacheEntry) error { if entry.isDir() { return nil } diff --git a/cmd/storage-errors.go b/cmd/storage-errors.go index 7686a29b6..06b56acd0 100644 --- a/cmd/storage-errors.go +++ b/cmd/storage-errors.go @@ -69,9 +69,6 @@ var errTooManyOpenFiles = StorageErr("too many open files, please increase 'ulim // errFileNameTooLong - given file name is too long than supported length. var errFileNameTooLong = StorageErr("file name too long") -// errFileNeedsHealing - given file name needs to heal. -var errFileNeedsHealing = StorageErr("file name needs healing") - // errVolumeExists - cannot create same volume again. var errVolumeExists = StorageErr("volume already exists")