diff --git a/fs-objects.go b/fs-objects.go index f82d573f4..d1f7b89f2 100644 --- a/fs-objects.go +++ b/fs-objects.go @@ -94,9 +94,6 @@ func (fs fsObjects) GetObject(bucket, object string, startOffset int64) (io.Read if !IsValidBucketName(bucket) { return nil, BucketNameInvalid{Bucket: bucket} } - if !isBucketExist(fs.storage, bucket) { - return nil, BucketNotFound{Bucket: bucket} - } // Verify if object is valid. if !IsValidObjectName(object) { return nil, ObjectNameInvalid{Bucket: bucket, Object: object} @@ -114,9 +111,6 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { if !IsValidBucketName(bucket) { return ObjectInfo{}, (BucketNameInvalid{Bucket: bucket}) } - if !isBucketExist(fs.storage, bucket) { - return ObjectInfo{}, BucketNotFound{Bucket: bucket} - } // Verify if object is valid. if !IsValidObjectName(object) { return ObjectInfo{}, (ObjectNameInvalid{Bucket: bucket, Object: object}) @@ -149,10 +143,6 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io. if !IsValidBucketName(bucket) { return "", BucketNameInvalid{Bucket: bucket} } - // Check whether the bucket exists. - if !isBucketExist(fs.storage, bucket) { - return "", BucketNotFound{Bucket: bucket} - } if !IsValidObjectName(object) { return "", ObjectNameInvalid{ Bucket: bucket, @@ -219,9 +209,6 @@ func (fs fsObjects) DeleteObject(bucket, object string) error { if !IsValidBucketName(bucket) { return BucketNameInvalid{Bucket: bucket} } - if !isBucketExist(fs.storage, bucket) { - return BucketNotFound{Bucket: bucket} - } if !IsValidObjectName(object) { return ObjectNameInvalid{Bucket: bucket, Object: object} } diff --git a/object-common-multipart.go b/object-common-multipart.go index 5f9a2b770..5d85bd8b5 100644 --- a/object-common-multipart.go +++ b/object-common-multipart.go @@ -80,7 +80,6 @@ func newMultipartUploadCommon(storage StorageAPI, bucket string, object string) if !isBucketExist(storage, bucket) { return "", BucketNotFound{Bucket: bucket} } - // Verify if object name is valid. if !IsValidObjectName(object) { return "", ObjectNameInvalid{Bucket: bucket, Object: object} @@ -229,7 +228,6 @@ func abortMultipartUploadCommon(storage StorageAPI, bucket, object, uploadID str if !IsValidBucketName(bucket) { return BucketNameInvalid{Bucket: bucket} } - // Verify whether the bucket exists. if !isBucketExist(storage, bucket) { return BucketNotFound{Bucket: bucket} } @@ -371,7 +369,6 @@ func listMultipartUploadsCommon(layer ObjectLayer, bucket, prefix, keyMarker, up case fsObjects: storage = l.storage } - result := ListMultipartsInfo{} // Verify if bucket is valid. if !IsValidBucketName(bucket) { diff --git a/object-common.go b/object-common.go index 2979c80fc..7f32cc17c 100644 --- a/object-common.go +++ b/object-common.go @@ -144,17 +144,14 @@ func listObjectsCommon(layer ObjectLayer, bucket, prefix, marker, delimiter stri case fsObjects: storage = l.storage } - // Verify if bucket is valid. if !IsValidBucketName(bucket) { return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket} } - - // Verify whether the bucket exists. + // Verify if bucket exists. if !isBucketExist(storage, bucket) { return ListObjectsInfo{}, BucketNotFound{Bucket: bucket} } - if !IsValidObjectPrefix(prefix) { return ListObjectsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix} } @@ -174,6 +171,7 @@ func listObjectsCommon(layer ObjectLayer, bucket, prefix, marker, delimiter stri } } + // With max keys of zero we have reached eof, return right here. if maxKeys == 0 { return ListObjectsInfo{}, nil } diff --git a/posix.go b/posix.go index 3aaf33412..6cdea4cda 100644 --- a/posix.go +++ b/posix.go @@ -128,28 +128,8 @@ func checkDiskFree(diskPath string, minFreeDisk int64) (err error) { return nil } -func removeDuplicateVols(volsInfo []VolInfo) []VolInfo { - // Use map to record duplicates as we find them. - result := []VolInfo{} - - m := make(map[string]VolInfo) - for _, v := range volsInfo { - if _, found := m[v.Name]; !found { - m[v.Name] = v - } - } - - result = make([]VolInfo, 0, len(m)) - for _, v := range m { - result = append(result, v) - } - - // Return the new slice. - return result -} - -// gets all the unique directories from diskPath. -func getAllUniqueVols(dirPath string) ([]VolInfo, error) { +// List all the volumes from diskPath. +func listVols(dirPath string) ([]VolInfo, error) { if err := checkPathLength(dirPath); err != nil { return nil, err } @@ -179,14 +159,14 @@ func getAllUniqueVols(dirPath string) ([]VolInfo, error) { Created: fi.ModTime(), }) } - return removeDuplicateVols(volsInfo), nil + return volsInfo, nil } -// getVolumeDir - will convert incoming volume names to +// getVolDir - will convert incoming volume names to // corresponding valid volume names on the backend in a platform // compatible way for all operating systems. If volume is not found // an error is generated. -func (s fsStorage) getVolumeDir(volume string) (string, error) { +func (s fsStorage) getVolDir(volume string) (string, error) { if !isValidVolname(volume) { return "", errInvalidArgument } @@ -194,30 +174,7 @@ func (s fsStorage) getVolumeDir(volume string) (string, error) { return "", err } volumeDir := pathJoin(s.diskPath, volume) - _, err := os.Stat(volumeDir) - if err == nil { - return volumeDir, nil - } - if os.IsNotExist(err) { - var volsInfo []VolInfo - volsInfo, err = getAllUniqueVols(s.diskPath) - if err != nil { - // For any errors, treat it as disk not found. - return volumeDir, err - } - for _, vol := range volsInfo { - // Verify if lowercase version of the volume is equal to - // the incoming volume, then use the proper name. - if strings.ToLower(vol.Name) == volume { - volumeDir = pathJoin(s.diskPath, vol.Name) - return volumeDir, nil - } - } - return volumeDir, errVolumeNotFound - } else if os.IsPermission(err) { - return volumeDir, errVolumeAccessDenied - } - return volumeDir, err + return volumeDir, nil } // Make a volume entry. @@ -227,20 +184,17 @@ func (s fsStorage) MakeVol(volume string) (err error) { return err } - volumeDir, err := s.getVolumeDir(volume) - if err == nil { - // Volume already exists, return error. + volumeDir, err := s.getVolDir(volume) + if err != nil { + return err + } + // Make a volume entry. + err = os.Mkdir(volumeDir, 0700) + if err != nil && os.IsExist(err) { return errVolumeExists } - - // If volume not found create it. - if err == errVolumeNotFound { - // Make a volume entry. - return os.Mkdir(volumeDir, 0700) - } - - // For all other errors return here. - return err + // Success + return nil } // ListVols - list volumes. @@ -251,7 +205,7 @@ func (s fsStorage) ListVols() (volsInfo []VolInfo, err error) { if err != nil { return nil, err } - volsInfo, err = getAllUniqueVols(s.diskPath) + volsInfo, err = listVols(s.diskPath) if err != nil { return nil, err } @@ -275,7 +229,7 @@ func (s fsStorage) ListVols() (volsInfo []VolInfo, err error) { // StatVol - get volume info. func (s fsStorage) StatVol(volume string) (volInfo VolInfo, err error) { // Verify if volume is valid and it exists. - volumeDir, err := s.getVolumeDir(volume) + volumeDir, err := s.getVolDir(volume) if err != nil { return VolInfo{}, err } @@ -309,7 +263,7 @@ func (s fsStorage) StatVol(volume string) (volInfo VolInfo, err error) { // DeleteVol - delete a volume. func (s fsStorage) DeleteVol(volume string) error { // Verify if volume is valid and it exists. - volumeDir, err := s.getVolumeDir(volume) + volumeDir, err := s.getVolDir(volume) if err != nil { return err } @@ -321,8 +275,7 @@ func (s fsStorage) DeleteVol(volume string) error { // On windows the string is slightly different, handle it here. return errVolumeNotEmpty } else if strings.Contains(err.Error(), "directory not empty") { - // Hopefully for all other - // operating systems, this is + // Hopefully for all other operating systems, this is // assumed to be consistent. return errVolumeNotEmpty } @@ -335,7 +288,7 @@ func (s fsStorage) DeleteVol(volume string) error { // If an entry is a directory it will be returned with a trailing "/". func (s fsStorage) ListDir(volume, dirPath string) ([]string, error) { // Verify if volume is valid and it exists. - volumeDir, err := s.getVolumeDir(volume) + volumeDir, err := s.getVolDir(volume) if err != nil { return nil, err } @@ -352,10 +305,18 @@ func (s fsStorage) ListDir(volume, dirPath string) ([]string, error) { // ReadFile - read a file at a given offset. func (s fsStorage) ReadFile(volume string, path string, offset int64) (readCloser io.ReadCloser, err error) { - volumeDir, err := s.getVolumeDir(volume) + volumeDir, err := s.getVolDir(volume) if err != nil { return nil, err } + // Stat a volume entry. + _, err = os.Stat(volumeDir) + if err != nil { + if os.IsNotExist(err) { + return nil, errVolumeNotFound + } + return nil, err + } filePath := pathJoin(volumeDir, path) if err = checkPathLength(filePath); err != nil { @@ -388,10 +349,18 @@ func (s fsStorage) ReadFile(volume string, path string, offset int64) (readClose // CreateFile - create a file at path. func (s fsStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) { - volumeDir, err := s.getVolumeDir(volume) + volumeDir, err := s.getVolDir(volume) if err != nil { return nil, err } + // Stat a volume entry. + _, err = os.Stat(volumeDir) + if err != nil { + if os.IsNotExist(err) { + return nil, errVolumeNotFound + } + return nil, err + } if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil { return nil, err } @@ -419,10 +388,18 @@ func (s fsStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser, // StatFile - get file info. func (s fsStorage) StatFile(volume, path string) (file FileInfo, err error) { - volumeDir, err := s.getVolumeDir(volume) + volumeDir, err := s.getVolDir(volume) if err != nil { return FileInfo{}, err } + // Stat a volume entry. + _, err = os.Stat(volumeDir) + if err != nil { + if os.IsNotExist(err) { + return FileInfo{}, errVolumeNotFound + } + return FileInfo{}, err + } filePath := slashpath.Join(volumeDir, path) if err = checkPathLength(filePath); err != nil { @@ -489,10 +466,18 @@ func deleteFile(basePath, deletePath string) error { // DeleteFile - delete a file at path. func (s fsStorage) DeleteFile(volume, path string) error { - volumeDir, err := s.getVolumeDir(volume) + volumeDir, err := s.getVolDir(volume) if err != nil { return err } + // Stat a volume entry. + _, err = os.Stat(volumeDir) + if err != nil { + if os.IsNotExist(err) { + return errVolumeNotFound + } + return err + } // Following code is needed so that we retain "/" suffix if any in // path argument. @@ -507,14 +492,29 @@ func (s fsStorage) DeleteFile(volume, path string) error { // RenameFile - rename file. func (s fsStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error { - srcVolumeDir, err := s.getVolumeDir(srcVolume) + srcVolumeDir, err := s.getVolDir(srcVolume) if err != nil { return err } - dstVolumeDir, err := s.getVolumeDir(dstVolume) + dstVolumeDir, err := s.getVolDir(dstVolume) if err != nil { return err } + // Stat a volume entry. + _, err = os.Stat(srcVolumeDir) + if err != nil { + if os.IsNotExist(err) { + return errVolumeNotFound + } + return err + } + _, err = os.Stat(dstVolumeDir) + if err != nil { + if os.IsNotExist(err) { + return errVolumeNotFound + } + } + srcIsDir := strings.HasSuffix(srcPath, slashSeparator) dstIsDir := strings.HasSuffix(dstPath, slashSeparator) // Either src and dst have to be directories or files, else return error. diff --git a/xl-erasure-v1-common.go b/xl-erasure-v1-common.go index 20ff66d57..663c26878 100644 --- a/xl-erasure-v1-common.go +++ b/xl-erasure-v1-common.go @@ -46,25 +46,32 @@ func listFileVersions(partsMetadata []xlMetaV1, errs []error) (versions []int64) return versions } -// errsToStorageErr - convert collection of errors into a single +// reduceError - convert collection of errors into a single // error based on total errors and read quorum. -func (xl XL) errsToStorageErr(errs []error) error { - notFoundCount := 0 +func (xl XL) reduceError(errs []error) error { + fileNotFoundCount := 0 diskNotFoundCount := 0 + volumeNotFoundCount := 0 diskAccessDeniedCount := 0 for _, err := range errs { if err == errFileNotFound { - notFoundCount++ + fileNotFoundCount++ } else if err == errDiskNotFound { diskNotFoundCount++ } else if err == errVolumeAccessDenied { diskAccessDeniedCount++ + } else if err == errVolumeNotFound { + volumeNotFoundCount++ } } // If we have errors with 'file not found' greater than - // readQuroum, return as errFileNotFound. - if notFoundCount > len(xl.storageDisks)-xl.readQuorum { + // readQuorum, return as errFileNotFound. + // else if we have errors with 'volume not found' greater than + // readQuorum, return as errVolumeNotFound. + if fileNotFoundCount > len(xl.storageDisks)-xl.readQuorum { return errFileNotFound + } else if volumeNotFoundCount > len(xl.storageDisks)-xl.readQuorum { + return errVolumeNotFound } // If we have errors with disk not found equal to the // number of disks, return as errDiskNotFound. @@ -72,7 +79,7 @@ func (xl XL) errsToStorageErr(errs []error) error { return errDiskNotFound } else if diskNotFoundCount > len(xl.storageDisks)-xl.readQuorum { // If we have errors with 'disk not found' greater than - // readQuroum, return as errFileNotFound. + // readQuorum, return as errFileNotFound. return errFileNotFound } // If we have errors with disk not found equal to the @@ -90,7 +97,7 @@ func (xl XL) errsToStorageErr(errs []error) error { // - error if any. func (xl XL) listOnlineDisks(volume, path string) (onlineDisks []StorageAPI, mdata xlMetaV1, heal bool, err error) { partsMetadata, errs := xl.getPartsMetadata(volume, path) - if err = xl.errsToStorageErr(errs); err != nil { + if err = xl.reduceError(errs); err != nil { return nil, xlMetaV1{}, false, err } highestVersion := int64(0) diff --git a/xl-erasure-v1-createfile.go b/xl-erasure-v1-createfile.go index 2a0e3de78..2ee6881f6 100644 --- a/xl-erasure-v1-createfile.go +++ b/xl-erasure-v1-createfile.go @@ -60,7 +60,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w // Convert errs into meaningful err to be sent upwards if possible // based on total number of errors and read quorum. - err := xl.errsToStorageErr(errs) + err := xl.reduceError(errs) if err != nil && err != errFileNotFound { reader.CloseWithError(err) return @@ -92,6 +92,10 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) reader.CloseWithError(err) return + } else if err == errVolumeNotFound { + xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) + reader.CloseWithError(err) + return } createFileError++ diff --git a/xl-erasure-v1.go b/xl-erasure-v1.go index 1c0cc512e..b024836b7 100644 --- a/xl-erasure-v1.go +++ b/xl-erasure-v1.go @@ -290,9 +290,9 @@ func (xl XL) ListVols() (volsInfo []VolInfo, err error) { return volsInfo, nil } -// getAllVolumeInfo - get bucket volume info from all disks. +// getAllVolInfo - list bucket volume info from all disks. // Returns error slice indicating the failed volume stat operations. -func (xl XL) getAllVolumeInfo(volume string) (volsInfo []VolInfo, errs []error) { +func (xl XL) getAllVolInfo(volume string) (volsInfo []VolInfo, errs []error) { // Create errs and volInfo slices of storageDisks size. errs = make([]error, len(xl.storageDisks)) volsInfo = make([]VolInfo, len(xl.storageDisks)) @@ -320,13 +320,14 @@ func (xl XL) getAllVolumeInfo(volume string) (volsInfo []VolInfo, errs []error) return volsInfo, errs } -// listAllVolumeInfo - list all stat volume info from all disks. +// listAllVolInfo - list all stat volume info from all disks. // Returns // - stat volume info for all online disks. // - boolean to indicate if healing is necessary. // - error if any. -func (xl XL) listAllVolumeInfo(volume string) ([]VolInfo, bool, error) { - volsInfo, errs := xl.getAllVolumeInfo(volume) +func (xl XL) listAllVolInfo(volume string) ([]VolInfo, bool, error) { + volsInfo, errs := xl.getAllVolInfo(volume) + volsInfo = removeDuplicateVols(volsInfo) notFoundCount := 0 for _, err := range errs { if err == errVolumeNotFound { @@ -373,7 +374,7 @@ func (xl XL) healVolume(volume string) error { defer nsMutex.RUnlock(volume, "") // Lists volume info for all online disks. - volsInfo, heal, err := xl.listAllVolumeInfo(volume) + volsInfo, heal, err := xl.listAllVolInfo(volume) if err != nil { return err } @@ -393,6 +394,26 @@ func (xl XL) healVolume(volume string) error { return nil } +// Removes any duplicate vols. +func removeDuplicateVols(volsInfo []VolInfo) []VolInfo { + // Use map to record duplicates as we find them. + result := []VolInfo{} + + m := make(map[string]VolInfo) + for _, v := range volsInfo { + if _, found := m[v.Name]; !found { + m[v.Name] = v + } + } + + result = make([]VolInfo, 0, len(m)) + for _, v := range m { + result = append(result, v) + } + // Return the new slice. + return result +} + // StatVol - get volume stat info. func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) { if !isValidVolname(volume) { @@ -401,7 +422,7 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) { // Acquire a read lock before reading. nsMutex.RLock(volume, "") - volsInfo, heal, err := xl.listAllVolumeInfo(volume) + volsInfo, heal, err := xl.listAllVolInfo(volume) nsMutex.RUnlock(volume, "") if err != nil { return VolInfo{}, err @@ -421,7 +442,7 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) { total += volInfo.Total } // Filter volsInfo and update the volInfo. - volInfo = removeDuplicateVols(volsInfo)[0] + volInfo = volsInfo[0] volInfo.Free = free volInfo.Total = total return volInfo, nil @@ -590,7 +611,6 @@ func (xl XL) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error { if errCount <= len(xl.storageDisks)-xl.writeQuorum { continue } - return err } } diff --git a/xl-objects.go b/xl-objects.go index 3792666d6..579039e3b 100644 --- a/xl-objects.go +++ b/xl-objects.go @@ -146,9 +146,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read if !IsValidBucketName(bucket) { return nil, BucketNameInvalid{Bucket: bucket} } - if !isBucketExist(xl.storage, bucket) { - return nil, BucketNotFound{Bucket: bucket} - } // Verify if object is valid. if !IsValidObjectName(object) { return nil, ObjectNameInvalid{Bucket: bucket, Object: object} @@ -261,10 +258,6 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { if !IsValidBucketName(bucket) { return ObjectInfo{}, BucketNameInvalid{Bucket: bucket} } - // Check whether the bucket exists. - if !isBucketExist(xl.storage, bucket) { - return ObjectInfo{}, BucketNotFound{Bucket: bucket} - } // Verify if object is valid. if !IsValidObjectName(object) { return ObjectInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object} @@ -282,7 +275,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. if !IsValidBucketName(bucket) { return "", BucketNameInvalid{Bucket: bucket} } - // Check whether the bucket exists. + // Verify bucket exists. if !isBucketExist(xl.storage, bucket) { return "", BucketNotFound{Bucket: bucket} } @@ -433,9 +426,6 @@ func (xl xlObjects) DeleteObject(bucket, object string) error { if !IsValidBucketName(bucket) { return BucketNameInvalid{Bucket: bucket} } - if !isBucketExist(xl.storage, bucket) { - return BucketNotFound{Bucket: bucket} - } if !IsValidObjectName(object) { return ObjectNameInvalid{Bucket: bucket, Object: object} }