diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go
index d4197fa2c..164867693 100644
--- a/cmd/metacache-walk.go
+++ b/cmd/metacache-walk.go
@@ -138,6 +138,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 			// Forward some errors?
 			return nil
 		}
+		diskHealthCheckOK(ctx, err)
 		if len(entries) == 0 {
 			return nil
 		}
@@ -179,6 +180,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 				s.walkReadMu.Lock()
 				meta.metadata, err = s.readMetadata(ctx, pathJoin(volumeDir, current, entry))
 				s.walkReadMu.Unlock()
+				diskHealthCheckOK(ctx, err)
 				if err != nil {
 					logger.LogIf(ctx, err)
 					continue
@@ -196,6 +198,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 				s.walkReadMu.Lock()
 				meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, current, entry))
 				s.walkReadMu.Unlock()
+				diskHealthCheckOK(ctx, err)
 				if err != nil {
 					logger.LogIf(ctx, err)
 					continue
@@ -257,6 +260,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 			s.walkReadMu.Lock()
 			meta.metadata, err = s.readMetadata(ctx, pathJoin(volumeDir, meta.name, xlStorageFormatFile))
 			s.walkReadMu.Unlock()
+			diskHealthCheckOK(ctx, err)
 			switch {
 			case err == nil:
 				// It was an object
@@ -266,6 +270,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 				out <- meta
 			case osIsNotExist(err), isSysErrIsDir(err):
 				meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1))
+				diskHealthCheckOK(ctx, err)
 				if err == nil {
 					// It was an object
 					out <- meta
@@ -303,16 +308,12 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 	return scanDir(opts.BaseDir)
 }
 
-func (p *xlStorageDiskIDCheck) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) error {
-	defer p.updateStorageMetrics(storageMetricWalkDir, opts.Bucket, opts.BaseDir)()
-
-	if contextCanceled(ctx) {
-		return ctx.Err()
-	}
-
-	if err := p.checkDiskStale(); err != nil {
+func (p *xlStorageDiskIDCheck) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) (err error) {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricWalkDir, opts.Bucket, opts.BaseDir)
+	if err != nil {
 		return err
 	}
+	defer done(&err)
 
 	return p.storage.WalkDir(ctx, opts, wr)
 }
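Note on the wrapper pattern introduced above (and repeated for every storage method in cmd/xl-storage-disk-id-check.go below): `defer done(&err)` only reports the method's final outcome because the return value is named, as the TrackDiskHealth doc comment points out. The following is a standalone illustration of that Go mechanism only, not MinIO code; every name in it is invented.

package main

import (
	"errors"
	"fmt"
)

// report stands in for the done(*error) closure returned by TrackDiskHealth:
// it inspects the operation's final error through a pointer.
func report(errp *error) {
	fmt.Println("operation finished, err =", *errp)
}

// op mirrors the wrapper shape: a *named* return error plus defer.
// Because &err points at the named return slot, the deferred call sees
// whatever error op ultimately returns.
func op() (err error) {
	defer report(&err)
	return errors.New("simulated disk failure")
}

func main() {
	_ = op() // prints: operation finished, err = simulated disk failure
}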
diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go
index b81dcef54..6f9cd159e 100644
--- a/cmd/storage-interface.go
+++ b/cmd/storage-interface.go
@@ -29,17 +29,40 @@ type StorageAPI interface {
 	String() string
 
 	// Storage operations.
-	IsOnline() bool      // Returns true if disk is online.
-	LastConn() time.Time // Returns the last time this disk (re)-connected
+	// Returns true if disk is online and its valid i.e valid format.json.
+	// This has nothing to do with if the drive is hung or not responding.
+	// For that individual storage API calls will fail properly. The purpose
+	// of this function is to know if the "drive" has "format.json" or not
+	// if it has a "format.json" then is it correct "format.json" or not.
+	IsOnline() bool
+
+	// Returns the last time this disk (re)-connected
+	LastConn() time.Time
+
+	// Indicates if disk is local or not.
 	IsLocal() bool
 
-	Hostname() string   // Returns host name if remote host.
-	Endpoint() Endpoint // Returns endpoint.
+	// Returns hostname if disk is remote.
+	Hostname() string
+
+	// Returns the entire endpoint.
+	Endpoint() Endpoint
+
+	// Close the disk, mark it purposefully closed, only implemented for remote disks.
 	Close() error
+
+	// Returns the unique 'uuid' of this disk.
 	GetDiskID() (string, error)
+
+	// Set a unique 'uuid' for this disk, only used when
+	// disk is replaced and formatted.
 	SetDiskID(id string)
-	Healing() *healingTracker // Returns nil if disk is not healing.
+
+	// Returns healing information for a newly replaced disk,
+	// returns 'nil' once healing is complete or if the disk
+	// has never been replaced.
+	Healing() *healingTracker
 
 	DiskInfo(ctx context.Context) (info DiskInfo, err error)
 	NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (dataUsageCache, error)
diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go
index 99a293a7d..8708d6cd4 100644
--- a/cmd/storage-rest-server.go
+++ b/cmd/storage-rest-server.go
@@ -52,7 +52,7 @@ var errDiskStale = errors.New("disk stale")
 
 // To abstract a disk over network.
 type storageRESTServer struct {
-	storage *xlStorage
+	storage *xlStorageDiskIDCheck
 }
 
 func (s *storageRESTServer) writeErrorResponse(w http.ResponseWriter, err error) {
@@ -1233,7 +1233,8 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
 
 			endpoint := storage.Endpoint()
 
-			server := &storageRESTServer{storage: storage}
+			server := &storageRESTServer{storage: newXLStorageDiskIDCheck(storage)}
+			server.storage.SetDiskID(storage.diskID)
 
 			subrouter := router.PathPrefix(path.Join(storageRESTPrefix, endpoint.Path)).Subrouter()
 
diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go
index de4cf0e12..66b13c670 100644
--- a/cmd/xl-storage-disk-id-check.go
+++ b/cmd/xl-storage-disk-id-check.go
@@ -19,13 +19,19 @@ package cmd
 
 import (
 	"context"
+	"errors"
+	"fmt"
 	"io"
+	"math/rand"
+	"os"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
 
 	"github.com/minio/madmin-go"
+	"github.com/minio/minio/internal/logger"
 )
 
 //go:generate stringer -type=storageMetric -trimprefix=storageMetric $GOFILE
@@ -69,10 +75,11 @@ type xlStorageDiskIDCheck struct {
 	// do not re-order them, if you add new fields
 	// please use `fieldalignment ./...` to check
 	// if your changes are not causing any problems.
-	storage      StorageAPI
+	storage      *xlStorage
 	apiLatencies [storageMetricLast]*lockedLastMinuteLatency
 	diskID       string
 	apiCalls     [storageMetricLast]uint64
+	health       *diskHealthTracker
 }
 
 func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
@@ -109,6 +116,7 @@ func (e *lockedLastMinuteLatency) value() uint64 {
 func newXLStorageDiskIDCheck(storage *xlStorage) *xlStorageDiskIDCheck {
 	xl := xlStorageDiskIDCheck{
 		storage: storage,
+		health:  newDiskHealthTracker(),
 	}
 	for i := range xl.apiLatencies[:] {
 		xl.apiLatencies[i] = &lockedLastMinuteLatency{}
@@ -215,25 +223,31 @@ func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context) (info DiskInfo, err
 			return info, errDiskNotFound
 		}
 	}
+
+	if p.health.isFaulty() {
+		// if disk is already faulty return faulty for 'mc admin info' output and prometheus alerts.
+		return info, errFaultyDisk
+	}
+
 	return info, nil
 }
 
 func (p *xlStorageDiskIDCheck) MakeVolBulk(ctx context.Context, volumes ...string) (err error) {
-	defer p.updateStorageMetrics(storageMetricMakeVolBulk, volumes...)()
-
-	if contextCanceled(ctx) {
-		return ctx.Err()
-	}
-
-	if err = p.checkDiskStale(); err != nil {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricMakeVolBulk, volumes...)
+	if err != nil {
 		return err
 	}
+	defer done(&err)
+
 	return p.storage.MakeVolBulk(ctx, volumes...)
 }
 
 func (p *xlStorageDiskIDCheck) MakeVol(ctx context.Context, volume string) (err error) {
-	defer p.updateStorageMetrics(storageMetricMakeVol, volume)()
-
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricMakeVol, volume)
+	if err != nil {
+		return err
+	}
+	defer done(&err)
 	if contextCanceled(ctx) {
 		return ctx.Err()
 	}
@@ -244,167 +258,122 @@ func (p *xlStorageDiskIDCheck) MakeVol(ctx context.Context, volume string) (err
 	return p.storage.MakeVol(ctx, volume)
 }
 
-func (p *xlStorageDiskIDCheck) ListVols(ctx context.Context) ([]VolInfo, error) {
-	defer p.updateStorageMetrics(storageMetricListVols, "/")()
-
-	if contextCanceled(ctx) {
-		return nil, ctx.Err()
-	}
-
-	if err := p.checkDiskStale(); err != nil {
+func (p *xlStorageDiskIDCheck) ListVols(ctx context.Context) (vi []VolInfo, err error) {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricListVols, "/")
+	if err != nil {
 		return nil, err
 	}
+	defer done(&err)
+
 	return p.storage.ListVols(ctx)
 }
 
 func (p *xlStorageDiskIDCheck) StatVol(ctx context.Context, volume string) (vol VolInfo, err error) {
-	defer p.updateStorageMetrics(storageMetricStatVol, volume)()
-
-	if contextCanceled(ctx) {
-		return VolInfo{}, ctx.Err()
-	}
-
-	if err = p.checkDiskStale(); err != nil {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricStatVol, volume)
+	if err != nil {
 		return vol, err
 	}
+	defer done(&err)
+
 	return p.storage.StatVol(ctx, volume)
 }
 
 func (p *xlStorageDiskIDCheck) DeleteVol(ctx context.Context, volume string, forceDelete bool) (err error) {
-	defer p.updateStorageMetrics(storageMetricDeleteVol, volume)()
-
-	if contextCanceled(ctx) {
-		return ctx.Err()
-	}
-
-	if err = p.checkDiskStale(); err != nil {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricDeleteVol, volume)
+	if err != nil {
 		return err
 	}
+	defer done(&err)
+
 	return p.storage.DeleteVol(ctx, volume, forceDelete)
 }
 
-func (p *xlStorageDiskIDCheck) ListDir(ctx context.Context, volume, dirPath string, count int) ([]string, error) {
-	defer p.updateStorageMetrics(storageMetricListDir, volume, dirPath)()
-
-	if contextCanceled(ctx) {
-		return nil, ctx.Err()
-	}
-
-	if err := p.checkDiskStale(); err != nil {
+func (p *xlStorageDiskIDCheck) ListDir(ctx context.Context, volume, dirPath string, count int) (s []string, err error) {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricListDir, volume, dirPath)
+	if err != nil {
 		return nil, err
 	}
+	defer done(&err)
 
 	return p.storage.ListDir(ctx, volume, dirPath, count)
 }
 
 func (p *xlStorageDiskIDCheck) ReadFile(ctx context.Context, volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) {
-	defer p.updateStorageMetrics(storageMetricReadFile, volume, path)()
-
-	if contextCanceled(ctx) {
-		return 0, ctx.Err()
-	}
-
-	if err := p.checkDiskStale(); err != nil {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadFile, volume, path)
+	if err != nil {
 		return 0, err
 	}
+	defer done(&err)
 
 	return p.storage.ReadFile(ctx, volume, path, offset, buf, verifier)
 }
 
 func (p *xlStorageDiskIDCheck) AppendFile(ctx context.Context, volume string, path string, buf []byte) (err error) {
-	defer p.updateStorageMetrics(storageMetricAppendFile, volume, path)()
-
-	if contextCanceled(ctx) {
-		return ctx.Err()
-	}
-
-	if err = p.checkDiskStale(); err != nil {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricAppendFile, volume, path)
+	if err != nil {
 		return err
 	}
+	defer done(&err)
 
 	return p.storage.AppendFile(ctx, volume, path, buf)
 }
 
-func (p *xlStorageDiskIDCheck) CreateFile(ctx context.Context, volume, path string, size int64, reader io.Reader) error {
-	defer p.updateStorageMetrics(storageMetricCreateFile, volume, path)()
-
-	if contextCanceled(ctx) {
-		return ctx.Err()
-	}
-
-	if err := p.checkDiskStale(); err != nil {
+func (p *xlStorageDiskIDCheck) CreateFile(ctx context.Context, volume, path string, size int64, reader io.Reader) (err error) {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricCreateFile, volume, path)
+	if err != nil {
 		return err
 	}
+	defer done(&err)
 
 	return p.storage.CreateFile(ctx, volume, path, size, reader)
 }
 
 func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) {
-	defer p.updateStorageMetrics(storageMetricReadFileStream, volume, path)()
-
-	if contextCanceled(ctx) {
-		return nil, ctx.Err()
-	}
-
-	if err := p.checkDiskStale(); err != nil {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadFileStream, volume, path)
+	if err != nil {
 		return nil, err
 	}
+	defer done(&err)
 
 	return p.storage.ReadFileStream(ctx, volume, path, offset, length)
 }
 
-func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) error {
-	defer p.updateStorageMetrics(storageMetricRenameFile, srcVolume, srcPath, dstVolume, dstPath)()
-
-	if contextCanceled(ctx) {
-		return ctx.Err()
-	}
-
-	if err := p.checkDiskStale(); err != nil {
+func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) (err error) {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricRenameFile, srcVolume, srcPath, dstVolume, dstPath)
+	if err != nil {
 		return err
 	}
+	defer done(&err)
 
 	return p.storage.RenameFile(ctx, srcVolume, srcPath, dstVolume, dstPath)
 }
 
-func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) error {
-	defer p.updateStorageMetrics(storageMetricRenameData, srcPath, fi.DataDir, dstVolume, dstPath)()
-
-	if contextCanceled(ctx) {
-		return ctx.Err()
-	}
-
-	if err := p.checkDiskStale(); err != nil {
+func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (err error) {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricRenameData, srcPath, fi.DataDir, dstVolume, dstPath)
+	if err != nil {
 		return err
 	}
+	defer done(&err)
 
 	return p.storage.RenameData(ctx, srcVolume, srcPath, fi, dstVolume, dstPath)
 }
 
 func (p *xlStorageDiskIDCheck) CheckParts(ctx context.Context, volume string, path string, fi FileInfo) (err error) {
-	defer p.updateStorageMetrics(storageMetricCheckParts, volume, path)()
-
-	if contextCanceled(ctx) {
-		return ctx.Err()
-	}
-
-	if err = p.checkDiskStale(); err != nil {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricCheckParts, volume, path)
+	if err != nil {
 		return err
 	}
+	defer done(&err)
 
 	return p.storage.CheckParts(ctx, volume, path, fi)
 }
 
 func (p *xlStorageDiskIDCheck) Delete(ctx context.Context, volume string, path string, recursive bool) (err error) {
-	defer p.updateStorageMetrics(storageMetricDelete, volume, path)()
-
-	if contextCanceled(ctx) {
-		return ctx.Err()
-	}
-
-	if err = p.checkDiskStale(); err != nil {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricDelete, volume, path)
+	if err != nil {
 		return err
 	}
+	defer done(&err)
 
 	return p.storage.Delete(ctx, volume, path, recursive)
 }
 
@@ -417,136 +386,102 @@ func (p *xlStorageDiskIDCheck) DeleteVersions(ctx context.Context, volume string
 	if len(versions) > 0 {
 		path = versions[0].Name
 	}
-
-	defer p.updateStorageMetrics(storageMetricDeleteVersions, volume, path)()
-
 	errs = make([]error, len(versions))
-
-	if contextCanceled(ctx) {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricDeleteVersions, volume, path)
+	if err != nil {
 		for i := range errs {
 			errs[i] = ctx.Err()
 		}
 		return errs
 	}
-
-	if err := p.checkDiskStale(); err != nil {
-		for i := range errs {
-			errs[i] = err
+	defer done(&err)
+	errs = p.storage.DeleteVersions(ctx, volume, versions)
+	for i := range errs {
+		if errs[i] != nil {
+			err = errs[i]
+			break
 		}
-		return errs
 	}
-	return p.storage.DeleteVersions(ctx, volume, versions)
+	return errs
 }
 
-func (p *xlStorageDiskIDCheck) VerifyFile(ctx context.Context, volume, path string, fi FileInfo) error {
-	defer p.updateStorageMetrics(storageMetricVerifyFile, volume, path)()
-
-	if contextCanceled(ctx) {
-		return ctx.Err()
-	}
-
-	if err := p.checkDiskStale(); err != nil {
+func (p *xlStorageDiskIDCheck) VerifyFile(ctx context.Context, volume, path string, fi FileInfo) (err error) {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricVerifyFile, volume, path)
+	if err != nil {
 		return err
 	}
+	defer done(&err)
 
 	return p.storage.VerifyFile(ctx, volume, path, fi)
 }
 
 func (p *xlStorageDiskIDCheck) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) {
-	defer p.updateStorageMetrics(storageMetricWriteAll, volume, path)()
-
-	if contextCanceled(ctx) {
-		return ctx.Err()
-	}
-
-	if err = p.checkDiskStale(); err != nil {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricWriteAll, volume, path)
+	if err != nil {
 		return err
 	}
+	defer done(&err)
 
 	return p.storage.WriteAll(ctx, volume, path, b)
 }
 
 func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) (err error) {
-	defer p.updateStorageMetrics(storageMetricDeleteVersion, volume, path)()
-
-	if contextCanceled(ctx) {
-		return ctx.Err()
-	}
-
-	if err = p.checkDiskStale(); err != nil {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricDeleteVersion, volume, path)
+	if err != nil {
 		return err
 	}
+	defer done(&err)
 
 	return p.storage.DeleteVersion(ctx, volume, path, fi, forceDelMarker)
 }
 
 func (p *xlStorageDiskIDCheck) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
-	defer p.updateStorageMetrics(storageMetricUpdateMetadata, volume, path)()
-
-	if contextCanceled(ctx) {
-		return ctx.Err()
-	}
-
-	if err = p.checkDiskStale(); err != nil {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricUpdateMetadata, volume, path)
+	if err != nil {
 		return err
 	}
+	defer done(&err)
 
 	return p.storage.UpdateMetadata(ctx, volume, path, fi)
 }
 
 func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
-	defer p.updateStorageMetrics(storageMetricWriteMetadata, volume, path)()
-
-	if contextCanceled(ctx) {
-		return ctx.Err()
-	}
-
-	if err = p.checkDiskStale(); err != nil {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricWriteMetadata, volume, path)
+	if err != nil {
 		return err
 	}
+	defer done(&err)
 
 	return p.storage.WriteMetadata(ctx, volume, path, fi)
 }
 
 func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {
-	defer p.updateStorageMetrics(storageMetricReadVersion, volume, path)()
-
-	if contextCanceled(ctx) {
-		return fi, ctx.Err()
-	}
-
-	if err = p.checkDiskStale(); err != nil {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadVersion, volume, path)
+	if err != nil {
 		return fi, err
 	}
+	defer done(&err)
 
 	return p.storage.ReadVersion(ctx, volume, path, versionID, readData)
 }
 
 func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) {
-	defer p.updateStorageMetrics(storageMetricReadAll, volume, path)()
-
-	if contextCanceled(ctx) {
-		return nil, ctx.Err()
-	}
-
-	if err = p.checkDiskStale(); err != nil {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadAll, volume, path)
+	if err != nil {
 		return nil, err
 	}
+	defer done(&err)
 
 	return p.storage.ReadAll(ctx, volume, path)
 }
 
 func (p *xlStorageDiskIDCheck) StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error) {
-	defer p.updateStorageMetrics(storageMetricStatInfoFile, volume, path)()
-
-	if contextCanceled(ctx) {
-		return nil, ctx.Err()
-	}
-
-	if err = p.checkDiskStale(); err != nil {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricStatInfoFile, volume, path)
+	if err != nil {
 		return nil, err
 	}
+	defer done(&err)
 
 	return p.storage.StatInfoFile(ctx, volume, path, glob)
 }
@@ -565,10 +500,10 @@ func storageTrace(s storageMetric, startTime time.Time, duration time.Duration,
 }
 
 // Update storage metrics
-func (p *xlStorageDiskIDCheck) updateStorageMetrics(s storageMetric, paths ...string) func() {
+func (p *xlStorageDiskIDCheck) updateStorageMetrics(s storageMetric, paths ...string) func(err *error) {
 	startTime := time.Now()
 	trace := globalTrace.NumSubscribers() > 0
-	return func() {
+	return func(err *error) {
 		duration := time.Since(startTime)
 
 		atomic.AddUint64(&p.apiCalls[s], 1)
@@ -579,3 +514,317 @@ func (p *xlStorageDiskIDCheck) updateStorageMetrics(s storageMetric, paths ...st
 		}
 	}
 }
+
+const (
+	diskHealthOK = iota
+	diskHealthFaulty
+)
+
+// diskMaxConcurrent is the maximum number of running concurrent operations
+// for local and (incoming) remote disk ops respectively.
+var diskMaxConcurrent = 50
+
+func init() {
+	if s, ok := os.LookupEnv("_MINIO_DISK_MAX_CONCURRENT"); ok && s != "" {
+		var err error
+		diskMaxConcurrent, err = strconv.Atoi(s)
+		if err != nil {
+			logger.Fatal(err, "invalid _MINIO_DISK_MAX_CONCURRENT value")
+		}
+	}
+}
+
+type diskHealthTracker struct {
+	// atomic time of last success
+	lastSuccess int64
+
+	// atomic time of last time a token was grabbed.
+	lastStarted int64
+
+	// Atomic status of disk.
+	status int32
+
+	// Atomic number of requests blocking for a token.
+	blocked int32
+
+	// Concurrency tokens.
+	tokens chan struct{}
+}
+
+// newDiskHealthTracker creates a new disk health tracker.
+func newDiskHealthTracker() *diskHealthTracker {
+	d := diskHealthTracker{
+		lastSuccess: time.Now().UnixNano(),
+		lastStarted: time.Now().UnixNano(),
+		status:      diskHealthOK,
+		tokens:      make(chan struct{}, diskMaxConcurrent),
+	}
+	for i := 0; i < diskMaxConcurrent; i++ {
+		d.tokens <- struct{}{}
+	}
+	return &d
+}
+
+// logSuccess will update the last successful operation time.
+func (d *diskHealthTracker) logSuccess() {
+	atomic.StoreInt64(&d.lastSuccess, time.Now().UnixNano())
+}
+
+func (d *diskHealthTracker) isFaulty() bool {
+	return atomic.LoadInt32(&d.status) == diskHealthFaulty
+}
+
+type (
+	healthDiskCtxKey   struct{}
+	healthDiskCtxValue struct {
+		lastSuccess *int64
+	}
+)
+
+// logSuccess will update the last successful operation time.
+func (h *healthDiskCtxValue) logSuccess() {
+	atomic.StoreInt64(h.lastSuccess, time.Now().UnixNano())
+}
+
+// noopDoneFunc is a no-op done func.
+// Can be reused.
+var noopDoneFunc = func(_ *error) {}
+
+// TrackDiskHealth for this request.
+// When a non-nil error is returned 'done' MUST be called
+// with the status of the response, if it corresponds to disk health.
+// If the pointer sent to done is non-nil AND the error
+// is either nil or io.EOF the disk is considered good.
+// So if unsure if the disk status is ok, return nil as a parameter to done.
+// Shadowing will work as long as return error is named: https://go.dev/play/p/sauq86SsTN2
+func (p *xlStorageDiskIDCheck) TrackDiskHealth(ctx context.Context, s storageMetric, paths ...string) (c context.Context, done func(*error), err error) {
+	done = noopDoneFunc
+	if contextCanceled(ctx) {
+		return ctx, done, ctx.Err()
+	}
+
+	// Return early if disk is faulty already.
+	if atomic.LoadInt32(&p.health.status) == diskHealthFaulty {
+		return ctx, done, errFaultyDisk
+	}
+
+	// Verify if the disk is not stale
+	// - missing format.json (unformatted drive)
+	// - format.json is valid but invalid 'uuid'
+	if err = p.checkDiskStale(); err != nil {
+		return ctx, done, err
+	}
+
+	// Disallow recursive tracking to avoid deadlocks.
+	if ctx.Value(healthDiskCtxKey{}) != nil {
+		done = p.updateStorageMetrics(s, paths...)
+		return ctx, done, nil
+	}
+
+	select {
+	case <-ctx.Done():
+		return ctx, done, ctx.Err()
+	case <-p.health.tokens:
+		// Fast path, got token.
+	default:
+		// We ran out of tokens, check health before blocking.
+		err = p.waitForToken(ctx)
+		if err != nil {
+			return ctx, done, err
+		}
+	}
+	// We only progress here if we got a token.
+
+	atomic.StoreInt64(&p.health.lastStarted, time.Now().UnixNano())
+	ctx = context.WithValue(ctx, healthDiskCtxKey{}, &healthDiskCtxValue{lastSuccess: &p.health.lastSuccess})
+	si := p.updateStorageMetrics(s, paths...)
+	t := time.Now()
+	var once sync.Once
+	return ctx, func(errp *error) {
+		once.Do(func() {
+			if false {
+				var ers string
+				if errp != nil {
+					err := *errp
+					ers = fmt.Sprint(err)
+				}
+				fmt.Println(time.Now().Format(time.RFC3339), "op", s, "took", time.Since(t), "result:", ers, "disk:", p.storage.String(), "path:", strings.Join(paths, "/"))
+			}
+			p.health.tokens <- struct{}{}
+			if errp != nil {
+				err := *errp
+				if err != nil && !errors.Is(err, io.EOF) {
+					return
+				}
+				p.health.logSuccess()
+			}
+			si(errp)
+		})
+	}, nil
+}
+
+// waitForToken will wait for a token, while periodically
+// checking the disk status.
+// If nil is returned a token was picked up.
+func (p *xlStorageDiskIDCheck) waitForToken(ctx context.Context) (err error) {
+	atomic.AddInt32(&p.health.blocked, 1)
+	defer func() {
+		atomic.AddInt32(&p.health.blocked, -1)
+	}()
+	// Avoid stampeding herd...
+	ticker := time.NewTicker(5*time.Second + time.Duration(rand.Int63n(int64(5*time.Second))))
+	defer ticker.Stop()
+	for {
+		err = p.checkHealth(ctx)
+		if err != nil {
+			return err
+		}
+		select {
+		case <-ticker.C:
+			// Ticker expired, check health again.
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-p.health.tokens:
+			return nil
+		}
+	}
+}
+
+// checkHealth should only be called when tokens have run out.
+// This will check if disk should be taken offline.
+func (p *xlStorageDiskIDCheck) checkHealth(ctx context.Context) (err error) {
+	if atomic.LoadInt32(&p.health.status) == diskHealthFaulty {
+		return errFaultyDisk
+	}
+	// Check if there are tokens.
+	if len(p.health.tokens) > 0 {
+		return nil
+	}
+
+	const maxTimeSinceLastSuccess = 30 * time.Second
+	const minTimeSinceLastOpStarted = 15 * time.Second
+
+	// To avoid stampeding herd (100s of simultaneous starting requests)
+	// there must be a delay between the last started request and now
+	// for the last lastSuccess to be useful.
+	t := time.Since(time.Unix(0, atomic.LoadInt64(&p.health.lastStarted)))
+	if t < minTimeSinceLastOpStarted {
+		return nil
+	}
+
+	// If also more than 15 seconds since last success, take disk offline.
+	t = time.Since(time.Unix(0, atomic.LoadInt64(&p.health.lastSuccess)))
+	if t > maxTimeSinceLastSuccess {
+		if atomic.CompareAndSwapInt32(&p.health.status, diskHealthOK, diskHealthFaulty) {
+			logger.LogAlwaysIf(ctx, fmt.Errorf("taking disk %s offline, time since last response %v", p.storage.String(), t.Round(time.Millisecond)))
+			go p.monitorDiskStatus()
+		}
+		return errFaultyDisk
+	}
+	return nil
+}
+
+// monitorDiskStatus should be called once when a drive has been marked offline.
+// Once the disk has been deemed ok, it will return to online status.
+func (p *xlStorageDiskIDCheck) monitorDiskStatus() {
+	t := time.NewTicker(5 * time.Second)
+	defer t.Stop()
+	fn := mustGetUUID()
+	for range t.C {
+		if len(p.health.tokens) == 0 {
+			// Queue is still full, no need to check.
+			continue
+		}
+		err := p.storage.WriteAll(context.Background(), minioMetaTmpBucket, fn, []byte{10000: 42})
+		if err != nil {
+			continue
+		}
+		b, err := p.storage.ReadAll(context.Background(), minioMetaTmpBucket, fn)
+		if err != nil || len(b) != 10001 {
+			continue
+		}
+		err = p.storage.Delete(context.Background(), minioMetaTmpBucket, fn, false)
+		if err == nil {
+			logger.Info("Able to read+write, bringing disk %s online.", p.storage.String())
+			atomic.StoreInt32(&p.health.status, diskHealthOK)
+			return
+		}
+	}
+}
+
+// diskHealthCheckOK will check if the provided error is nil
+// and update disk status if good.
+// For convenience a bool is returned to indicate any error state
+// that is not io.EOF.
+func diskHealthCheckOK(ctx context.Context, err error) bool {
+	// Check if context has a disk health check.
+	tracker, ok := ctx.Value(healthDiskCtxKey{}).(*healthDiskCtxValue)
+	if !ok {
+		// No tracker, return
+		return err == nil || errors.Is(err, io.EOF)
+	}
+	if err == nil || errors.Is(err, io.EOF) {
+		tracker.logSuccess()
+		return true
+	}
+	return false
+}
+
+// diskHealthWrapper provides either a io.Reader or io.Writer
+// that updates status of the provided tracker.
+// Use through diskHealthReader or diskHealthWriter.
+type diskHealthWrapper struct {
+	tracker *healthDiskCtxValue
+	r       io.Reader
+	w       io.Writer
+}
+
+func (d *diskHealthWrapper) Read(p []byte) (int, error) {
+	if d.r == nil {
+		return 0, fmt.Errorf("diskHealthWrapper: Read with no reader")
+	}
+	n, err := d.r.Read(p)
+	if err == nil || err == io.EOF && n > 0 {
+		d.tracker.logSuccess()
+	}
+	return n, err
+}
+
+func (d *diskHealthWrapper) Write(p []byte) (int, error) {
+	if d.w == nil {
+		return 0, fmt.Errorf("diskHealthWrapper: Write with no writer")
+	}
+	n, err := d.w.Write(p)
+	if err == nil && n == len(p) {
+		d.tracker.logSuccess()
+	}
+	return n, err
+}
+
+// diskHealthReader provides a wrapper that will update disk health on
+// ctx, on every successful read.
+// This should only be used directly at the os/syscall level,
+// otherwise buffered operations may return false health checks.
+func diskHealthReader(ctx context.Context, r io.Reader) io.Reader {
+	// Check if context has a disk health check.
+	tracker, ok := ctx.Value(healthDiskCtxKey{}).(*healthDiskCtxValue)
+	if !ok {
+		// No need to wrap
+		return r
+	}
+	return &diskHealthWrapper{r: r, tracker: tracker}
+}
+
+// diskHealthWriter provides a wrapper that will update disk health on
+// ctx, on every successful write.
+// This should only be used directly at the os/syscall level,
+// otherwise buffered operations may return false health checks.
+func diskHealthWriter(ctx context.Context, w io.Writer) io.Writer {
+	// Check if context has a disk health check.
+	tracker, ok := ctx.Value(healthDiskCtxKey{}).(*healthDiskCtxValue)
+	if !ok {
+		// No need to wrap
+		return w
+	}
+	return &diskHealthWrapper{w: w, tracker: tracker}
+}
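Note: the diskHealthTracker above bounds concurrent disk access with a pre-filled buffered channel used as a counting semaphore, and waitForToken/checkHealth use len(tokens) as a cheap probe for free capacity. A minimal standalone sketch of just that pattern (not MinIO code; the names and the limit of 3 are illustrative):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxConcurrent = 3 // stands in for diskMaxConcurrent
	tokens := make(chan struct{}, maxConcurrent)
	for i := 0; i < maxConcurrent; i++ {
		tokens <- struct{}{} // pre-fill: every slot starts out free
	}

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(op int) {
			defer wg.Done()
			<-tokens                                // acquire: blocks once 3 ops are in flight
			defer func() { tokens <- struct{}{} }() // release when the op finishes
			fmt.Printf("op %d running, free slots now: %d\n", op, len(tokens))
		}(i)
	}
	wg.Wait()
}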
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index 0b3b8923b..800d4bbc0 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -683,9 +683,11 @@ func (s *xlStorage) SetDiskID(id string) {
 
 func (s *xlStorage) MakeVolBulk(ctx context.Context, volumes ...string) error {
 	for _, volume := range volumes {
-		if err := s.MakeVol(ctx, volume); err != nil && !errors.Is(err, errVolumeExists) {
+		err := s.MakeVol(ctx, volume)
+		if err != nil && !errors.Is(err, errVolumeExists) {
 			return err
 		}
+		diskHealthCheckOK(ctx, err)
 	}
 	return nil
 }
@@ -943,6 +945,7 @@ func (s *xlStorage) DeleteVersions(ctx context.Context, volume string, versions
 		if err := s.deleteVersions(ctx, volume, fiv.Name, fiv.Versions...); err != nil {
 			errs[i] = err
 		}
+		diskHealthCheckOK(ctx, errs[i])
 	}
 
 	return errs
@@ -1382,7 +1385,7 @@ func (s *xlStorage) readAllData(ctx context.Context, volumeDir string, filePath
 		buf = make([]byte, sz)
 	}
 	// Read file...
-	_, err = io.ReadFull(r, buf)
+	_, err = io.ReadFull(diskHealthReader(ctx, r), buf)
 
 	return buf, stat.ModTime().UTC(), osErrToFileErr(err)
 }
@@ -1693,7 +1696,7 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
 	r := struct {
 		io.Reader
 		io.Closer
-	}{Reader: io.LimitReader(or, length), Closer: closeWrapper(func() error {
+	}{Reader: io.LimitReader(diskHealthReader(ctx, or), length), Closer: closeWrapper(func() error {
 		if !alignment || offset+length%xioutil.DirectioAlignSize != 0 {
 			// invalidate page-cache for unaligned reads.
 			if !globalAPIConfig.isDisableODirect() {
@@ -1792,7 +1795,7 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz
 		defer xioutil.ODirectPoolLarge.Put(bufp)
 	}
 
-	written, err := xioutil.CopyAligned(w, r, *bufp, fileSize)
+	written, err := xioutil.CopyAligned(diskHealthWriter(ctx, w), r, *bufp, fileSize, w)
 	if err != nil {
 		return err
 	}
@@ -2269,6 +2272,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 		}
 		return osErrToFileErr(err)
 	}
+	diskHealthCheckOK(ctx, err)
 
 	// renameAll only for objects that have xl.meta not saved inline.
 	if len(fi.Data) == 0 && fi.Size > 0 {
@@ -2406,7 +2410,7 @@ func (s *xlStorage) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolum
 	return nil
 }
 
-func (s *xlStorage) bitrotVerify(partPath string, partSize int64, algo BitrotAlgorithm, sum []byte, shardSize int64) error {
+func (s *xlStorage) bitrotVerify(ctx context.Context, partPath string, partSize int64, algo BitrotAlgorithm, sum []byte, shardSize int64) error {
 	// Open the file for reading.
 	file, err := Open(partPath)
 	if err != nil {
@@ -2421,7 +2425,7 @@ func (s *xlStorage) bitrotVerify(partPath string, partSize int64, algo BitrotAlg
 		// for healing code to fix this file.
 		return err
 	}
-	return bitrotVerify(file, fi.Size(), partSize, algo, sum, shardSize)
+	return bitrotVerify(diskHealthReader(ctx, file), fi.Size(), partSize, algo, sum, shardSize)
 }
 
 func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi FileInfo) (err error) {
@@ -2446,7 +2450,7 @@ func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi File
 	for _, part := range fi.Parts {
 		checksumInfo := erasure.GetChecksumInfo(part.Number)
 		partPath := pathJoin(volumeDir, path, fi.DataDir, fmt.Sprintf("part.%d", part.Number))
-		if err := s.bitrotVerify(partPath,
+		if err := s.bitrotVerify(ctx, partPath,
 			erasure.ShardFileSize(part.Size),
 			checksumInfo.Algorithm,
 			checksumInfo.Hash, erasure.ShardSize()); err != nil {
diff --git a/cmd/xl-storage_test.go b/cmd/xl-storage_test.go
index b5ebd9972..ca42a2491 100644
--- a/cmd/xl-storage_test.go
+++ b/cmd/xl-storage_test.go
@@ -1884,7 +1884,7 @@ func TestXLStorageVerifyFile(t *testing.T) {
 	if err := storage.WriteAll(context.Background(), volName, fileName, data); err != nil {
 		t.Fatal(err)
 	}
-	if err := storage.storage.(*xlStorage).bitrotVerify(pathJoin(path, volName, fileName), size, algo, hashBytes, 0); err != nil {
+	if err := storage.storage.bitrotVerify(context.Background(), pathJoin(path, volName, fileName), size, algo, hashBytes, 0); err != nil {
 		t.Fatal(err)
 	}
 
@@ -1894,12 +1894,12 @@ func TestXLStorageVerifyFile(t *testing.T) {
 	}
 
 	// Check if VerifyFile reports the incorrect file length (the correct length is `size+1`)
-	if err := storage.storage.(*xlStorage).bitrotVerify(pathJoin(path, volName, fileName), size, algo, hashBytes, 0); err == nil {
+	if err := storage.storage.bitrotVerify(context.Background(), pathJoin(path, volName, fileName), size, algo, hashBytes, 0); err == nil {
 		t.Fatal("expected to fail bitrot check")
 	}
 
 	// Check if bitrot fails
-	if err := storage.storage.(*xlStorage).bitrotVerify(pathJoin(path, volName, fileName), size+1, algo, hashBytes, 0); err == nil {
+	if err := storage.storage.bitrotVerify(context.Background(), pathJoin(path, volName, fileName), size+1, algo, hashBytes, 0); err == nil {
 		t.Fatal("expected to fail bitrot check")
 	}
 
@@ -1928,7 +1928,7 @@ func TestXLStorageVerifyFile(t *testing.T) {
 		t.Fatal(err)
 	}
 	w.(io.Closer).Close()
-	if err := storage.storage.(*xlStorage).bitrotVerify(pathJoin(path, volName, fileName), size, algo, nil, shardSize); err != nil {
+	if err := storage.storage.bitrotVerify(context.Background(), pathJoin(path, volName, fileName), size, algo, nil, shardSize); err != nil {
 		t.Fatal(err)
 	}
 
@@ -1943,10 +1943,10 @@ func TestXLStorageVerifyFile(t *testing.T) {
 		t.Fatal(err)
 	}
 	f.Close()
-	if err := storage.storage.(*xlStorage).bitrotVerify(pathJoin(path, volName, fileName), size, algo, nil, shardSize); err == nil {
+	if err := storage.storage.bitrotVerify(context.Background(), pathJoin(path, volName, fileName), size, algo, nil, shardSize); err == nil {
 		t.Fatal("expected to fail bitrot check")
 	}
-	if err := storage.storage.(*xlStorage).bitrotVerify(pathJoin(path, volName, fileName), size+1, algo, nil, shardSize); err == nil {
+	if err := storage.storage.bitrotVerify(context.Background(), pathJoin(path, volName, fileName), size+1, algo, nil, shardSize); err == nil {
 		t.Fatal("expected to fail bitrot check")
 	}
 }
diff --git a/internal/ioutil/ioutil.go b/internal/ioutil/ioutil.go
index 59ec15116..18381fb2d 100644
--- a/internal/ioutil/ioutil.go
+++ b/internal/ioutil/ioutil.go
@@ -254,14 +254,14 @@ const DirectioAlignSize = 4096
 // used with DIRECT I/O based file descriptor and it is expected that
 // input writer *os.File not a generic io.Writer. Make sure to have
 // the file opened for writes with syscall.O_DIRECT flag.
-func CopyAligned(w *os.File, r io.Reader, alignedBuf []byte, totalSize int64) (int64, error) {
+func CopyAligned(w io.Writer, r io.Reader, alignedBuf []byte, totalSize int64, file *os.File) (int64, error) {
 	// Writes remaining bytes in the buffer.
-	writeUnaligned := func(w *os.File, buf []byte) (remainingWritten int64, err error) {
+	writeUnaligned := func(w io.Writer, buf []byte) (remainingWritten int64, err error) {
 		// Disable O_DIRECT on fd's on unaligned buffer
 		// perform an amortized Fdatasync(fd) on the fd at
 		// the end, this is performed by the caller before
 		// closing 'w'.
-		if err = disk.DisableDirectIO(w); err != nil {
+		if err = disk.DisableDirectIO(file); err != nil {
 			return remainingWritten, err
 		}
 		// Since w is *os.File io.Copy shall use ReadFrom() call.
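Note: the internal/ioutil change above decouples the data path (now any io.Writer, such as the diskHealthWriter wrapper) from the *os.File that is still needed for descriptor-level control; the CreateFile call site in cmd/xl-storage.go passes both. A standalone sketch of that general shape only (not the MinIO implementation; the function and variable names here are invented):

package main

import (
	"io"
	"os"
	"strings"
)

// copyWithFDControl mirrors the new CopyAligned split: data flows through a
// generic io.Writer (which may be a health-tracking wrapper), while the raw
// *os.File is passed separately for descriptor-level calls such as Sync.
func copyWithFDControl(w io.Writer, r io.Reader, file *os.File) (int64, error) {
	n, err := io.Copy(w, r)
	if err != nil {
		return n, err
	}
	// Descriptor-level operation on the real file, not on the wrapper.
	return n, file.Sync()
}

func main() {
	f, err := os.CreateTemp("", "copyaligned-sketch")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	// In the diff the writer would be diskHealthWriter(ctx, w); any io.Writer works.
	if _, err := copyWithFDControl(f, strings.NewReader("hello"), f); err != nil {
		panic(err)
	}
}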