mirror of
https://github.com/minio/minio
synced 2024-11-05 17:34:01 +00:00
XL: Implement ignore errors. (#2136)
Each metadata ops have a list of errors which can be ignored, this is essentially needed when - disks are not found - disks are found but cannot be accessed (permission denied) - disks are there but fresh disks were added This is needed since we don't have healing code in place where it would have healed the fresh disks added. Fixes #2072
This commit is contained in:
parent
4c21d6d09d
commit
ca1b1921c4
8 changed files with 122 additions and 76 deletions
|
@ -85,7 +85,7 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
|
|||
// For any walk error return right away.
|
||||
if walkResult.err != nil {
|
||||
// File not found or Disk not found is a valid case.
|
||||
if walkResult.err == errFileNotFound || walkResult.err == errDiskNotFound || walkResult.err == errFaultyDisk {
|
||||
if isErrIgnored(walkResult.err, walkResultIgnoredErrs) {
|
||||
eof = true
|
||||
break
|
||||
}
|
||||
|
|
|
@ -43,6 +43,16 @@ func registerShutdown(callback func()) {
|
|||
}()
|
||||
}
|
||||
|
||||
// isErrIgnored should we ignore this error?, takes a list of errors which can be ignored.
|
||||
func isErrIgnored(err error, ignoredErrs []error) bool {
|
||||
for _, ignoredErr := range ignoredErrs {
|
||||
if ignoredErr == err {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// House keeping code needed for FS.
|
||||
func fsHouseKeeping(storageDisk StorageAPI) error {
|
||||
// Cleanup all temp entries upon start.
|
||||
|
|
50
tree-walk.go
50
tree-walk.go
|
@ -21,6 +21,15 @@ import (
|
|||
"strings"
|
||||
)
|
||||
|
||||
// list of all errors that can be ignored in tree walk operation.
|
||||
var walkResultIgnoredErrs = []error{
|
||||
errFileNotFound,
|
||||
errVolumeNotFound,
|
||||
errDiskNotFound,
|
||||
errDiskAccessDenied,
|
||||
errFaultyDisk,
|
||||
}
|
||||
|
||||
// Tree walk result carries results of tree walking.
|
||||
type treeWalkResult struct {
|
||||
entry string
|
||||
|
@ -48,32 +57,31 @@ func listDirFactory(isLeaf func(string, string) bool, disks ...StorageAPI) listD
|
|||
continue
|
||||
}
|
||||
entries, err = disk.ListDir(bucket, prefixDir)
|
||||
if err != nil {
|
||||
// For any reason disk was deleted or goes offline, continue
|
||||
// and list from other disks if possible.
|
||||
if err == errDiskNotFound || err == errFaultyDisk {
|
||||
continue
|
||||
if err == nil {
|
||||
// Skip the entries which do not match the prefixEntry.
|
||||
for i, entry := range entries {
|
||||
if !strings.HasPrefix(entry, prefixEntry) {
|
||||
entries[i] = ""
|
||||
continue
|
||||
}
|
||||
if isLeaf(bucket, pathJoin(prefixDir, entry)) {
|
||||
entries[i] = strings.TrimSuffix(entry, slashSeparator)
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
// Skip the entries which do not match the prefixEntry.
|
||||
for i, entry := range entries {
|
||||
if !strings.HasPrefix(entry, prefixEntry) {
|
||||
entries[i] = ""
|
||||
continue
|
||||
}
|
||||
if isLeaf(bucket, pathJoin(prefixDir, entry)) {
|
||||
entries[i] = strings.TrimSuffix(entry, slashSeparator)
|
||||
sort.Strings(entries)
|
||||
// Skip the empty strings
|
||||
for len(entries) > 0 && entries[0] == "" {
|
||||
entries = entries[1:]
|
||||
}
|
||||
return entries, nil
|
||||
}
|
||||
sort.Strings(entries)
|
||||
// Skip the empty strings
|
||||
for len(entries) > 0 && entries[0] == "" {
|
||||
entries = entries[1:]
|
||||
// For any reason disk was deleted or goes offline, continue
|
||||
// and list from other disks if possible.
|
||||
if isErrIgnored(err, walkResultIgnoredErrs) {
|
||||
continue
|
||||
}
|
||||
return entries, nil
|
||||
break
|
||||
}
|
||||
|
||||
// Return error at the end.
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -102,6 +102,13 @@ func (xl xlObjects) undoMakeBucket(bucket string) {
|
|||
wg.Wait()
|
||||
}
|
||||
|
||||
// list all errors that can be ignored in a bucket metadata operation.
|
||||
var bucketMetadataOpIgnoredErrs = []error{
|
||||
errDiskNotFound,
|
||||
errDiskAccessDenied,
|
||||
errFaultyDisk,
|
||||
}
|
||||
|
||||
// getBucketInfo - returns the BucketInfo from one of the load balanced disks.
|
||||
func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err error) {
|
||||
for _, disk := range xl.getLoadBalancedQuorumDisks() {
|
||||
|
@ -110,20 +117,20 @@ func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err
|
|||
}
|
||||
var volInfo VolInfo
|
||||
volInfo, err = disk.StatVol(bucketName)
|
||||
if err != nil {
|
||||
// For any reason disk went offline continue and pick the next one.
|
||||
if err == errDiskNotFound || err == errFaultyDisk {
|
||||
continue
|
||||
if err == nil {
|
||||
bucketInfo = BucketInfo{
|
||||
Name: volInfo.Name,
|
||||
Created: volInfo.Created,
|
||||
}
|
||||
return BucketInfo{}, err
|
||||
return bucketInfo, nil
|
||||
}
|
||||
bucketInfo = BucketInfo{
|
||||
Name: volInfo.Name,
|
||||
Created: volInfo.Created,
|
||||
// For any reason disk went offline continue and pick the next one.
|
||||
if isErrIgnored(err, bucketMetadataOpIgnoredErrs) {
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
return bucketInfo, nil
|
||||
return BucketInfo{}, err
|
||||
}
|
||||
|
||||
// Checks whether bucket exists.
|
||||
|
@ -166,10 +173,6 @@ func (xl xlObjects) listBuckets() (bucketsInfo []BucketInfo, err error) {
|
|||
}
|
||||
var volsInfo []VolInfo
|
||||
volsInfo, err = disk.ListVols()
|
||||
// Ignore any disks not found.
|
||||
if err == errDiskNotFound || err == errFaultyDisk {
|
||||
continue
|
||||
}
|
||||
if err == nil {
|
||||
// NOTE: The assumption here is that volumes across all disks in
|
||||
// readQuorum have consistent view i.e they all have same number
|
||||
|
@ -189,6 +192,10 @@ func (xl xlObjects) listBuckets() (bucketsInfo []BucketInfo, err error) {
|
|||
}
|
||||
return bucketsInfo, nil
|
||||
}
|
||||
// Ignore any disks not found.
|
||||
if isErrIgnored(err, bucketMetadataOpIgnoredErrs) {
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
return nil, err
|
||||
|
@ -245,8 +252,7 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
|
|||
// an unknown error.
|
||||
for _, err := range dErrs {
|
||||
if err != nil {
|
||||
// We ignore error if errVolumeNotFound, errDiskNotFound or errFaultyDisk
|
||||
if err == errVolumeNotFound || err == errDiskNotFound || err == errFaultyDisk {
|
||||
if isErrIgnored(err, objMetadataOpIgnoredErrs) {
|
||||
volumeNotFoundErrCnt++
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -67,7 +67,7 @@ func (xl xlObjects) isObject(bucket, prefix string) (ok bool) {
|
|||
return true
|
||||
}
|
||||
// Ignore for file not found, disk not found or faulty disk.
|
||||
if err == errFileNotFound || err == errDiskNotFound || err == errFaultyDisk {
|
||||
if isErrIgnored(err, walkResultIgnoredErrs) {
|
||||
continue
|
||||
}
|
||||
errorIf(err, "Unable to stat a file %s/%s/%s", bucket, prefix, xlMetaJSONFile)
|
||||
|
|
|
@ -194,6 +194,14 @@ func pickValidXLMeta(xlMetas []xlMetaV1) xlMetaV1 {
|
|||
panic("Unable to look for valid XL metadata content")
|
||||
}
|
||||
|
||||
// list of all errors that can be ignored in a metadata operation.
|
||||
var objMetadataOpIgnoredErrs = []error{
|
||||
errDiskNotFound,
|
||||
errDiskAccessDenied,
|
||||
errFaultyDisk,
|
||||
errVolumeNotFound,
|
||||
}
|
||||
|
||||
// readXLMetadata - returns the object metadata `xl.json` content from
|
||||
// one of the disks picked at random.
|
||||
func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err error) {
|
||||
|
@ -202,16 +210,18 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err
|
|||
continue
|
||||
}
|
||||
xlMeta, err = readXLMeta(disk, bucket, object)
|
||||
if err != nil {
|
||||
// For any reason disk is not available continue and read from other disks.
|
||||
if err == errDiskNotFound || err == errFaultyDisk {
|
||||
continue
|
||||
}
|
||||
return xlMetaV1{}, err
|
||||
if err == nil {
|
||||
return xlMeta, nil
|
||||
}
|
||||
// For any reason disk or bucket is not available continue
|
||||
// and read from other disks.
|
||||
if isErrIgnored(err, objMetadataOpIgnoredErrs) {
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
return xlMeta, nil
|
||||
// Return error here.
|
||||
return xlMetaV1{}, err
|
||||
}
|
||||
|
||||
// Undo rename xl metadata, renames successfully renamed `xl.json` back to source location.
|
||||
|
|
|
@ -178,16 +178,16 @@ func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool {
|
|||
continue
|
||||
}
|
||||
_, err := disk.StatFile(bucket, pathJoin(prefix, uploadsJSONFile))
|
||||
if err != nil {
|
||||
// For any reason disk was deleted or goes offline, continue
|
||||
if err == errDiskNotFound || err == errFaultyDisk {
|
||||
continue
|
||||
}
|
||||
return false
|
||||
if err == nil {
|
||||
return true
|
||||
}
|
||||
// For any reason disk was deleted or goes offline, continue
|
||||
if isErrIgnored(err, objMetadataOpIgnoredErrs) {
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
return true
|
||||
return false
|
||||
}
|
||||
|
||||
// listUploadsInfo - list all uploads info.
|
||||
|
@ -199,20 +199,20 @@ func (xl xlObjects) listUploadsInfo(prefixPath string) (uploadsInfo []uploadInfo
|
|||
splitPrefixes := strings.SplitN(prefixPath, "/", 3)
|
||||
var uploadsJSON uploadsV1
|
||||
uploadsJSON, err = readUploadsJSON(splitPrefixes[1], splitPrefixes[2], disk)
|
||||
if err != nil {
|
||||
// For any reason disk was deleted or goes offline, continue
|
||||
if err == errDiskNotFound || err == errFaultyDisk {
|
||||
continue
|
||||
}
|
||||
if err == errFileNotFound {
|
||||
return []uploadInfo{}, nil
|
||||
}
|
||||
return nil, err
|
||||
if err == nil {
|
||||
uploadsInfo = uploadsJSON.Uploads
|
||||
return uploadsInfo, nil
|
||||
}
|
||||
if err == errFileNotFound {
|
||||
return []uploadInfo{}, nil
|
||||
}
|
||||
// For any reason disk was deleted or goes offline, continue
|
||||
if isErrIgnored(err, objMetadataOpIgnoredErrs) {
|
||||
continue
|
||||
}
|
||||
uploadsInfo = uploadsJSON.Uploads
|
||||
break
|
||||
}
|
||||
return uploadsInfo, nil
|
||||
return []uploadInfo{}, err
|
||||
}
|
||||
|
||||
// isUploadIDExists - verify if a given uploadID exists and is valid.
|
||||
|
@ -249,16 +249,16 @@ func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInf
|
|||
continue
|
||||
}
|
||||
fileInfo, err = disk.StatFile(minioMetaBucket, partNamePath)
|
||||
if err != nil {
|
||||
// For any reason disk was deleted or goes offline, continue
|
||||
if err == errDiskNotFound {
|
||||
continue
|
||||
}
|
||||
return FileInfo{}, err
|
||||
if err == nil {
|
||||
return fileInfo, nil
|
||||
}
|
||||
// For any reason disk was deleted or goes offline, continue
|
||||
if isErrIgnored(err, objMetadataOpIgnoredErrs) {
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
return fileInfo, nil
|
||||
return FileInfo{}, err
|
||||
}
|
||||
|
||||
// commitXLMetadata - commit `xl.json` from source prefix to destination prefix.
|
||||
|
|
|
@ -68,7 +68,10 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
|
|||
continue
|
||||
}
|
||||
uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, disk)
|
||||
if err == errDiskNotFound || err == errFaultyDisk {
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if isErrIgnored(err, objMetadataOpIgnoredErrs) {
|
||||
continue
|
||||
}
|
||||
break
|
||||
|
@ -100,7 +103,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
|
|||
// For any walk error return right away.
|
||||
if walkResult.err != nil {
|
||||
// File not found or Disk not found is a valid case.
|
||||
if walkResult.err == errFileNotFound || walkResult.err == errDiskNotFound || walkResult.err == errFaultyDisk {
|
||||
if isErrIgnored(walkResult.err, walkResultIgnoredErrs) {
|
||||
continue
|
||||
}
|
||||
return ListMultipartsInfo{}, err
|
||||
|
@ -130,14 +133,17 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
|
|||
continue
|
||||
}
|
||||
newUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, disk)
|
||||
if err == errDiskNotFound || err == errFaultyDisk {
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if isErrIgnored(err, objMetadataOpIgnoredErrs) {
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry))
|
||||
if err != nil {
|
||||
if err == errFileNotFound || walkResult.err == errDiskNotFound || walkResult.err == errFaultyDisk {
|
||||
if isErrIgnored(err, walkResultIgnoredErrs) {
|
||||
continue
|
||||
}
|
||||
return ListMultipartsInfo{}, err
|
||||
|
@ -723,7 +729,10 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
|||
continue
|
||||
}
|
||||
uploadsJSON, err = readUploadsJSON(bucket, object, disk)
|
||||
if err == errDiskNotFound || err == errFaultyDisk {
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if isErrIgnored(err, objMetadataOpIgnoredErrs) {
|
||||
continue
|
||||
}
|
||||
break
|
||||
|
@ -774,7 +783,10 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e
|
|||
continue
|
||||
}
|
||||
uploadsJSON, err = readUploadsJSON(bucket, object, disk)
|
||||
if err == errDiskNotFound || err == errFaultyDisk {
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if isErrIgnored(err, objMetadataOpIgnoredErrs) {
|
||||
continue
|
||||
}
|
||||
break
|
||||
|
|
Loading…
Reference in a new issue