posix: return errFaultyDisk on I/O errors. (#1885)

When I/O error is occured more than allowed limit, posix returns
errFaultyDisk.

Fixes #1884
This commit is contained in:
Bala FA 2016-06-09 10:32:10 +05:30 committed by Harshavardhana
parent 1b9db9ee6c
commit 61598ed02f
9 changed files with 132 additions and 27 deletions

View file

@ -83,7 +83,7 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
// For any walk error return right away.
if walkResult.err != nil {
// File not found or Disk not found is a valid case.
if walkResult.err == errFileNotFound || walkResult.err == errDiskNotFound {
if walkResult.err == errFileNotFound || walkResult.err == errDiskNotFound || walkResult.err == errFaultyDisk {
eof = true
break
}

View file

@ -32,7 +32,7 @@ func fsHouseKeeping(storageDisk StorageAPI) error {
// Attempt to create `.minio`.
err := storageDisk.MakeVol(minioMetaBucket)
if err != nil {
if err != errVolumeExists && err != errDiskNotFound {
if err != errVolumeExists && err != errDiskNotFound && err != errFaultyDisk {
return err
}
}
@ -77,7 +77,7 @@ func xlHouseKeeping(storageDisks []StorageAPI) error {
// Attempt to create `.minio`.
err := disk.MakeVol(minioMetaBucket)
if err != nil && err != errVolumeExists && err != errDiskNotFound {
if err != nil && err != errVolumeExists && err != errDiskNotFound && err != errFaultyDisk {
errs[index] = err
return
}

121
posix.go
View file

@ -18,6 +18,7 @@ package main
import (
"bytes"
"errors"
"io"
"os"
slashpath "path"
@ -31,14 +32,18 @@ import (
const (
fsMinSpacePercent = 5
maxAllowedIOError = 5
)
// posix - implements StorageAPI interface.
type posix struct {
diskPath string
minFreeDisk int64
ioErrCount int
}
var errFaultyDisk = errors.New("Faulty disk")
// checkPathLength - returns error if given path name length more than 255
func checkPathLength(pathName string) error {
// For MS Windows, the maximum path length is 255
@ -183,6 +188,16 @@ func (s posix) getVolDir(volume string) (string, error) {
// Make a volume entry.
func (s posix) MakeVol(volume string) (err error) {
defer func() {
if err == syscall.EIO {
s.ioErrCount++
}
}()
if s.ioErrCount > maxAllowedIOError {
return errFaultyDisk
}
// Validate if disk is free.
if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return err
@ -203,6 +218,16 @@ func (s posix) MakeVol(volume string) (err error) {
// ListVols - list volumes.
func (s posix) ListVols() (volsInfo []VolInfo, err error) {
defer func() {
if err == syscall.EIO {
s.ioErrCount++
}
}()
if s.ioErrCount > maxAllowedIOError {
return nil, errFaultyDisk
}
volsInfo, err = listVols(s.diskPath)
if err != nil {
return nil, err
@ -219,6 +244,16 @@ func (s posix) ListVols() (volsInfo []VolInfo, err error) {
// StatVol - get volume info.
func (s posix) StatVol(volume string) (volInfo VolInfo, err error) {
defer func() {
if err == syscall.EIO {
s.ioErrCount++
}
}()
if s.ioErrCount > maxAllowedIOError {
return VolInfo{}, errFaultyDisk
}
// Validate if disk is free.
if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return VolInfo{}, err
@ -248,9 +283,19 @@ func (s posix) StatVol(volume string) (volInfo VolInfo, err error) {
}
// DeleteVol - delete a volume.
func (s posix) DeleteVol(volume string) error {
func (s posix) DeleteVol(volume string) (err error) {
defer func() {
if err == syscall.EIO {
s.ioErrCount++
}
}()
if s.ioErrCount > maxAllowedIOError {
return errFaultyDisk
}
// Validate if disk is free.
if err := checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return err
}
@ -278,9 +323,19 @@ func (s posix) DeleteVol(volume string) error {
// ListDir - return all the entries at the given directory path.
// If an entry is a directory it will be returned with a trailing "/".
func (s posix) ListDir(volume, dirPath string) ([]string, error) {
func (s posix) ListDir(volume, dirPath string) (entries []string, err error) {
defer func() {
if err == syscall.EIO {
s.ioErrCount++
}
}()
if s.ioErrCount > maxAllowedIOError {
return nil, errFaultyDisk
}
// Validate if disk is free.
if err := checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return nil, err
}
@ -306,6 +361,16 @@ func (s posix) ListDir(volume, dirPath string) ([]string, error) {
// for io.EOF. Additionally ReadFile also starts reading from an
// offset.
func (s posix) ReadFile(volume string, path string, offset int64, buf []byte) (n int64, err error) {
defer func() {
if err == syscall.EIO {
s.ioErrCount++
}
}()
if s.ioErrCount > maxAllowedIOError {
return 0, errFaultyDisk
}
// Validate if disk is free.
if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return 0, err
@ -371,6 +436,16 @@ func (s posix) ReadFile(volume string, path string, offset int64, buf []byte) (n
// AppendFile - append a byte array at path, if file doesn't exist at
// path this call explicitly creates it.
func (s posix) AppendFile(volume, path string, buf []byte) (n int64, err error) {
defer func() {
if err == syscall.EIO {
s.ioErrCount++
}
}()
if s.ioErrCount > maxAllowedIOError {
return 0, errFaultyDisk
}
// Validate if disk is free.
if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return 0, err
@ -423,6 +498,16 @@ func (s posix) AppendFile(volume, path string, buf []byte) (n int64, err error)
// StatFile - get file info.
func (s posix) StatFile(volume, path string) (file FileInfo, err error) {
defer func() {
if err == syscall.EIO {
s.ioErrCount++
}
}()
if s.ioErrCount > maxAllowedIOError {
return FileInfo{}, errFaultyDisk
}
// Validate if disk is free.
if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return FileInfo{}, err
@ -504,9 +589,19 @@ func deleteFile(basePath, deletePath string) error {
}
// DeleteFile - delete a file at path.
func (s posix) DeleteFile(volume, path string) error {
func (s posix) DeleteFile(volume, path string) (err error) {
defer func() {
if err == syscall.EIO {
s.ioErrCount++
}
}()
if s.ioErrCount > maxAllowedIOError {
return errFaultyDisk
}
// Validate if disk is free.
if err := checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return err
}
@ -535,9 +630,19 @@ func (s posix) DeleteFile(volume, path string) error {
}
// RenameFile - rename source path to destination path atomically.
func (s posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error {
func (s posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) {
defer func() {
if err == syscall.EIO {
s.ioErrCount++
}
}()
if s.ioErrCount > maxAllowedIOError {
return errFaultyDisk
}
// Validate if disk is free.
if err := checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return err
}

View file

@ -44,7 +44,7 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string)
if err != nil {
// For any reason disk was deleted or goes offline, continue
// and list form other disks if possible.
if err == errDiskNotFound {
if err == errDiskNotFound || err == errFaultyDisk {
continue
}
break

View file

@ -98,7 +98,7 @@ func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err
volInfo, err = disk.StatVol(bucketName)
if err != nil {
// For any reason disk went offline continue and pick the next one.
if err == errDiskNotFound {
if err == errDiskNotFound || err == errFaultyDisk {
continue
}
return BucketInfo{}, err
@ -153,7 +153,7 @@ func (xl xlObjects) listBuckets() (bucketsInfo []BucketInfo, err error) {
var volsInfo []VolInfo
volsInfo, err = disk.ListVols()
// Ignore any disks not found.
if err == errDiskNotFound {
if err == errDiskNotFound || err == errFaultyDisk {
continue
}
if err == nil {
@ -231,8 +231,8 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
// an unknown error.
for _, err := range dErrs {
if err != nil {
// We ignore error if errVolumeNotFound or errDiskNotFound
if err == errVolumeNotFound || err == errDiskNotFound {
// We ignore error if errVolumeNotFound, errDiskNotFound or errFaultyDisk
if err == errVolumeNotFound || err == errDiskNotFound || err == errFaultyDisk {
volumeNotFoundErrCnt++
continue
}

View file

@ -66,8 +66,8 @@ func (xl xlObjects) isObject(bucket, prefix string) (ok bool) {
if err == nil {
return true
}
// Ignore for file not found and disk not found.
if err == errFileNotFound || err == errDiskNotFound {
// Ignore for file not found, disk not found or faulty disk.
if err == errFileNotFound || err == errDiskNotFound || err == errFaultyDisk {
continue
}
errorIf(err, "Unable to stat a file %s/%s/%s", bucket, prefix, xlMetaJSONFile)

View file

@ -210,7 +210,7 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err
buf, err = readAll(disk, bucket, path.Join(object, xlMetaJSONFile))
if err != nil {
// For any reason disk is not available continue and read from other disks.
if err == errDiskNotFound {
if err == errDiskNotFound || err == errFaultyDisk {
continue
}
return xlMetaV1{}, err

View file

@ -364,7 +364,7 @@ func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool {
_, err := disk.StatFile(bucket, pathJoin(prefix, uploadsJSONFile))
if err != nil {
// For any reason disk was deleted or goes offline, continue
if err == errDiskNotFound {
if err == errDiskNotFound || err == errFaultyDisk {
continue
}
return false
@ -385,7 +385,7 @@ func (xl xlObjects) listUploadsInfo(prefixPath string) (uploadsInfo []uploadInfo
uploadsJSON, err = readUploadsJSON(splitPrefixes[1], splitPrefixes[2], disk)
if err != nil {
// For any reason disk was deleted or goes offline, continue
if err == errDiskNotFound {
if err == errDiskNotFound || err == errFaultyDisk {
continue
}
if err == errFileNotFound {

View file

@ -68,7 +68,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
continue
}
uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, disk)
if err == errDiskNotFound {
if err == errDiskNotFound || err == errFaultyDisk {
continue
}
break
@ -97,7 +97,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
// For any walk error return right away.
if walkResult.err != nil {
// File not found or Disk not found is a valid case.
if walkResult.err == errFileNotFound || walkResult.err == errDiskNotFound {
if walkResult.err == errFileNotFound || walkResult.err == errDiskNotFound || walkResult.err == errFaultyDisk {
continue
}
return ListMultipartsInfo{}, err
@ -127,14 +127,14 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
continue
}
newUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, disk)
if err == errDiskNotFound {
if err == errDiskNotFound || err == errFaultyDisk {
continue
}
break
}
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry))
if err != nil {
if err == errFileNotFound || walkResult.err == errDiskNotFound {
if err == errFileNotFound || walkResult.err == errDiskNotFound || walkResult.err == errFaultyDisk {
continue
}
return ListMultipartsInfo{}, err
@ -671,7 +671,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
continue
}
uploadsJSON, err = readUploadsJSON(bucket, object, disk)
if err == errDiskNotFound {
if err == errDiskNotFound || err == errFaultyDisk {
continue
}
break
@ -723,7 +723,7 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e
continue
}
uploadsJSON, err = readUploadsJSON(bucket, object, disk)
if err == errDiskNotFound {
if err == errDiskNotFound || err == errFaultyDisk {
continue
}
break