minio/object-api-multipart.go
Harshavardhana 57f35c2bcc xl: Introduce new blocking writer to make CreateFile atomic. (#1362)
Creates a new write closer that must be released
by the read consumer. This is necessary so that
while commiting the underlying writers in erasure
coding we need to make sure we reply success only if
we have committed to disk.

This in turn also fixes plethora of bugs related to
subsequent PutObject() races with namespace locking.

This patch also enables most of the tests, other than
ListObjects paging which has some issues still.

Fixes #1358, #1360
2016-04-25 12:47:31 -07:00

578 lines
16 KiB
Go

/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"crypto/md5"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"path"
"strconv"
"strings"
"github.com/minio/minio/pkg/probe"
"github.com/skyrings/skyring-common/tools/uuid"
)
const (
// Minio meta volume.
minioMetaVolume = ".minio"
)
// checks whether bucket exists.
func (o objectAPI) isBucketExist(bucketName string) (bool, error) {
// Check whether bucket exists.
if _, e := o.storage.StatVol(bucketName); e != nil {
if e == errVolumeNotFound {
return false, nil
}
return false, e
}
return true, nil
}
// checkLeafDirectory - verifies if a given path is leaf directory if
// yes returns all the files inside it.
func (o objectAPI) checkLeafDirectory(prefixPath string) (isLeaf bool, fis []FileInfo) {
var allFileInfos []FileInfo
var markerPath string
for {
fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, prefixPath, markerPath, false, 1000)
if e != nil {
break
}
allFileInfos = append(allFileInfos, fileInfos...)
if eof {
break
}
markerPath = allFileInfos[len(allFileInfos)-1].Name
}
for _, fileInfo := range allFileInfos {
if fileInfo.Mode.IsDir() {
isLeaf = false
return isLeaf, nil
}
fileName := path.Base(fileInfo.Name)
if !strings.Contains(fileName, ".") {
fis = append(fis, fileInfo)
}
}
isLeaf = true
return isLeaf, fis
}
// ListMultipartUploads - list multipart uploads.
func (o objectAPI) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, *probe.Error) {
result := ListMultipartsInfo{}
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListMultipartsInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectPrefix(prefix) {
return ListMultipartsInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: prefix})
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != slashSeparator {
return ListMultipartsInfo{}, probe.NewError(UnsupportedDelimiter{
Delimiter: delimiter,
})
}
// Verify if marker has prefix.
if keyMarker != "" && !strings.HasPrefix(keyMarker, prefix) {
return ListMultipartsInfo{}, probe.NewError(InvalidMarkerPrefixCombination{
Marker: keyMarker,
Prefix: prefix,
})
}
if uploadIDMarker != "" {
if strings.HasSuffix(keyMarker, slashSeparator) {
return result, probe.NewError(InvalidUploadIDKeyCombination{
UploadIDMarker: uploadIDMarker,
KeyMarker: keyMarker,
})
}
id, e := uuid.Parse(uploadIDMarker)
if e != nil {
return result, probe.NewError(e)
}
if id.IsZero() {
return result, probe.NewError(MalformedUploadID{
UploadID: uploadIDMarker,
})
}
}
recursive := true
if delimiter == slashSeparator {
recursive = false
}
result.IsTruncated = true
result.MaxUploads = maxUploads
newMaxUploads := 0
// Not using path.Join() as it strips off the trailing '/'.
// Also bucket should always be followed by '/' even if prefix is empty.
prefixPath := pathJoin(bucket, prefix)
if recursive {
keyMarkerPath := ""
if keyMarker != "" {
keyMarkerPath = path.Join(bucket, keyMarker, uploadIDMarker)
}
outerLoop:
for {
fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, prefixPath, keyMarkerPath, recursive, maxUploads-newMaxUploads)
if e != nil {
return ListMultipartsInfo{}, probe.NewError(e)
}
for _, fi := range fileInfos {
keyMarkerPath = fi.Name
// fi.Name will look like bucket/object/uploadID, extract object and uploadID.
uploadID := path.Base(fi.Name)
objectName := strings.TrimPrefix(path.Dir(fi.Name), retainSlash(bucket))
if strings.Contains(uploadID, ".") {
// Contains partnumber and md5sum info, skip this.
continue
}
result.Uploads = append(result.Uploads, uploadMetadata{
Object: objectName,
UploadID: uploadID,
Initiated: fi.ModTime,
})
result.NextKeyMarker = objectName
result.NextUploadIDMarker = uploadID
newMaxUploads++
if newMaxUploads == maxUploads {
if eof {
result.IsTruncated = false
}
break outerLoop
}
}
if eof {
result.IsTruncated = false
break
}
}
if !result.IsTruncated {
result.NextKeyMarker = ""
result.NextUploadIDMarker = ""
}
return result, nil
}
var fileInfos []FileInfo
// read all the "fileInfos" in the prefix
for {
marker := ""
fis, eof, e := o.storage.ListFiles(minioMetaVolume, prefixPath, marker, recursive, 1000)
if e != nil {
return ListMultipartsInfo{}, probe.NewError(e)
}
for _, fi := range fis {
marker = fi.Name
if fi.Mode.IsDir() {
fileInfos = append(fileInfos, fi)
}
}
if eof {
break
}
}
// Create "uploads" slice from "fileInfos" slice.
var uploads []uploadMetadata
for _, fi := range fileInfos {
leaf, entries := o.checkLeafDirectory(fi.Name)
objectName := strings.TrimPrefix(fi.Name, retainSlash(bucket))
if leaf {
for _, entry := range entries {
if strings.Contains(path.Base(entry.Name), ".") {
continue
}
uploads = append(uploads, uploadMetadata{
Object: path.Dir(objectName),
UploadID: path.Base(entry.Name),
Initiated: entry.ModTime,
})
}
continue
}
uploads = append(uploads, uploadMetadata{
Object: objectName,
})
}
index := 0
for i, upload := range uploads {
index = i
if upload.Object > keyMarker {
break
}
if uploads[index].Object == keyMarker && upload.UploadID > uploadIDMarker {
break
}
}
for ; index < len(uploads); index++ {
if (len(result.Uploads) + len(result.CommonPrefixes)) == maxUploads {
break
}
result.NextKeyMarker = uploads[index].Object
if strings.HasSuffix(uploads[index].Object, slashSeparator) {
// for a directory entry
result.CommonPrefixes = append(result.CommonPrefixes, uploads[index].Object)
continue
}
result.NextUploadIDMarker = uploads[index].UploadID
result.Uploads = append(result.Uploads, uploads[index])
}
if index == len(uploads) {
result.IsTruncated = false
result.NextKeyMarker = ""
result.NextUploadIDMarker = ""
}
return result, nil
}
func (o objectAPI) NewMultipartUpload(bucket, object string) (string, *probe.Error) {
// Verify if bucket name is valid.
if !IsValidBucketName(bucket) {
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
}
// Verify if object name is valid.
if !IsValidObjectName(object) {
return "", probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// Verify whether the bucket exists.
isExist, err := o.isBucketExist(bucket)
if err != nil {
return "", probe.NewError(err)
}
if !isExist {
return "", probe.NewError(BucketNotFound{Bucket: bucket})
}
if _, e := o.storage.StatVol(minioMetaVolume); e != nil {
if e == errVolumeNotFound {
e = o.storage.MakeVol(minioMetaVolume)
if e != nil {
if e == errDiskFull {
return "", probe.NewError(StorageFull{})
}
return "", probe.NewError(e)
}
}
}
for {
uuid, e := uuid.New()
if e != nil {
return "", probe.NewError(e)
}
uploadID := uuid.String()
uploadIDPath := path.Join(bucket, object, uploadID)
if _, e = o.storage.StatFile(minioMetaVolume, uploadIDPath); e != nil {
if e != errFileNotFound {
return "", probe.NewError(toObjectErr(e, minioMetaVolume, uploadIDPath))
}
// uploadIDPath doesn't exist, so create empty file to reserve the name
var w io.WriteCloser
if w, e = o.storage.CreateFile(minioMetaVolume, uploadIDPath); e == nil {
// Close the writer.
if e = w.Close(); e != nil {
return "", probe.NewError(e)
}
} else {
return "", probe.NewError(toObjectErr(e, minioMetaVolume, uploadIDPath))
}
return uploadID, nil
}
// uploadIDPath already exists.
// loop again to try with different uuid generated.
}
}
// isUploadIDExists - verify if a given uploadID exists and is valid.
func (o objectAPI) isUploadIDExists(bucket, object, uploadID string) (bool, error) {
uploadIDPath := path.Join(bucket, object, uploadID)
st, e := o.storage.StatFile(minioMetaVolume, uploadIDPath)
if e != nil {
// Upload id does not exist.
if e == errFileNotFound {
return false, nil
}
return false, e
}
// Upload id exists and is a regular file.
return st.Mode.IsRegular(), nil
}
// PutObjectPart - writes the multipart upload chunks.
func (o objectAPI) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, *probe.Error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return "", probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// Verify whether the bucket exists.
isExist, err := o.isBucketExist(bucket)
if err != nil {
return "", probe.NewError(err)
}
if !isExist {
return "", probe.NewError(BucketNotFound{Bucket: bucket})
}
if status, e := o.isUploadIDExists(bucket, object, uploadID); e != nil {
return "", probe.NewError(e)
} else if !status {
return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
}
partSuffix := fmt.Sprintf("%s.%d.%s", uploadID, partID, md5Hex)
fileWriter, e := o.storage.CreateFile(minioMetaVolume, path.Join(bucket, object, partSuffix))
if e != nil {
return "", probe.NewError(toObjectErr(e, bucket, object))
}
// Initialize md5 writer.
md5Writer := md5.New()
// Instantiate a new multi writer.
multiWriter := io.MultiWriter(md5Writer, fileWriter)
// Instantiate checksum hashers and create a multiwriter.
if size > 0 {
if _, e = io.CopyN(multiWriter, data, size); e != nil {
safeCloseAndRemove(fileWriter)
return "", probe.NewError(toObjectErr(e))
}
// Reader shouldn't have more data what mentioned in size argument.
// reading one more byte from the reader to validate it.
// expected to fail, success validates existence of more data in the reader.
if _, e = io.CopyN(ioutil.Discard, data, 1); e == nil {
safeCloseAndRemove(fileWriter)
return "", probe.NewError(UnExpectedDataSize{Size: int(size)})
}
} else {
if _, e = io.Copy(multiWriter, data); e != nil {
safeCloseAndRemove(fileWriter)
return "", probe.NewError(toObjectErr(e))
}
}
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
if md5Hex != "" {
if newMD5Hex != md5Hex {
safeCloseAndRemove(fileWriter)
return "", probe.NewError(BadDigest{md5Hex, newMD5Hex})
}
}
e = fileWriter.Close()
if e != nil {
return "", probe.NewError(e)
}
return newMD5Hex, nil
}
func (o objectAPI) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, *probe.Error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListPartsInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return ListPartsInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
if status, e := o.isUploadIDExists(bucket, object, uploadID); e != nil {
return ListPartsInfo{}, probe.NewError(e)
} else if !status {
return ListPartsInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
}
result := ListPartsInfo{}
var markerPath string
nextPartNumberMarker := 0
uploadIDPath := path.Join(bucket, object, uploadID)
// Figure out the marker for the next subsequent calls, if the
// partNumberMarker is already set.
if partNumberMarker > 0 {
partNumberMarkerPath := uploadIDPath + "." + strconv.Itoa(partNumberMarker) + "."
fileInfos, _, e := o.storage.ListFiles(minioMetaVolume, partNumberMarkerPath, "", false, 1)
if e != nil {
return result, probe.NewError(toObjectErr(e, minioMetaVolume, partNumberMarkerPath))
}
if len(fileInfos) == 0 {
return result, probe.NewError(InvalidPart{})
}
markerPath = fileInfos[0].Name
}
uploadIDPrefix := uploadIDPath + "."
fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, uploadIDPrefix, markerPath, false, maxParts)
if e != nil {
return result, probe.NewError(InvalidPart{})
}
for _, fileInfo := range fileInfos {
fileName := path.Base(fileInfo.Name)
splitResult := strings.Split(fileName, ".")
partNum, e := strconv.Atoi(splitResult[1])
if e != nil {
return result, probe.NewError(e)
}
md5sum := splitResult[2]
result.Parts = append(result.Parts, partInfo{
PartNumber: partNum,
LastModified: fileInfo.ModTime,
ETag: md5sum,
Size: fileInfo.Size,
})
nextPartNumberMarker = partNum
}
result.Bucket = bucket
result.Object = object
result.UploadID = uploadID
result.PartNumberMarker = partNumberMarker
result.NextPartNumberMarker = nextPartNumberMarker
result.MaxParts = maxParts
result.IsTruncated = !eof
return result, nil
}
// Create an s3 compatible MD5sum for complete multipart transaction.
func makeS3MD5(md5Strs ...string) (string, *probe.Error) {
var finalMD5Bytes []byte
for _, md5Str := range md5Strs {
md5Bytes, e := hex.DecodeString(md5Str)
if e != nil {
return "", probe.NewError(e)
}
finalMD5Bytes = append(finalMD5Bytes, md5Bytes...)
}
md5Hasher := md5.New()
md5Hasher.Write(finalMD5Bytes)
s3MD5 := fmt.Sprintf("%s-%d", hex.EncodeToString(md5Hasher.Sum(nil)), len(md5Strs))
return s3MD5, nil
}
func (o objectAPI) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, *probe.Error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return "", probe.NewError(ObjectNameInvalid{
Bucket: bucket,
Object: object,
})
}
if status, e := o.isUploadIDExists(bucket, object, uploadID); e != nil {
return "", probe.NewError(e)
} else if !status {
return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
}
fileWriter, e := o.storage.CreateFile(bucket, object)
if e != nil {
return "", probe.NewError(toObjectErr(e, bucket, object))
}
var md5Sums []string
for _, part := range parts {
// Construct part suffix.
partSuffix := fmt.Sprintf("%s.%d.%s", uploadID, part.PartNumber, part.ETag)
var fileReader io.ReadCloser
fileReader, e = o.storage.ReadFile(minioMetaVolume, path.Join(bucket, object, partSuffix), 0)
if e != nil {
if e == errFileNotFound {
return "", probe.NewError(InvalidPart{})
}
return "", probe.NewError(e)
}
_, e = io.Copy(fileWriter, fileReader)
if e != nil {
return "", probe.NewError(e)
}
e = fileReader.Close()
if e != nil {
return "", probe.NewError(e)
}
md5Sums = append(md5Sums, part.ETag)
}
e = fileWriter.Close()
if e != nil {
return "", probe.NewError(e)
}
// Save the s3 md5.
s3MD5, err := makeS3MD5(md5Sums...)
if err != nil {
return "", err.Trace(md5Sums...)
}
// Cleanup all the parts.
o.removeMultipartUpload(bucket, object, uploadID)
// Return md5sum.
return s3MD5, nil
}
func (o objectAPI) removeMultipartUpload(bucket, object, uploadID string) *probe.Error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
marker := ""
for {
uploadIDPath := path.Join(bucket, object, uploadID)
fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, uploadIDPath, marker, false, 1000)
if e != nil {
return probe.NewError(InvalidUploadID{UploadID: uploadID})
}
for _, fileInfo := range fileInfos {
o.storage.DeleteFile(minioMetaVolume, fileInfo.Name)
marker = fileInfo.Name
}
if eof {
break
}
}
return nil
}
func (o objectAPI) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
if status, e := o.isUploadIDExists(bucket, object, uploadID); e != nil {
return probe.NewError(e)
} else if !status {
return probe.NewError(InvalidUploadID{UploadID: uploadID})
}
err := o.removeMultipartUpload(bucket, object, uploadID)
if err != nil {
return err.Trace(bucket, object, uploadID)
}
return nil
}