diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go
index f03f2acfc..3175aae13 100644
--- a/cmd/xl-v1-multipart.go
+++ b/cmd/xl-v1-multipart.go
@@ -341,8 +341,10 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
 
     // Delete the temporary object part. If PutObjectPart succeeds there would be nothing to delete.
     defer xl.deleteObject(ctx, minioMetaTmpBucket, tmpPart, writeQuorum, false)
-    if data.Size() > 0 || data.Size() == -1 {
-        if pErr := xl.prepareFile(ctx, minioMetaTmpBucket, tmpPartPath, data.Size(), onlineDisks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, writeQuorum); pErr != nil {
+
+    if data.Size() >= 0 {
+        if pErr := xl.prepareFile(ctx, minioMetaTmpBucket, tmpPartPath, data.Size(),
+            onlineDisks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, writeQuorum); pErr != nil {
             return pi, toObjectErr(pErr, bucket, object)
         }
 
@@ -369,6 +371,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
     if len(buffer) > int(xlMeta.Erasure.BlockSize) {
         buffer = buffer[:xlMeta.Erasure.BlockSize]
     }
+
     writers := make([]*bitrotWriter, len(onlineDisks))
     for i, disk := range onlineDisks {
         if disk == nil {
@@ -376,6 +379,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
         }
         writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, DefaultBitrotAlgorithm)
     }
+
     n, err := erasure.Encode(ctx, data, writers, buffer, erasure.dataBlocks+1)
     if err != nil {
         return pi, toObjectErr(err, bucket, object)
@@ -674,13 +678,6 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
         }
     }
 
-        // Last part could have been uploaded as 0bytes, do not need
-        // to save it in final `xl.json`.
-        if (i == len(parts)-1) && currentXLMeta.Parts[partIdx].Size == 0 {
-            xlMeta.Parts = xlMeta.Parts[:i] // Skip the part.
-            continue
-        }
-
         // Save for total object size.
         objectSize += currentXLMeta.Parts[partIdx].Size
 
diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go
index b6e6a3b04..6b1b254bf 100644
--- a/cmd/xl-v1-object.go
+++ b/cmd/xl-v1-object.go
@@ -450,12 +450,17 @@ func (xl xlObjects) GetObjectInfo(ctx context.Context, bucket, object string, op
 func (xl xlObjects) isObjectCorrupted(metaArr []xlMetaV1, errs []error) (validMeta xlMetaV1, ok bool) {
     // We can consider an object data not reliable
     // when xl.json is not found in read quorum disks.
-    var notFoundXLJSON int
+    var notFoundXLJSON, corruptedXLJSON int
     for _, readErr := range errs {
         if readErr == errFileNotFound {
             notFoundXLJSON++
         }
     }
+    for _, readErr := range errs {
+        if readErr == errCorruptedFormat {
+            corruptedXLJSON++
+        }
+    }
 
     for _, m := range metaArr {
         if !m.IsValid() {
@@ -466,24 +471,35 @@ func (xl xlObjects) isObjectCorrupted(metaArr []xlMetaV1, errs []error) (validMe
     }
 
     // Return if the object is indeed corrupted.
-    return validMeta, len(xl.getDisks())-notFoundXLJSON < validMeta.Erasure.DataBlocks
+    return validMeta, len(xl.getDisks())-notFoundXLJSON < validMeta.Erasure.DataBlocks || len(xl.getDisks()) == corruptedXLJSON
 }
 
 const xlCorruptedSuffix = ".CORRUPTED"
 
 // Renames the corrupted object and makes it visible.
-func renameCorruptedObject(ctx context.Context, bucket, object string, validMeta xlMetaV1, disks []StorageAPI, errs []error) {
+func (xl xlObjects) renameCorruptedObject(ctx context.Context, bucket, object string, validMeta xlMetaV1, disks []StorageAPI, errs []error) {
+    // if errs returned are corrupted
+    if validMeta.Erasure.DataBlocks == 0 {
+        validMeta = newXLMetaV1(object, len(disks)/2, len(disks)/2)
+    }
     writeQuorum := validMeta.Erasure.DataBlocks + 1
 
     // Move all existing objects into corrupted suffix.
-    rename(ctx, disks, bucket, object, bucket, object+xlCorruptedSuffix, true, writeQuorum, []error{errFileNotFound})
+    oldObj := mustGetUUID()
+
+    rename(ctx, disks, bucket, object, minioMetaTmpBucket, oldObj, true, writeQuorum, []error{errFileNotFound})
+
+    // Delete temporary object in the event of failure.
+    // If PutObject succeeded there would be no temporary
+    // object to delete.
+    defer xl.deleteObject(ctx, minioMetaTmpBucket, oldObj, writeQuorum, false)
 
     tempObj := mustGetUUID()
 
     // Get all the disks which do not have the file.
     var cdisks = make([]StorageAPI, len(disks))
     for i, merr := range errs {
-        if merr == errFileNotFound {
+        if merr == errFileNotFound || merr == errCorruptedFormat {
             cdisks[i] = disks[i]
         }
     }
@@ -497,18 +513,24 @@ func renameCorruptedObject(ctx context.Context, bucket, object string, validMeta
         disk.AppendFile(minioMetaTmpBucket, pathJoin(tempObj, "part.1"), []byte{})
 
         // Write algorithm hash for empty part file.
-        alg := validMeta.Erasure.Checksums[0].Algorithm.New()
-        alg.Write([]byte{})
+        var algorithm = DefaultBitrotAlgorithm
+        h := algorithm.New()
+        h.Write([]byte{})
 
         // Update the checksums and part info.
-        validMeta.Erasure.Checksums[0] = ChecksumInfo{
-            Name:      validMeta.Erasure.Checksums[0].Name,
-            Algorithm: validMeta.Erasure.Checksums[0].Algorithm,
-            Hash:      alg.Sum(nil),
+        validMeta.Erasure.Checksums = []ChecksumInfo{
+            {
+                Name:      "part.1",
+                Algorithm: algorithm,
+                Hash:      h.Sum(nil),
+            },
         }
-        validMeta.Parts[0] = objectPartInfo{
-            Number: 1,
-            Name:   "part.1",
+
+        validMeta.Parts = []objectPartInfo{
+            {
+                Number: 1,
+                Name:   "part.1",
+            },
         }
 
         // Write the `xl.json` with the newly calculated metadata.
@@ -530,7 +552,7 @@ func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string) (o
     // Having read quorum means we have xl.json in at least N/2 disks.
     if !strings.HasSuffix(object, xlCorruptedSuffix) {
         if validMeta, ok := xl.isObjectCorrupted(metaArr, errs); ok {
-            renameCorruptedObject(ctx, bucket, object, validMeta, disks, errs)
+            xl.renameCorruptedObject(ctx, bucket, object, validMeta, disks, errs)
             // Return err file not found since we renamed now the corrupted object
             return objInfo, errFileNotFound
         }