diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index c1328a5bc..9deb9ab99 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -178,7 +178,10 @@ func shouldHealObjectOnDisk(erErr, dataErr error, meta FileInfo, latestMeta File return false } -const xMinIOHealing = ReservedMetadataPrefix + "healing" +const ( + xMinIOHealing = ReservedMetadataPrefix + "healing" + xMinIODataMov = ReservedMetadataPrefix + "data-mov" +) // SetHealing marks object (version) as being healed. // Note: this is to be used only from healObject @@ -196,6 +199,21 @@ func (fi FileInfo) Healing() bool { return ok } +// SetDataMov marks object (version) as being currently +// in movement, such as decommissioning or rebalance. +func (fi *FileInfo) SetDataMov() { + if fi.Metadata == nil { + fi.Metadata = make(map[string]string) + } + fi.Metadata[xMinIODataMov] = "true" +} + +// DataMov returns true if object is being in movement +func (fi FileInfo) DataMov() bool { + _, ok := fi.Metadata[xMinIODataMov] + return ok +} + // Heals an object by re-writing corrupt/missing erasure blocks. func (er *erasureObjects) healObject(ctx context.Context, bucket string, object string, versionID string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) { dryRun := opts.DryRun diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 1f44af1fe..88842d024 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -1244,6 +1244,10 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str // Save the consolidated actual size. fi.Metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10) + if opts.DataMovement { + fi.SetDataMov() + } + // Update all erasure metadata, make sure to not modify fields like // checksum which are different on each disks. for index := range partsMetadata { diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index b911846b9..66435618a 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -1516,13 +1516,12 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st partsMetadata[index].Metadata = userDefined partsMetadata[index].Size = n partsMetadata[index].ModTime = modTime - } - - if len(inlineBuffers) > 0 { - // Set an additional header when data is inlined. - for index := range partsMetadata { + if len(inlineBuffers) > 0 { partsMetadata[index].SetInlineData() } + if opts.DataMovement { + partsMetadata[index].SetDataMov() + } } // Rename the successfully written temporary object to final location. diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go index 83f35581a..557e8f504 100644 --- a/cmd/erasure-server-pool-decom.go +++ b/cmd/erasure-server-pool-decom.go @@ -653,7 +653,8 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri } } _, err = z.CompleteMultipartUpload(ctx, bucket, objInfo.Name, res.UploadID, parts, ObjectOptions{ - MTime: objInfo.ModTime, + DataMovement: true, + MTime: objInfo.ModTime, }) if err != nil { err = fmt.Errorf("decommissionObject: CompleteMultipartUpload() %w", err) @@ -665,11 +666,13 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri if err != nil { return fmt.Errorf("decommissionObject: hash.NewReader() %w", err) } + _, err = z.PutObject(ctx, bucket, objInfo.Name, NewPutObjReader(hr), ObjectOptions{ + DataMovement: true, VersionID: objInfo.VersionID, MTime: objInfo.ModTime, UserDefined: objInfo.UserDefined, diff --git a/cmd/erasure-server-pool-rebalance.go b/cmd/erasure-server-pool-rebalance.go index cfcb30148..e0b7fd283 100644 --- a/cmd/erasure-server-pool-rebalance.go +++ b/cmd/erasure-server-pool-rebalance.go @@ -809,7 +809,8 @@ func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string, } } _, err = z.CompleteMultipartUpload(ctx, bucket, oi.Name, res.UploadID, parts, ObjectOptions{ - MTime: oi.ModTime, + DataMovement: true, + MTime: oi.ModTime, }) if err != nil { err = fmt.Errorf("rebalanceObject: CompleteMultipartUpload() %w", err) @@ -821,11 +822,13 @@ func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string, if err != nil { return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err) } + _, err = z.PutObject(ctx, bucket, oi.Name, NewPutObjReader(hr), ObjectOptions{ + DataMovement: true, VersionID: oi.VersionID, MTime: oi.ModTime, UserDefined: oi.UserDefined, diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index c8b96dd1e..9a290059d 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -109,6 +109,8 @@ type ObjectOptions struct { // participating in a rebalance operation. Typically set for 'write' operations. SkipRebalancing bool + DataMovement bool // indicates an going decommisionning or rebalacing + PrefixEnabledFn func(prefix string) bool // function which returns true if versioning is enabled on prefix // IndexCB will return any index created but the compression. diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go index 8c970235f..4b234b314 100644 --- a/cmd/xl-storage-format-v2.go +++ b/cmd/xl-storage-format-v2.go @@ -1638,9 +1638,9 @@ func (x *xlMetaV2) AddVersion(fi FileInfo) error { if len(k) > len(ReservedMetadataPrefixLower) && strings.EqualFold(k[:len(ReservedMetadataPrefixLower)], ReservedMetadataPrefixLower) { // Skip tierFVID, tierFVMarker keys; it's used // only for creating free-version. - // Skip xMinIOHealing, it's used only in RenameData + // Also skip xMinIOHealing, xMinIODataMov as used only in RenameData switch k { - case tierFVIDKey, tierFVMarkerKey, xMinIOHealing: + case tierFVIDKey, tierFVMarkerKey, xMinIOHealing, xMinIODataMov: continue } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 69059aa61..f74374a00 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -2629,7 +2629,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f } diskHealthCheckOK(ctx, err) - if !fi.Versioned && !fi.Healing() { + if !fi.Versioned && !fi.DataMov() && !fi.Healing() { // Use https://man7.org/linux/man-pages/man2/rename.2.html if possible on unversioned bucket. if err := Rename2(pathutil.Join(srcVolumeDir, srcPath), pathutil.Join(dstVolumeDir, dstPath)); err == nil { return sign, nil