fix: deleteVersions causing xl.meta to have empty Versions[] slice (#14483)

This is a side-affect of the optimization done in PR #13544 which
causes a certain type of delete operations on given object versions
can cause lastVersion indication to be skipped, which leads to
an `xl.meta` where Versions[] slice is empty while the entire
file is intact by itself.

This PR tries to ensure that such files are visible and deletable
by regular means of listing as null 'delete-marker' and also
avoid the situation where this potential issue might arise.
This commit is contained in:
Harshavardhana 2022-03-04 20:01:26 -08:00 committed by GitHub
parent bbc914e174
commit b0c84e3de7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 164 additions and 47 deletions

View file

@ -23,6 +23,7 @@ import (
"crypto/subtle"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"hash/crc32"
"io"
@ -2427,7 +2428,9 @@ func (a adminAPIHandlers) InspectDataHandler(w http.ResponseWriter, r *http.Requ
}
return nil
})
logger.LogIf(ctx, err)
if !errors.Is(err, errFileNotFound) {
logger.LogIf(ctx, err)
}
}
func createHostAnonymizerForFSMode() map[string]string {

View file

@ -83,7 +83,10 @@ func commonTime(modTimes []time.Time) (modTime time.Time) {
}
// Beginning of unix time is treated as sentinel value here.
var timeSentinel = time.Unix(0, 0).UTC()
var (
timeSentinel = time.Unix(0, 0).UTC()
timeSentinel1970 = time.Unix(0, 1).UTC() // 1970 used for special cases when xlmeta.version == 0
)
// Boot modTimes up to disk count, setting the value to time sentinel.
func bootModtimes(diskCount int) []time.Time {

View file

@ -128,6 +128,86 @@ func TestErasureDeleteObjectBasic(t *testing.T) {
removeRoots(fsDirs)
}
func TestDeleteObjectsVersioned(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
obj, fsDirs, err := prepareErasure(ctx, 16)
if err != nil {
t.Fatal("Unable to initialize 'Erasure' object layer.", err)
}
// Remove all dirs.
for _, dir := range fsDirs {
defer os.RemoveAll(dir)
}
type testCaseType struct {
bucket string
object string
}
bucketName := "bucket"
testCases := []testCaseType{
{bucketName, "dir/obj1"},
{bucketName, "dir/obj1"},
}
err = obj.MakeBucketWithLocation(ctx, bucketName, BucketOptions{
VersioningEnabled: true,
})
if err != nil {
t.Fatal(err)
}
names := make([]ObjectToDelete, len(testCases))
for i, testCase := range testCases {
objInfo, err := obj.PutObject(ctx, testCase.bucket, testCase.object,
mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), ObjectOptions{
Versioned: true,
})
if err != nil {
t.Fatalf("Erasure Object upload failed: <ERROR> %s", err)
}
names[i] = ObjectToDelete{
ObjectV: ObjectV{
ObjectName: objInfo.Name,
VersionID: objInfo.VersionID,
},
}
}
names = append(names, ObjectToDelete{
ObjectV: ObjectV{
ObjectName: "dir/obj1",
VersionID: mustGetUUID(), // add a non-existent UUID.
},
})
_, delErrs := obj.DeleteObjects(ctx, bucketName, names, ObjectOptions{
Versioned: true,
})
for i := range delErrs {
if delErrs[i] != nil {
t.Errorf("Failed to remove object `%v` with the error: `%v`", names[i], delErrs[i])
}
}
for i, test := range testCases {
_, statErr := obj.GetObjectInfo(ctx, test.bucket, test.object, ObjectOptions{
VersionID: names[i].ObjectV.VersionID,
})
switch statErr.(type) {
case VersionNotFound:
default:
t.Fatalf("Object %s is not removed", test.bucket+SlashSeparator+test.object)
}
}
if _, err = ioutil.ReadFile(pathJoin(fsDirs[0], bucketName, "dir/obj1", "xl.meta")); err == nil {
t.Fatalf("xl.meta still present after removal")
}
}
func TestErasureDeleteObjectsErasureSet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

View file

@ -191,6 +191,16 @@ func (e *metaCacheEntry) fileInfo(bucket string) (FileInfo, error) {
}, nil
}
if e.cached != nil {
if len(e.cached.versions) == 0 {
// This special case is needed to handle xlMeta.versions == 0
return FileInfo{
Volume: bucket,
Name: e.name,
Deleted: true,
IsLatest: true,
ModTime: timeSentinel1970,
}, nil
}
return e.cached.ToFileInfo(bucket, e.name, "")
}
return getFileInfo(e.metadata, bucket, e.name, "", false)

View file

@ -18,6 +18,8 @@
package cmd
import (
"errors"
"github.com/zeebo/xxh3"
)
@ -57,7 +59,19 @@ func getAllFileInfoVersions(xlMetaBuf []byte, volume, path string) (FileInfoVers
}
versions, err = xlMeta.ListVersions(volume, path)
}
if err != nil || len(versions) == 0 {
if err == nil && len(versions) == 0 {
// This special case is needed to handle len(xlMeta.versions) == 0
versions = []FileInfo{
{
Volume: volume,
Name: path,
Deleted: true,
IsLatest: true,
ModTime: timeSentinel1970,
},
}
}
if err != nil {
return FileInfoVersions{}, err
}
@ -76,11 +90,33 @@ func getFileInfo(xlMetaBuf []byte, volume, path, versionID string, data bool) (F
if buf, data := isIndexedMetaV2(xlMetaBuf); buf != nil {
inData = data
fi, err = buf.ToFileInfo(volume, path, versionID)
if len(buf) != 0 && errors.Is(err, errFileNotFound) {
// This special case is needed to handle len(xlMeta.versions) == 0
return FileInfo{
Volume: volume,
Name: path,
VersionID: versionID,
Deleted: true,
IsLatest: true,
ModTime: timeSentinel1970,
}, nil
}
} else {
var xlMeta xlMetaV2
if err := xlMeta.LoadOrConvert(xlMetaBuf); err != nil {
return FileInfo{}, err
}
if len(xlMeta.versions) == 0 {
// This special case is needed to handle len(xlMeta.versions) == 0
return FileInfo{
Volume: volume,
Name: path,
VersionID: versionID,
Deleted: true,
IsLatest: true,
ModTime: timeSentinel1970,
}, nil
}
inData = xlMeta.data
fi, err = xlMeta.ToFileInfo(volume, path, versionID)
}

View file

@ -1150,7 +1150,7 @@ func (x *xlMetaV2) sortByModTime() {
// DeleteVersion deletes the version specified by version id.
// returns to the caller which dataDir to delete, also
// indicates if this is the last version.
func (x *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) {
func (x *xlMetaV2) DeleteVersion(fi FileInfo) (string, error) {
// This is a situation where versionId is explicitly
// specified as "null", as we do not save "null"
// string it is considered empty. But empty also
@ -1164,7 +1164,7 @@ func (x *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) {
if fi.VersionID != "" {
uv, err = uuid.Parse(fi.VersionID)
if err != nil {
return "", false, errFileVersionNotFound
return "", errFileVersionNotFound
}
}
@ -1179,7 +1179,7 @@ func (x *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) {
},
}
if !ventry.Valid() {
return "", false, errors.New("internal error: invalid version entry generated")
return "", errors.New("internal error: invalid version entry generated")
}
}
updateVersion := false
@ -1225,18 +1225,18 @@ func (x *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) {
case LegacyType:
ver, err := x.getIdx(i)
if err != nil {
return "", false, err
return "", err
}
x.versions = append(x.versions[:i], x.versions[i+1:]...)
if fi.Deleted {
err = x.addVersion(ventry)
}
return ver.ObjectV1.DataDir, len(x.versions) == 0, err
return ver.ObjectV1.DataDir, err
case DeleteType:
if updateVersion {
ver, err := x.getIdx(i)
if err != nil {
return "", false, err
return "", err
}
if len(ver.DeleteMarker.MetaSys) == 0 {
ver.DeleteMarker.MetaSys = make(map[string][]byte)
@ -1258,26 +1258,26 @@ func (x *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) {
ver.DeleteMarker.MetaSys[k] = []byte(v)
}
err = x.setIdx(i, *ver)
return "", len(x.versions) == 0, err
return "", err
}
var err error
x.versions = append(x.versions[:i], x.versions[i+1:]...)
if fi.MarkDeleted && (fi.VersionPurgeStatus().Empty() || (fi.VersionPurgeStatus() != Complete)) {
err = x.addVersion(ventry)
}
return "", len(x.versions) == 0, err
return "", err
case ObjectType:
if updateVersion && !fi.Deleted {
ver, err := x.getIdx(i)
if err != nil {
return "", false, err
return "", err
}
ver.ObjectV2.MetaSys[VersionPurgeStatusKey] = []byte(fi.ReplicationState.VersionPurgeStatusInternal)
for k, v := range fi.ReplicationState.ResetStatusesMap {
ver.ObjectV2.MetaSys[k] = []byte(v)
}
err = x.setIdx(i, *ver)
return "", len(x.versions) == 0, err
return "", err
}
}
}
@ -1288,7 +1288,7 @@ func (x *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) {
}
ver, err := x.getIdx(i)
if err != nil {
return "", false, err
return "", err
}
switch {
case fi.ExpireRestored:
@ -1314,16 +1314,16 @@ func (x *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) {
if x.SharedDataDirCount(ver.ObjectV2.VersionID, ver.ObjectV2.DataDir) > 0 {
// Found that another version references the same dataDir
// we shouldn't remove it, and only remove the version instead
return "", len(x.versions) == 0, nil
return "", nil
}
return uuid.UUID(ver.ObjectV2.DataDir).String(), len(x.versions) == 0, err
return uuid.UUID(ver.ObjectV2.DataDir).String(), err
}
if fi.Deleted {
err = x.addVersion(ventry)
return "", false, err
return "", err
}
return "", false, errFileVersionNotFound
return "", errFileVersionNotFound
}
// xlMetaDataDirDecoder is a shallow decoder for decoding object datadir only.

View file

@ -366,7 +366,7 @@ func TestDeleteVersionWithSharedDataDir(t *testing.T) {
count := len(testCases)
for i := 4; i < len(testCases); i++ {
tc := testCases[i]
dataDir, _, err := xl.DeleteVersion(fileInfos[i])
dataDir, err := xl.DeleteVersion(fileInfos[i])
failOnErr(count+1, err)
if dataDir != tc.expectedDataDir {
t.Fatalf("Expected %s but got %s", tc.expectedDataDir, dataDir)

View file

@ -433,7 +433,7 @@ func BenchmarkXlMetaV2Shallow(b *testing.B) {
// Update a random version.
fi.VersionID = ids[rng.Intn(size)]
// Delete...
_, _, err = xl.DeleteVersion(fi)
_, err = xl.DeleteVersion(fi)
if err != nil {
b.Fatal(err)
}

View file

@ -102,7 +102,7 @@ func TestFreeVersion(t *testing.T) {
tierfi.TransitionedObjName = mustGetUUID()
tierfi.TransitionTier = "MINIOTIER-1"
var err error
_, _, err = xl.DeleteVersion(tierfi)
_, err = xl.DeleteVersion(tierfi)
fatalErr(err)
report()
@ -124,7 +124,7 @@ func TestFreeVersion(t *testing.T) {
newtierfi.TransitionStatus = ""
newtierfi.SetTierFreeVersionID(fvIDs[1])
report()
_, _, err = xl.DeleteVersion(newtierfi)
_, err = xl.DeleteVersion(newtierfi)
report()
fatalErr(err)
@ -141,7 +141,7 @@ func TestFreeVersion(t *testing.T) {
freefi := newtierfi
for _, fvID := range fvIDs {
freefi.VersionID = fvID
_, _, err = xl.DeleteVersion(freefi)
_, err = xl.DeleteVersion(freefi)
fatalErr(err)
}
report()

View file

@ -880,22 +880,15 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis
return err
}
var (
dataDir string
lastVersion bool
updated bool
)
for _, fi := range fis {
dataDir, lastVersion, err = xlMeta.DeleteVersion(fi)
dataDir, err := xlMeta.DeleteVersion(fi)
if err != nil {
if !fi.Deleted && (err == errFileNotFound || err == errFileVersionNotFound) {
// Ignore these since
// Ignore these since they do not exist
continue
}
return err
}
updated = true
if dataDir != "" {
versionID := fi.VersionID
if versionID == "" {
@ -918,11 +911,8 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis
}
}
lastVersion := len(xlMeta.versions) == 0
if !lastVersion {
if !updated {
return nil
}
buf, err = xlMeta.AppendTo(metaDataPoolGet())
defer metaDataPoolPut(buf)
if err != nil {
@ -933,12 +923,7 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis
}
// Move xl.meta to trash
filePath := pathJoin(volumeDir, path, xlStorageFormatFile)
if err = checkPathLength(filePath); err != nil {
return err
}
err = s.moveToTrash(filePath, false)
err = s.moveToTrash(pathJoin(volumeDir, path, xlStorageFormatFile), false)
if err == nil || err == errFileNotFound {
s.deleteFile(volumeDir, pathJoin(volumeDir, path), false)
}
@ -1021,7 +1006,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
return err
}
dataDir, lastVersion, err := xlMeta.DeleteVersion(fi)
dataDir, err := xlMeta.DeleteVersion(fi)
if err != nil {
return err
}
@ -1046,7 +1031,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
}
}
if !lastVersion {
if len(xlMeta.versions) != 0 {
buf, err = xlMeta.AppendTo(metaDataPoolGet())
defer metaDataPoolPut(buf)
if err != nil {

View file

@ -7,9 +7,9 @@ x-minio-common: &minio-common
expose:
- "9000"
- "9001"
environment:
MINIO_ROOT_USER: minio
MINIO_ROOT_PASSWORD: minio123
# environment:
# MINIO_ROOT_USER: minioadmin
# MINIO_ROOT_PASSWORD: minioadmin
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s