use partInfo only for intermediate part.x.meta (#15353)

This commit is contained in:
Harshavardhana 2022-07-19 18:56:24 -07:00 committed by GitHub
parent cae9aeca00
commit ce8397f7d9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 103 additions and 72 deletions

View file

@ -500,6 +500,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
partSize := latestMeta.Parts[partIndex].Size
partActualSize := latestMeta.Parts[partIndex].ActualSize
partModTime := latestMeta.Parts[partIndex].ModTime
partNumber := latestMeta.Parts[partIndex].Number
partIdx := latestMeta.Parts[partIndex].Index
tillOffset := erasure.ShardFileOffset(0, partSize, partSize)
@ -551,7 +552,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
}
partsMetadata[i].DataDir = dstDataDir
partsMetadata[i].AddObjectPart(partNumber, "", partSize, partActualSize, partIdx)
partsMetadata[i].AddObjectPart(partNumber, "", partSize, partActualSize, partModTime, partIdx)
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
PartNumber: partNumber,
Algorithm: checksumAlgo,

View file

@ -237,12 +237,13 @@ func objectPartIndex(parts []ObjectPartInfo, partNumber int) int {
}
// AddObjectPart - add a new object part in order.
func (fi *FileInfo) AddObjectPart(partNumber int, partETag string, partSize, actualSize int64, idx []byte) {
func (fi *FileInfo) AddObjectPart(partNumber int, partETag string, partSize, actualSize int64, modTime time.Time, idx []byte) {
partInfo := ObjectPartInfo{
Number: partNumber,
ETag: partETag,
Size: partSize,
ActualSize: actualSize,
ModTime: modTime,
Index: idx,
}

View file

@ -58,7 +58,7 @@ func TestAddObjectPart(t *testing.T) {
for _, testCase := range testCases {
if testCase.expectedIndex > -1 {
partNumString := strconv.Itoa(testCase.partNum)
fi.AddObjectPart(testCase.partNum, "etag."+partNumString, int64(testCase.partNum+humanize.MiByte), ActualSize, nil)
fi.AddObjectPart(testCase.partNum, "etag."+partNumString, int64(testCase.partNum+humanize.MiByte), ActualSize, UTCNow(), nil)
}
if index := objectPartIndex(fi.Parts, testCase.partNum); index != testCase.expectedIndex {
@ -91,7 +91,7 @@ func TestObjectPartIndex(t *testing.T) {
// Add some parts for testing.
for _, testCase := range testCases {
partNumString := strconv.Itoa(testCase.partNum)
fi.AddObjectPart(testCase.partNum, "etag."+partNumString, int64(testCase.partNum+humanize.MiByte), ActualSize, nil)
fi.AddObjectPart(testCase.partNum, "etag."+partNumString, int64(testCase.partNum+humanize.MiByte), ActualSize, UTCNow(), nil)
}
// Add failure test case.
@ -121,7 +121,7 @@ func TestObjectToPartOffset(t *testing.T) {
// Total size of all parts is 5,242,899 bytes.
for _, partNum := range []int{1, 2, 4, 5, 7} {
partNumString := strconv.Itoa(partNum)
fi.AddObjectPart(partNum, "etag."+partNumString, int64(partNum+humanize.MiByte), ActualSize, nil)
fi.AddObjectPart(partNum, "etag."+partNumString, int64(partNum+humanize.MiByte), ActualSize, UTCNow(), nil)
}
testCases := []struct {
@ -160,7 +160,7 @@ func TestObjectToPartOffset(t *testing.T) {
func TestFindFileInfoInQuorum(t *testing.T) {
getNFInfo := func(n int, quorum int, t int64, dataDir string) []FileInfo {
fi := newFileInfo("test", 8, 8)
fi.AddObjectPart(1, "etag", 100, 100, nil)
fi.AddObjectPart(1, "etag", 100, 100, UTCNow(), nil)
fi.ModTime = time.Unix(t, 0)
fi.DataDir = dataDir
fis := make([]FileInfo, n)

View file

@ -691,36 +691,38 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
md5hex = opts.PreserveETag
}
// Once part is successfully committed, proceed with saving erasure metadata for part.
fi.ModTime = UTCNow()
var index []byte
if opts.IndexCB != nil {
index = opts.IndexCB()
}
// Add the current part.
fi.AddObjectPart(partID, md5hex, n, data.ActualSize(), index)
part := ObjectPartInfo{
Number: partID,
ETag: md5hex,
Size: n,
ActualSize: data.ActualSize(),
ModTime: UTCNow(),
Index: index,
}
// Save part info as partPath+".meta"
fiMsg, err := fi.MarshalMsg(nil)
partMsg, err := part.MarshalMsg(nil)
if err != nil {
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
}
// Write part metadata to all disks.
onlineDisks, err = writeAllDisks(ctx, onlineDisks, minioMetaMultipartBucket, partPath+".meta", fiMsg, writeQuorum)
onlineDisks, err = writeAllDisks(ctx, onlineDisks, minioMetaMultipartBucket, partPath+".meta", partMsg, writeQuorum)
if err != nil {
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
}
// Return success.
return PartInfo{
PartNumber: partID,
ETag: md5hex,
LastModified: fi.ModTime,
Size: n,
ActualSize: data.ActualSize(),
PartNumber: part.Number,
ETag: part.ETag,
LastModified: part.ModTime,
Size: part.Size,
ActualSize: part.ActualSize,
}, nil
}
@ -864,26 +866,26 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
maxParts = maxPartsList
}
var partFI FileInfo
var partI ObjectPartInfo
for i, part := range partInfoFiles {
if part.Error != "" || !part.Exists {
continue
}
_, err := partFI.UnmarshalMsg(part.Data)
_, err := partI.UnmarshalMsg(part.Data)
if err != nil {
// Maybe crash or similar.
logger.LogIf(ctx, err)
continue
}
if len(partFI.Parts) != 1 {
logger.LogIf(ctx, fmt.Errorf("unexpected part count: %d", len(partFI.Parts)))
if i+1 != partI.Number {
logger.LogIf(ctx, fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", i+1, i+1, partI.Number))
continue
}
addPart := partFI.Parts[0]
// Add the current part.
fi.AddObjectPart(i+1, addPart.ETag, addPart.Size, addPart.ActualSize, addPart.Index)
fi.AddObjectPart(partI.Number, partI.ETag, partI.Size, partI.ActualSize, partI.ModTime, partI.Index)
}
// Only parts with higher part numbers will be listed.
@ -893,11 +895,13 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
parts = fi.Parts[partIdx+1:]
}
count := maxParts
result.Parts = make([]PartInfo, 0, len(parts))
for _, part := range parts {
result.Parts = append(result.Parts, PartInfo{
PartNumber: part.Number,
ETag: part.ETag,
LastModified: fi.ModTime,
LastModified: part.ModTime,
ActualSize: part.ActualSize,
Size: part.Size,
})
count--
@ -971,6 +975,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
Bucket: minioMetaMultipartBucket,
Prefix: partPath,
MaxSize: 1 << 20, // Each part should realistically not be > 1MiB.
Files: make([]string, 0, len(parts)),
}
for _, part := range parts {
req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", part.PartNumber))
@ -986,7 +991,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
return oi, toObjectErr(err, bucket, object)
}
var partFI FileInfo
var partI ObjectPartInfo
for i, part := range partInfoFiles {
partID := parts[i].PartNumber
if part.Error != "" || !part.Exists {
@ -994,7 +999,8 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
PartNumber: partID,
}
}
_, err := partFI.UnmarshalMsg(part.Data)
_, err := partI.UnmarshalMsg(part.Data)
if err != nil {
// Maybe crash or similar.
logger.LogIf(ctx, err)
@ -1002,15 +1008,16 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
PartNumber: partID,
}
}
if len(partFI.Parts) != 1 {
logger.LogIf(ctx, fmt.Errorf("unexpected part count: %d", len(partFI.Parts)))
if partID != partI.Number {
logger.LogIf(ctx, fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", partID, partID, partI.Number))
return oi, InvalidPart{
PartNumber: partID,
}
}
addPart := partFI.Parts[0]
// Add the current part.
fi.AddObjectPart(partID, addPart.ETag, addPart.Size, addPart.ActualSize, addPart.Index)
fi.AddObjectPart(partI.Number, partI.ETag, partI.Size, partI.ActualSize, partI.ModTime, partI.Index)
}
// Calculate full object size.
@ -1072,6 +1079,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
Number: part.PartNumber,
Size: currentFI.Parts[partIdx].Size,
ActualSize: currentFI.Parts[partIdx].ActualSize,
ModTime: currentFI.Parts[partIdx].ModTime,
Index: currentFI.Parts[partIdx].Index,
}
}

View file

@ -820,6 +820,8 @@ func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *
index = opts.IndexCB()
}
modTime := UTCNow()
for i, w := range writers {
if w == nil {
// Make sure to avoid writing to disks which we couldn't complete in erasure.Encode()
@ -827,7 +829,7 @@ func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *
continue
}
partsMetadata[i].Data = inlineBuffers[i].Bytes()
partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize(), index)
partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize(), modTime, index)
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
PartNumber: 1,
Algorithm: DefaultBitrotAlgorithm,
@ -835,8 +837,6 @@ func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *
})
}
modTime := UTCNow()
// Fill all the necessary metadata.
// Update `xl.meta` content on each disks.
for index := range partsMetadata {
@ -1082,6 +1082,11 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
defer lk.Unlock(lkctx.Cancel)
}
modTime := opts.MTime
if opts.MTime.IsZero() {
modTime = UTCNow()
}
for i, w := range writers {
if w == nil {
onlineDisks[i] = nil
@ -1092,7 +1097,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
} else {
partsMetadata[i].Data = nil
}
partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize(), compIndex)
partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize(), modTime, compIndex)
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
PartNumber: 1,
Algorithm: DefaultBitrotAlgorithm,
@ -1112,11 +1117,6 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
userDefined["content-type"] = mimedb.TypeByExtension(path.Ext(object))
}
modTime := opts.MTime
if opts.MTime.IsZero() {
modTime = UTCNow()
}
// Fill all the necessary metadata.
// Update `xl.meta` content on each disks.
for index := range partsMetadata {

View file

@ -854,6 +854,8 @@ func (es *erasureSingle) putMetacacheObject(ctx context.Context, key string, r *
index = opts.IndexCB()
}
modTime := UTCNow()
for i, w := range writers {
if w == nil {
// Make sure to avoid writing to disks which we couldn't complete in erasure.Encode()
@ -861,7 +863,7 @@ func (es *erasureSingle) putMetacacheObject(ctx context.Context, key string, r *
continue
}
partsMetadata[i].Data = inlineBuffers[i].Bytes()
partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize(), index)
partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize(), modTime, index)
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
PartNumber: 1,
Algorithm: DefaultBitrotAlgorithm,
@ -869,8 +871,6 @@ func (es *erasureSingle) putMetacacheObject(ctx context.Context, key string, r *
})
}
modTime := UTCNow()
// Fill all the necessary metadata.
// Update `xl.meta` content on each disks.
for index := range partsMetadata {
@ -1092,6 +1092,11 @@ func (es *erasureSingle) putObject(ctx context.Context, bucket string, object st
index = opts.IndexCB()
}
modTime := opts.MTime
if opts.MTime.IsZero() {
modTime = UTCNow()
}
for i, w := range writers {
if w == nil {
onlineDisks[i] = nil
@ -1102,7 +1107,7 @@ func (es *erasureSingle) putObject(ctx context.Context, bucket string, object st
} else {
partsMetadata[i].Data = nil
}
partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize(), index)
partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize(), modTime, index)
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
PartNumber: 1,
Algorithm: DefaultBitrotAlgorithm,
@ -1118,11 +1123,6 @@ func (es *erasureSingle) putObject(ctx context.Context, bucket string, object st
opts.UserDefined["content-type"] = mimedb.TypeByExtension(path.Ext(object))
}
modTime := opts.MTime
if opts.MTime.IsZero() {
modTime = UTCNow()
}
// Fill all the necessary metadata.
// Update `xl.meta` content on each disks.
for index := range partsMetadata {
@ -2385,7 +2385,7 @@ func (es *erasureSingle) PutObjectPart(ctx context.Context, bucket, object, uplo
}
// Add the current part.
fi.AddObjectPart(partID, md5hex, n, data.ActualSize(), index)
fi.AddObjectPart(partID, md5hex, n, data.ActualSize(), fi.ModTime, index)
for i, disk := range onlineDisks {
if disk == OfflineDisk {

View file

@ -820,12 +820,7 @@ func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *ht
}
for i, p := range listPartsInfo.Parts {
listPartsInfo.Parts[i].ETag = tryDecryptETag(objectEncryptionKey, p.ETag, kind != crypto.S3)
size, err := sio.DecryptedSize(uint64(p.Size))
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
listPartsInfo.Parts[i].Size = int64(size)
listPartsInfo.Parts[i].Size = p.ActualSize
}
}

View file

@ -128,11 +128,12 @@ const (
// ObjectPartInfo Info of each part kept in the multipart metadata
// file after CompleteMultipartUpload() is called.
type ObjectPartInfo struct {
ETag string `json:"etag,omitempty"`
Number int `json:"number"`
Size int64 `json:"size"`
ActualSize int64 `json:"actualSize"`
Index []byte `json:"index,omitempty" msg:"index,omitempty"`
ETag string `json:"etag,omitempty"`
Number int `json:"number"`
Size int64 `json:"size"` // Size of the part on the disk.
ActualSize int64 `json:"actualSize"` // Original size of the part without compression or encryption bytes.
ModTime time.Time `json:"modTime"` // Date and time at which the part was uploaded.
Index []byte `json:"index,omitempty" msg:"index,omitempty"`
}
// ChecksumInfo - carries checksums of individual scattered parts per disk.

View file

@ -593,6 +593,12 @@ func (z *ObjectPartInfo) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "ActualSize")
return
}
case "ModTime":
z.ModTime, err = dc.ReadTime()
if err != nil {
err = msgp.WrapError(err, "ModTime")
return
}
case "index":
z.Index, err = dc.ReadBytes(z.Index)
if err != nil {
@ -613,11 +619,11 @@ func (z *ObjectPartInfo) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *ObjectPartInfo) EncodeMsg(en *msgp.Writer) (err error) {
// omitempty: check for empty values
zb0001Len := uint32(5)
var zb0001Mask uint8 /* 5 bits */
zb0001Len := uint32(6)
var zb0001Mask uint8 /* 6 bits */
if z.Index == nil {
zb0001Len--
zb0001Mask |= 0x10
zb0001Mask |= 0x20
}
// variable map header, size zb0001Len
err = en.Append(0x80 | uint8(zb0001Len))
@ -667,7 +673,17 @@ func (z *ObjectPartInfo) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "ActualSize")
return
}
if (zb0001Mask & 0x10) == 0 { // if not empty
// write "ModTime"
err = en.Append(0xa7, 0x4d, 0x6f, 0x64, 0x54, 0x69, 0x6d, 0x65)
if err != nil {
return
}
err = en.WriteTime(z.ModTime)
if err != nil {
err = msgp.WrapError(err, "ModTime")
return
}
if (zb0001Mask & 0x20) == 0 { // if not empty
// write "index"
err = en.Append(0xa5, 0x69, 0x6e, 0x64, 0x65, 0x78)
if err != nil {
@ -686,11 +702,11 @@ func (z *ObjectPartInfo) EncodeMsg(en *msgp.Writer) (err error) {
func (z *ObjectPartInfo) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// omitempty: check for empty values
zb0001Len := uint32(5)
var zb0001Mask uint8 /* 5 bits */
zb0001Len := uint32(6)
var zb0001Mask uint8 /* 6 bits */
if z.Index == nil {
zb0001Len--
zb0001Mask |= 0x10
zb0001Mask |= 0x20
}
// variable map header, size zb0001Len
o = append(o, 0x80|uint8(zb0001Len))
@ -709,7 +725,10 @@ func (z *ObjectPartInfo) MarshalMsg(b []byte) (o []byte, err error) {
// string "ActualSize"
o = append(o, 0xaa, 0x41, 0x63, 0x74, 0x75, 0x61, 0x6c, 0x53, 0x69, 0x7a, 0x65)
o = msgp.AppendInt64(o, z.ActualSize)
if (zb0001Mask & 0x10) == 0 { // if not empty
// string "ModTime"
o = append(o, 0xa7, 0x4d, 0x6f, 0x64, 0x54, 0x69, 0x6d, 0x65)
o = msgp.AppendTime(o, z.ModTime)
if (zb0001Mask & 0x20) == 0 { // if not empty
// string "index"
o = append(o, 0xa5, 0x69, 0x6e, 0x64, 0x65, 0x78)
o = msgp.AppendBytes(o, z.Index)
@ -759,6 +778,12 @@ func (z *ObjectPartInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "ActualSize")
return
}
case "ModTime":
z.ModTime, bts, err = msgp.ReadTimeBytes(bts)
if err != nil {
err = msgp.WrapError(err, "ModTime")
return
}
case "index":
z.Index, bts, err = msgp.ReadBytesBytes(bts, z.Index)
if err != nil {
@ -779,7 +804,7 @@ func (z *ObjectPartInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *ObjectPartInfo) Msgsize() (s int) {
s = 1 + 5 + msgp.StringPrefixSize + len(z.ETag) + 7 + msgp.IntSize + 5 + msgp.Int64Size + 11 + msgp.Int64Size + 6 + msgp.BytesPrefixSize + len(z.Index)
s = 1 + 5 + msgp.StringPrefixSize + len(z.ETag) + 7 + msgp.IntSize + 5 + msgp.Int64Size + 11 + msgp.Int64Size + 8 + msgp.TimeSize + 6 + msgp.BytesPrefixSize + len(z.Index)
return
}

View file

@ -571,7 +571,7 @@ func (j xlMetaV2Object) ToFileInfo(volume, path string) (FileInfo, error) {
for i := range fi.Parts {
fi.Parts[i].Number = j.PartNumbers[i]
fi.Parts[i].Size = j.PartSizes[i]
if len(j.PartETags) > 0 {
if len(j.PartETags) == len(fi.Parts) {
fi.Parts[i].ETag = j.PartETags[i]
}
fi.Parts[i].ActualSize = j.PartActualSizes[i]