erasure: Handle failed disks so that we initialize properly if they are missing. (#1607)

Fixes #1592
Fixes #1579
This commit is contained in:
Harshavardhana 2016-05-11 18:58:32 -07:00
parent d4745c7d6a
commit 50431e91a6
6 changed files with 52 additions and 20 deletions

View file

@ -93,25 +93,25 @@ func newPosix(diskPath string) (StorageAPI, error) {
log.Error("Disk cannot be empty") log.Error("Disk cannot be empty")
return nil, errInvalidArgument return nil, errInvalidArgument
} }
if err := checkPathLength(diskPath); err != nil { fs := fsStorage{
return nil, err diskPath: diskPath,
minFreeDisk: fsMinSpacePercent, // Minimum 5% disk should be free.
} }
st, err := os.Stat(diskPath) st, err := os.Stat(diskPath)
if err != nil { if err != nil {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"diskPath": diskPath, "diskPath": diskPath,
}).Debugf("Stat failed, with error %s.", err) }).Debugf("Stat failed, with error %s.", err)
return nil, err if os.IsNotExist(err) {
return fs, errDiskNotFound
}
return fs, err
} }
if !st.IsDir() { if !st.IsDir() {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"diskPath": diskPath, "diskPath": diskPath,
}).Debugf("Disk %s.", syscall.ENOTDIR) }).Debugf("Disk %s.", syscall.ENOTDIR)
return nil, syscall.ENOTDIR return fs, syscall.ENOTDIR
}
fs := fsStorage{
diskPath: diskPath,
minFreeDisk: fsMinSpacePercent, // Minimum 5% disk should be free.
} }
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"diskPath": diskPath, "diskPath": diskPath,

View file

@ -129,7 +129,7 @@ func (s *storageServer) RenameFileHandler(arg *RenameFileArgs, reply *GenericRep
func newRPCServer(exportPath string) (*storageServer, error) { func newRPCServer(exportPath string) (*storageServer, error) {
// Initialize posix storage API. // Initialize posix storage API.
storage, err := newPosix(exportPath) storage, err := newPosix(exportPath)
if err != nil { if err != nil && err != errDiskNotFound {
return nil, err return nil, err
} }
return &storageServer{ return &storageServer{

View file

@ -56,18 +56,24 @@ func listFileVersions(partsMetadata []xlMetaV1, errs []error) (versions []int64)
func (xl XL) listOnlineDisks(volume, path string) (onlineDisks []StorageAPI, mdata xlMetaV1, heal bool, err error) { func (xl XL) listOnlineDisks(volume, path string) (onlineDisks []StorageAPI, mdata xlMetaV1, heal bool, err error) {
partsMetadata, errs := xl.getPartsMetadata(volume, path) partsMetadata, errs := xl.getPartsMetadata(volume, path)
notFoundCount := 0 notFoundCount := 0
// FIXME: take care of the situation when a disk has failed and been removed diskNotFoundCount := 0
// by looking at the error returned from the fs layer. fs-layer will have
// to return an error indicating that the disk is not available and should be
// different from ErrNotExist.
for _, err := range errs { for _, err := range errs {
if err == errFileNotFound { if err == errFileNotFound {
notFoundCount++ notFoundCount++
// If we have errors with file not found equal to the number of disks. // If we have errors with file not found greater than
// writeQuroum, return as errFileNotFound.
if notFoundCount > len(xl.storageDisks)-xl.readQuorum { if notFoundCount > len(xl.storageDisks)-xl.readQuorum {
return nil, xlMetaV1{}, false, errFileNotFound return nil, xlMetaV1{}, false, errFileNotFound
} }
} }
if err == errDiskNotFound {
diskNotFoundCount++
// If we have errors with disk not found equal to the
// number of disks, return as errDiskNotFound.
if diskNotFoundCount == len(xl.storageDisks) {
return nil, xlMetaV1{}, false, errDiskNotFound
}
}
} }
highestVersion := int64(0) highestVersion := int64(0)

View file

@ -19,10 +19,13 @@ package main
import "errors" import "errors"
// errMaxDisks - returned for reached maximum of disks. // errMaxDisks - returned for reached maximum of disks.
var errMaxDisks = errors.New("Total number of disks specified is higher than supported maximum of '16'") var errMaxDisks = errors.New("Number of disks are higher than supported maximum count '16'")
// errNumDisks - returned for odd numebr of disks. // errMinDisks - returned for minimum number of disks.
var errNumDisks = errors.New("Invalid number of disks provided, should be always multiples of '2'") var errMinDisks = errors.New("Number of disks are smaller than supported minimum count '8'")
// errNumDisks - returned for odd number of disks.
var errNumDisks = errors.New("Number of disks should be multiples of '2'")
// errUnexpected - returned for any unexpected error. // errUnexpected - returned for any unexpected error.
var errUnexpected = errors.New("Unexpected error - please report at https://github.com/minio/minio/issues") var errUnexpected = errors.New("Unexpected error - please report at https://github.com/minio/minio/issues")

View file

@ -34,6 +34,8 @@ const (
xlMetaV1File = "file.json" xlMetaV1File = "file.json"
// Maximum erasure blocks. // Maximum erasure blocks.
maxErasureBlocks = 16 maxErasureBlocks = 16
// Minimum erasure blocks.
minErasureBlocks = 8
) )
// XL layer structure. // XL layer structure.
@ -51,18 +53,22 @@ func newXL(disks ...string) (StorageAPI, error) {
// Initialize XL. // Initialize XL.
xl := &XL{} xl := &XL{}
// Verify disks. // Verify total number of disks.
totalDisks := len(disks) totalDisks := len(disks)
if totalDisks > maxErasureBlocks { if totalDisks > maxErasureBlocks {
return nil, errMaxDisks return nil, errMaxDisks
} }
if totalDisks < minErasureBlocks {
return nil, errMinDisks
}
// isEven function to verify if a given number if even. // isEven function to verify if a given number if even.
isEven := func(number int) bool { isEven := func(number int) bool {
return number%2 == 0 return number%2 == 0
} }
// TODO: verify if this makes sense in future. // Verify if we have even number of disks.
// only combination of 8, 10, 12, 14, 16 are supported.
if !isEven(totalDisks) { if !isEven(totalDisks) {
return nil, errNumDisks return nil, errNumDisks
} }
@ -85,8 +91,13 @@ func newXL(disks ...string) (StorageAPI, error) {
storageDisks := make([]StorageAPI, len(disks)) storageDisks := make([]StorageAPI, len(disks))
for index, disk := range disks { for index, disk := range disks {
var err error var err error
// Intentionally ignore disk not found errors while
// initializing POSIX, so that we have successfully
// initialized posix Storage.
// Subsequent calls to XL/Erasure will manage any errors
// related to disks.
storageDisks[index], err = newPosix(disk) storageDisks[index], err = newPosix(disk)
if err != nil { if err != nil && err != errDiskNotFound {
return nil, err return nil, err
} }
} }
@ -153,6 +164,9 @@ func (xl XL) MakeVol(volume string) error {
// Make a volume entry on all underlying storage disks. // Make a volume entry on all underlying storage disks.
for index, disk := range xl.storageDisks { for index, disk := range xl.storageDisks {
if disk == nil {
continue
}
wg.Add(1) wg.Add(1)
// Make a volume inside a go-routine. // Make a volume inside a go-routine.
go func(index int, disk StorageAPI) { go func(index int, disk StorageAPI) {

View file

@ -18,6 +18,7 @@ package main
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"path/filepath" "path/filepath"
@ -91,6 +92,14 @@ func newXLObjects(exportPaths ...string) (ObjectLayer, error) {
} }
} else { } else {
log.Errorf("Unable to check backend format %s", err) log.Errorf("Unable to check backend format %s", err)
if err == errReadQuorum {
errMsg := fmt.Sprintf("Not all disks %s on command line are available to meet the read quroum.", exportPaths)
return nil, errors.New(errMsg)
}
if err == errDiskNotFound {
errMsg := fmt.Sprintf("All disks %s on command line are not available.", exportPaths)
return nil, errors.New(errMsg)
}
return nil, err return nil, err
} }
} }