1
0
mirror of https://github.com/minio/minio synced 2024-07-08 19:56:05 +00:00

fs/xl: Combine input checks into re-usable functions. (#3383)

Repeated code around both object layers are moved
and combined into simple re-usable functions.
This commit is contained in:
Harshavardhana 2016-12-01 23:15:17 -08:00 committed by GitHub
parent 918924796f
commit ff4ce0ee14
20 changed files with 216 additions and 418 deletions

View File

@ -33,7 +33,7 @@ import (
// Enforces bucket policies for a bucket for a given tatusaction.
func enforceBucketPolicy(bucket string, action string, reqURL *url.URL) (s3Error APIErrorCode) {
// Verify if bucket actually exists
if err := isBucketExist(bucket, newObjectLayerFn()); err != nil {
if err := checkBucketExist(bucket, newObjectLayerFn()); err != nil {
err = errorCause(err)
switch err.(type) {
case BucketNameInvalid:

View File

@ -27,20 +27,6 @@ func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool {
return err == nil
}
// Checks whether bucket exists.
func (fs fsObjects) isBucketExist(bucket string) bool {
// Check whether bucket exists.
_, err := fs.storage.StatVol(bucket)
if err != nil {
if err == errVolumeNotFound {
return false
}
errorIf(err, "Stat failed on bucket "+bucket+".")
return false
}
return true
}
// isUploadIDExists - verify if a given uploadID exists and is valid.
func (fs fsObjects) isUploadIDExists(bucket, object, uploadID string) bool {
uploadIDPath := path.Join(bucket, object, uploadID)

View File

@ -23,39 +23,6 @@ import (
"time"
)
// TestFSIsBucketExist - complete test of isBucketExist
func TestFSIsBucketExist(t *testing.T) {
// Prepare for testing
disk := filepath.Join(os.TempDir(), "minio-"+nextSuffix())
defer removeAll(disk)
obj := initFSObjects(disk, t)
fs := obj.(fsObjects)
bucketName := "bucket"
if err := obj.MakeBucket(bucketName); err != nil {
t.Fatal("Cannot create bucket, err: ", err)
}
// Test with a valid bucket
if found := fs.isBucketExist(bucketName); !found {
t.Fatal("isBucketExist should true")
}
// Test with a inexistant bucket
if found := fs.isBucketExist("foo"); found {
t.Fatal("isBucketExist should false")
}
// Using a faulty disk
fsStorage := fs.storage.(*retryStorage)
naughty := newNaughtyDisk(fsStorage, nil, errFaultyDisk)
fs.storage = naughty
if found := fs.isBucketExist(bucketName); found {
t.Fatal("isBucketExist should return false because it is wired to a corrupted disk")
}
}
// TestFSIsUploadExists - complete test with valid and invalid cases
func TestFSIsUploadExists(t *testing.T) {
// Prepare for testing

View File

@ -26,8 +26,6 @@ import (
"path"
"strings"
"time"
"github.com/skyrings/skyring-common/tools/uuid"
)
// listMultipartUploads - lists all multipart uploads.
@ -172,45 +170,8 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
// ListMultipartsInfo structure is unmarshalled directly into XML and
// replied back to the client.
func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
// Validate input arguments.
if !IsValidBucketName(bucket) {
return ListMultipartsInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
if !fs.isBucketExist(bucket) {
return ListMultipartsInfo{}, traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectPrefix(prefix) {
return ListMultipartsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: prefix})
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != slashSeparator {
return ListMultipartsInfo{}, traceError(UnsupportedDelimiter{
Delimiter: delimiter,
})
}
// Verify if marker has prefix.
if keyMarker != "" && !strings.HasPrefix(keyMarker, prefix) {
return ListMultipartsInfo{}, traceError(InvalidMarkerPrefixCombination{
Marker: keyMarker,
Prefix: prefix,
})
}
if uploadIDMarker != "" {
if strings.HasSuffix(keyMarker, slashSeparator) {
return ListMultipartsInfo{}, traceError(InvalidUploadIDKeyCombination{
UploadIDMarker: uploadIDMarker,
KeyMarker: keyMarker,
})
}
id, err := uuid.Parse(uploadIDMarker)
if err != nil {
return ListMultipartsInfo{}, traceError(err)
}
if id.IsZero() {
return ListMultipartsInfo{}, traceError(MalformedUploadID{
UploadID: uploadIDMarker,
})
}
if err := checkListMultipartArgs(bucket, prefix, keyMarker, uploadIDMarker, delimiter, fs); err != nil {
return ListMultipartsInfo{}, err
}
return fs.listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
}
@ -256,17 +217,8 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st
//
// Implements S3 compatible initiate multipart API.
func (fs fsObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) {
// Verify if bucket name is valid.
if !IsValidBucketName(bucket) {
return "", traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify whether the bucket exists.
if !fs.isBucketExist(bucket) {
return "", traceError(BucketNotFound{Bucket: bucket})
}
// Verify if object name is valid.
if !IsValidObjectName(object) {
return "", traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
if err := checkNewMultipartArgs(bucket, object, fs); err != nil {
return "", err
}
return fs.newMultipartUpload(bucket, object, meta)
}
@ -290,16 +242,8 @@ func partToAppend(fsMeta fsMetaV1, fsAppendMeta fsMetaV1) (part objectPartInfo,
// written to '.minio.sys/tmp' location and safely renamed to
// '.minio.sys/multipart' for reach parts.
func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string, sha256sum string) (string, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify whether the bucket exists.
if !fs.isBucketExist(bucket) {
return "", traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectName(object) {
return "", traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
if err := checkPutObjectPartArgs(bucket, object, fs); err != nil {
return "", err
}
uploadIDPath := path.Join(bucket, object, uploadID)
@ -488,16 +432,8 @@ func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberM
// ListPartsInfo structure is unmarshalled directly into XML and
// replied back to the client.
func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListPartsInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify whether the bucket exists.
if !fs.isBucketExist(bucket) {
return ListPartsInfo{}, traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectName(object) {
return ListPartsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
if err := checkListPartsArgs(bucket, object, fs); err != nil {
return ListPartsInfo{}, err
}
// Hold lock so that there is no competing
@ -532,19 +468,8 @@ func (fs fsObjects) totalObjectSize(fsMeta fsMetaV1, parts []completePart) (int6
//
// Implements S3 compatible Complete multipart API.
func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify whether the bucket exists.
if !fs.isBucketExist(bucket) {
return "", traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectName(object) {
return "", traceError(ObjectNameInvalid{
Bucket: bucket,
Object: object,
})
if err := checkCompleteMultipartArgs(bucket, object, fs); err != nil {
return "", err
}
uploadIDPath := path.Join(bucket, object, uploadID)
@ -739,15 +664,8 @@ func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error
// no affect and further requests to the same uploadID would not be
// honored.
func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return traceError(BucketNameInvalid{Bucket: bucket})
}
if !fs.isBucketExist(bucket) {
return traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectName(object) {
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
if err := checkAbortMultipartArgs(bucket, object, fs); err != nil {
return err
}
// Hold lock so that there is no competing

View File

@ -219,13 +219,8 @@ func (fs fsObjects) DeleteBucket(bucket string) error {
// GetObject - get an object.
func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer) (err error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
if err = checkGetObjArgs(bucket, object); err != nil {
return err
}
// Offset and length cannot be negative.
if offset < 0 || length < 0 {
@ -352,28 +347,16 @@ func (fs fsObjects) getObjectInfo(bucket, object string) (ObjectInfo, error) {
// GetObjectInfo - get object info.
func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ObjectInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return ObjectInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
if err := checkGetObjArgs(bucket, object); err != nil {
return ObjectInfo{}, err
}
return fs.getObjectInfo(bucket, object)
}
// PutObject - create an object.
func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (objInfo ObjectInfo, err error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ObjectInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return ObjectInfo{}, traceError(ObjectNameInvalid{
Bucket: bucket,
Object: object,
})
if err = checkPutObjectArgs(bucket, object, fs); err != nil {
return ObjectInfo{}, err
}
// No metadata is set, allocate a new one.
if metadata == nil {
@ -505,12 +488,8 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
// DeleteObject - deletes an object from a bucket, this operation is destructive
// and there are no rollbacks supported.
func (fs fsObjects) DeleteObject(bucket, object string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return traceError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
if err := checkDelObjArgs(bucket, object); err != nil {
return err
}
// Lock the object before deleting so that an in progress GetObject does not return
@ -550,31 +529,8 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
return
}
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListObjectsInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if bucket exists.
if !fs.isBucketExist(bucket) {
return ListObjectsInfo{}, traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectPrefix(prefix) {
return ListObjectsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: prefix})
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != slashSeparator {
return ListObjectsInfo{}, traceError(UnsupportedDelimiter{
Delimiter: delimiter,
})
}
// Verify if marker has prefix.
if marker != "" {
if !strings.HasPrefix(marker, prefix) {
return ListObjectsInfo{}, traceError(InvalidMarkerPrefixCombination{
Marker: marker,
Prefix: prefix,
})
}
if err := checkListObjsArgs(bucket, prefix, marker, delimiter, fs); err != nil {
return ListObjectsInfo{}, err
}
// With max keys of zero we have reached eof, return right here.

View File

@ -276,15 +276,3 @@ func cleanupDir(storage StorageAPI, volume, dirPath string) error {
err := delFunc(retainSlash(pathJoin(dirPath)))
return err
}
// Checks whether bucket exists.
func isBucketExist(bucket string, obj ObjectLayer) error {
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
}
_, err := obj.GetBucketInfo(bucket)
if err != nil {
return BucketNotFound{Bucket: bucket}
}
return nil
}

View File

@ -0,0 +1,161 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"strings"
"github.com/skyrings/skyring-common/tools/uuid"
)
// Checks on GetObject arguments, bucket and object.
func checkGetObjArgs(bucket, object string) error {
return checkBucketAndObjectNames(bucket, object)
}
// Checks on DeleteObject arguments, bucket and object.
func checkDelObjArgs(bucket, object string) error {
return checkBucketAndObjectNames(bucket, object)
}
// Checks bucket and object name validity, returns nil if both are valid.
func checkBucketAndObjectNames(bucket, object string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
return nil
}
// Checks for all ListObjects arguments validity.
func checkListObjsArgs(bucket, prefix, marker, delimiter string, obj ObjectLayer) error {
// Verify if bucket exists before validating object name.
// This is done on purpose since the order of errors is
// important here bucket does not exist error should
// happen before we return an error for invalid object name.
// FIXME: should be moved to handler layer.
if err := checkBucketExist(bucket, obj); err != nil {
return traceError(err)
}
// Validates object prefix validity after bucket exists.
if !IsValidObjectPrefix(prefix) {
return traceError(ObjectNameInvalid{
Bucket: bucket,
Object: prefix,
})
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != slashSeparator {
return traceError(UnsupportedDelimiter{
Delimiter: delimiter,
})
}
// Verify if marker has prefix.
if marker != "" && !strings.HasPrefix(marker, prefix) {
return traceError(InvalidMarkerPrefixCombination{
Marker: marker,
Prefix: prefix,
})
}
return nil
}
// Checks for all ListMultipartUploads arguments validity.
func checkListMultipartArgs(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, obj ObjectLayer) error {
if err := checkListObjsArgs(bucket, prefix, keyMarker, delimiter, obj); err != nil {
return err
}
if uploadIDMarker != "" {
if strings.HasSuffix(keyMarker, slashSeparator) {
return traceError(InvalidUploadIDKeyCombination{
UploadIDMarker: uploadIDMarker,
KeyMarker: keyMarker,
})
}
id, err := uuid.Parse(uploadIDMarker)
if err != nil {
return traceError(err)
}
if id.IsZero() {
return traceError(MalformedUploadID{
UploadID: uploadIDMarker,
})
}
}
return nil
}
// Checks for NewMultipartUpload arguments validity, also validates if bucket exists.
func checkNewMultipartArgs(bucket, object string, obj ObjectLayer) error {
return checkPutObjectArgs(bucket, object, obj)
}
// Checks for PutObjectPart arguments validity, also validates if bucket exists.
func checkPutObjectPartArgs(bucket, object string, obj ObjectLayer) error {
return checkPutObjectArgs(bucket, object, obj)
}
// Checks for ListParts arguments validity, also validates if bucket exists.
func checkListPartsArgs(bucket, object string, obj ObjectLayer) error {
return checkPutObjectArgs(bucket, object, obj)
}
// Checks for CompleteMultipartUpload arguments validity, also validates if bucket exists.
func checkCompleteMultipartArgs(bucket, object string, obj ObjectLayer) error {
return checkPutObjectArgs(bucket, object, obj)
}
// Checks for AbortMultipartUpload arguments validity, also validates if bucket exists.
func checkAbortMultipartArgs(bucket, object string, obj ObjectLayer) error {
return checkPutObjectArgs(bucket, object, obj)
}
// Checks for PutObject arguments validity, also validates if bucket exists.
func checkPutObjectArgs(bucket, object string, obj ObjectLayer) error {
// Verify if bucket exists before validating object name.
// This is done on purpose since the order of errors is
// important here bucket does not exist error should
// happen before we return an error for invalid object name.
// FIXME: should be moved to handler layer.
if err := checkBucketExist(bucket, obj); err != nil {
return traceError(err)
}
// Validates object name validity after bucket exists.
if !IsValidObjectName(object) {
return traceError(ObjectNameInvalid{
Bucket: bucket,
Object: object,
})
}
return nil
}
// Checks whether bucket exists and returns appropriate error if not.
func checkBucketExist(bucket string, obj ObjectLayer) error {
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
}
_, err := obj.GetBucketInfo(bucket)
if err != nil {
return BucketNotFound{Bucket: bucket}
}
return nil
}

View File

@ -146,19 +146,6 @@ func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err
return BucketInfo{}, err
}
// Checks whether bucket exists.
func (xl xlObjects) isBucketExist(bucket string) bool {
// Check whether bucket exists.
_, err := xl.getBucketInfo(bucket)
if err != nil {
if err == errVolumeNotFound {
return false
}
return false
}
return true
}
// GetBucketInfo - returns BucketInfo for a bucket.
func (xl xlObjects) GetBucketInfo(bucket string) (BucketInfo, error) {
// Verify if bucket is valid.

View File

@ -28,9 +28,9 @@ func healFormatXL(storageDisks []StorageAPI) (err error) {
// Attempt to load all `format.json`.
formatConfigs, sErrs := loadAllFormats(storageDisks)
// Generic format check validates
// if (no quorum) return error
// if (disks not recognized) // Always error.
// Generic format check.
// - if (no quorum) return error
// - if (disks not recognized) // Always error.
if err = genericFormatCheck(formatConfigs, sErrs); err != nil {
return err
}
@ -58,14 +58,8 @@ func healFormatXL(storageDisks []StorageAPI) (err error) {
// also heals the missing entries for bucket metadata files
// `policy.json, notification.xml, listeners.json`.
func (xl xlObjects) HealBucket(bucket string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if bucket exists.
if !xl.isBucketExist(bucket) {
return traceError(BucketNotFound{Bucket: bucket})
if err := checkBucketExist(bucket, xl); err != nil {
return err
}
// Heal bucket.
@ -347,14 +341,8 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
// and later the disk comes back up again, heal on the object
// should delete it.
func (xl xlObjects) HealObject(bucket, object string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
if err := checkGetObjArgs(bucket, object); err != nil {
return err
}
// Lock the object before healing.

View File

@ -162,31 +162,8 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma
// ListObjects - list all objects at prefix, delimited by '/'.
func (xl xlObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListObjectsInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if bucket exists.
if !xl.isBucketExist(bucket) {
return ListObjectsInfo{}, traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectPrefix(prefix) {
return ListObjectsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: prefix})
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != slashSeparator {
return ListObjectsInfo{}, traceError(UnsupportedDelimiter{
Delimiter: delimiter,
})
}
// Verify if marker has prefix.
if marker != "" {
if !strings.HasPrefix(marker, prefix) {
return ListObjectsInfo{}, traceError(InvalidMarkerPrefixCombination{
Marker: marker,
Prefix: prefix,
})
}
if err := checkListObjsArgs(bucket, prefix, marker, delimiter, xl); err != nil {
return ListObjectsInfo{}, err
}
// With max keys of zero we have reached eof, return right here.

View File

@ -100,31 +100,8 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
// ListObjects - list all objects at prefix, delimited by '/'.
func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListObjectsInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if bucket exists.
if !xl.isBucketExist(bucket) {
return ListObjectsInfo{}, traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectPrefix(prefix) {
return ListObjectsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: prefix})
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != slashSeparator {
return ListObjectsInfo{}, traceError(UnsupportedDelimiter{
Delimiter: delimiter,
})
}
// Verify if marker has prefix.
if marker != "" {
if !strings.HasPrefix(marker, prefix) {
return ListObjectsInfo{}, traceError(InvalidMarkerPrefixCombination{
Marker: marker,
Prefix: prefix,
})
}
if err := checkListObjsArgs(bucket, prefix, marker, delimiter, xl); err != nil {
return ListObjectsInfo{}, err
}
// With max keys of zero we have reached eof, return right here.

View File

@ -29,7 +29,6 @@ import (
"time"
"github.com/minio/minio/pkg/mimedb"
"github.com/skyrings/skyring-common/tools/uuid"
)
// listMultipartUploads - lists all multipart uploads.
@ -210,48 +209,10 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
// ListMultipartsInfo structure is unmarshalled directly into XML and
// replied back to the client.
func (xl xlObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
result := ListMultipartsInfo{}
if err := checkListMultipartArgs(bucket, prefix, keyMarker, uploadIDMarker, delimiter, xl); err != nil {
return ListMultipartsInfo{}, err
}
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListMultipartsInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
if !xl.isBucketExist(bucket) {
return ListMultipartsInfo{}, traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectPrefix(prefix) {
return ListMultipartsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: prefix})
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != slashSeparator {
return ListMultipartsInfo{}, traceError(UnsupportedDelimiter{
Delimiter: delimiter,
})
}
// Verify if marker has prefix.
if keyMarker != "" && !strings.HasPrefix(keyMarker, prefix) {
return ListMultipartsInfo{}, traceError(InvalidMarkerPrefixCombination{
Marker: keyMarker,
Prefix: prefix,
})
}
if uploadIDMarker != "" {
if strings.HasSuffix(keyMarker, slashSeparator) {
return result, traceError(InvalidUploadIDKeyCombination{
UploadIDMarker: uploadIDMarker,
KeyMarker: keyMarker,
})
}
id, err := uuid.Parse(uploadIDMarker)
if err != nil {
return result, traceError(err)
}
if id.IsZero() {
return result, traceError(MalformedUploadID{
UploadID: uploadIDMarker,
})
}
}
return xl.listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
}
@ -319,17 +280,8 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
//
// Implements S3 compatible initiate multipart API.
func (xl xlObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) {
// Verify if bucket name is valid.
if !IsValidBucketName(bucket) {
return "", traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify whether the bucket exists.
if !xl.isBucketExist(bucket) {
return "", traceError(BucketNotFound{Bucket: bucket})
}
// Verify if object name is valid.
if !IsValidObjectName(object) {
return "", traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
if err := checkNewMultipartArgs(bucket, object, xl); err != nil {
return "", err
}
// No metadata is set, allocate a new one.
if meta == nil {
@ -344,16 +296,8 @@ func (xl xlObjects) NewMultipartUpload(bucket, object string, meta map[string]st
//
// Implements S3 compatible Upload Part API.
func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string, sha256sum string) (string, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify whether the bucket exists.
if !xl.isBucketExist(bucket) {
return "", traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectName(object) {
return "", traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
if err := checkPutObjectPartArgs(bucket, object, xl); err != nil {
return "", err
}
var partsMetadata []xlMetaV1
@ -607,16 +551,8 @@ func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberM
// ListPartsInfo structure is unmarshalled directly into XML and
// replied back to the client.
func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListPartsInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify whether the bucket exists.
if !xl.isBucketExist(bucket) {
return ListPartsInfo{}, traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectName(object) {
return ListPartsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
if err := checkListPartsArgs(bucket, object, xl); err != nil {
return ListPartsInfo{}, err
}
// Hold lock so that there is no competing
@ -640,19 +576,8 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
//
// Implements S3 compatible Complete multipart API.
func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify whether the bucket exists.
if !xl.isBucketExist(bucket) {
return "", traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectName(object) {
return "", traceError(ObjectNameInvalid{
Bucket: bucket,
Object: object,
})
if err := checkCompleteMultipartArgs(bucket, object, xl); err != nil {
return "", err
}
// Hold lock so that
@ -884,15 +809,8 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e
// that this is an atomic idempotent operation. Subsequent calls have
// no affect and further requests to the same uploadID would not be honored.
func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return traceError(BucketNameInvalid{Bucket: bucket})
}
if !xl.isBucketExist(bucket) {
return traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectName(object) {
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
if err := checkAbortMultipartArgs(bucket, object, xl); err != nil {
return err
}
// Hold lock so that there is no competing

View File

@ -50,13 +50,8 @@ var objectOpIgnoredErrs = []error{
// object to be read at. length indicates the total length of the
// object requested by client.
func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
if err := checkGetObjArgs(bucket, object); err != nil {
return err
}
// Start offset and length cannot be negative.
if startOffset < 0 || length < 0 {
@ -223,13 +218,8 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ObjectInfo{}, BucketNameInvalid{Bucket: bucket}
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return ObjectInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object}
if err := checkGetObjArgs(bucket, object); err != nil {
return ObjectInfo{}, err
}
objectLock := nsMutex.NewNSLock(bucket, object)
@ -365,19 +355,8 @@ func renameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject
// writes `xl.json` which carries the necessary metadata for future
// object operations.
func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (objInfo ObjectInfo, err error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ObjectInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify bucket exists.
if !xl.isBucketExist(bucket) {
return ObjectInfo{}, traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectName(object) {
return ObjectInfo{}, traceError(ObjectNameInvalid{
Bucket: bucket,
Object: object,
})
if err = checkPutObjectArgs(bucket, object, xl); err != nil {
return ObjectInfo{}, err
}
// No metadata is set, allocate a new one.
if metadata == nil {
@ -623,12 +602,8 @@ func (xl xlObjects) deleteObject(bucket, object string) error {
// any error as it is not necessary for the handler to reply back a
// response to the client request.
func (xl xlObjects) DeleteObject(bucket, object string) (err error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return traceError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
if err = checkDelObjArgs(bucket, object); err != nil {
return err
}
objectLock := nsMutex.NewNSLock(bucket, object)