Indicate RenameData is called by healObject (#16997)

This commit is contained in:
Krishnan Parthasarathi 2023-04-09 10:25:37 -07:00 committed by GitHub
parent 1f1c267b6c
commit 25f7a8e406
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 76 additions and 2 deletions

View file

@ -294,6 +294,26 @@ func shouldHealObjectOnDisk(erErr, dataErr error, meta FileInfo, latestMeta File
return false
}
const xMinIOHealing = ReservedMetadataPrefix + "healing"
// SetHealing marks object (version) as being healed.
// Note: this is to be used only from healObject
func (fi *FileInfo) SetHealing() {
if fi.Metadata == nil {
fi.Metadata = make(map[string]string)
}
fi.Metadata[xMinIOHealing] = "true"
}
// Healing returns true if object is being healed (i.e fi is being passed down
// from healObject)
func (fi FileInfo) Healing() bool {
if _, ok := fi.Metadata[xMinIOHealing]; ok {
return true
}
return false
}
// Heals an object by re-writing corrupt/missing erasure blocks.
func (er *erasureObjects) healObject(ctx context.Context, bucket string, object string, versionID string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
dryRun := opts.DryRun
@ -664,6 +684,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
partsMetadata[i].Erasure.Index = i + 1
// Attempt a rename now from healed data to final location.
partsMetadata[i].SetHealing()
if _, err = disk.RenameData(ctx, minioMetaTmpBucket, tmpID, partsMetadata[i], bucket, object); err != nil {
logger.LogIf(ctx, err)
return result, err

View file

@ -1583,8 +1583,9 @@ func (x *xlMetaV2) AddVersion(fi FileInfo) error {
if len(k) > len(ReservedMetadataPrefixLower) && strings.EqualFold(k[:len(ReservedMetadataPrefixLower)], ReservedMetadataPrefixLower) {
// Skip tierFVID, tierFVMarker keys; it's used
// only for creating free-version.
// Skip xMinIOHealing, it's used only in RenameData
switch k {
case tierFVIDKey, tierFVMarkerKey:
case tierFVIDKey, tierFVMarkerKey, xMinIOHealing:
continue
}

View file

@ -1010,3 +1010,51 @@ func Test_mergeXLV2Versions2(t *testing.T) {
})
}
}
func TestXMinIOHealingSkip(t *testing.T) {
xl := xlMetaV2{}
failOnErr := func(err error) {
t.Helper()
if err != nil {
t.Fatalf("Test failed with %v", err)
}
}
fi := FileInfo{
Volume: "volume",
Name: "object-name",
VersionID: "756100c6-b393-4981-928a-d49bbc164741",
IsLatest: true,
Deleted: false,
ModTime: time.Now(),
Size: 1 << 10,
Mode: 0,
Erasure: ErasureInfo{
Algorithm: ReedSolomon.String(),
DataBlocks: 4,
ParityBlocks: 2,
BlockSize: 10000,
Index: 1,
Distribution: []int{1, 2, 3, 4, 5, 6, 7, 8},
Checksums: []ChecksumInfo{{
PartNumber: 1,
Algorithm: HighwayHash256S,
Hash: nil,
}},
},
NumVersions: 1,
}
fi.SetHealing()
failOnErr(xl.AddVersion(fi))
var err error
fi, err = xl.ToFileInfo(fi.Volume, fi.Name, fi.VersionID, false)
if err != nil {
t.Fatalf("xl.ToFileInfo failed with %v", err)
}
if fi.Healing() {
t.Fatal("Expected fi.Healing to be false")
}
}

View file

@ -2357,7 +2357,11 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
// suspended or disabled on this bucket. RenameData will replace
// the 'null' version. We add a free-version to track its tiered
// content for asynchronous deletion.
if fi.VersionID == "" && !fi.IsRestoreObjReq() {
//
// Note: RestoreObject and HealObject requests don't end up replacing the
// null version and therefore don't require the free-version to track
// anything
if fi.VersionID == "" && !fi.IsRestoreObjReq() && !fi.Healing() {
// Note: Restore object request reuses PutObject/Multipart
// upload to copy back its data from the remote tier. This
// doesn't replace the existing version, so we don't need to add