allow decommission to continue when healing (#15312)

Bonus:

- heal buckets in case the new pools are missing buckets during startup.
Harshavardhana 2022-07-15 21:03:23 -07:00 committed by GitHub
parent 5ac6d91525
commit e7ac1ea54c
2 changed files with 21 additions and 16 deletions
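
The net effect: the guard that refused to start a decommission while a drive was still healing is removed, and StartDecommission now heals every bucket up front so a freshly added pool is not missing bucket metadata when objects start moving into it. A stand-alone sketch of that ordering, using invented stand-in types (pool, healBuckets) rather than MinIO's real object layer:

// Sketch only: create any missing buckets on every pool first, then let the
// drain begin. Stand-in types, not MinIO's erasureServerPools.
package main

import "fmt"

// pool is a hypothetical stand-in for one server pool's view of buckets.
type pool struct {
	name    string
	buckets map[string]bool
}

// healBuckets mimics what HealBucket achieves for this change: every pool
// ends up knowing about every bucket before objects start moving.
func healBuckets(pools []*pool, buckets []string) {
	for _, b := range buckets {
		for _, p := range pools {
			if !p.buckets[b] {
				p.buckets[b] = true
				fmt.Printf("created missing bucket %q on %s\n", b, p.name)
			}
		}
	}
}

func main() {
	old := &pool{name: "pool-0", buckets: map[string]bool{"photos": true, "logs": true}}
	fresh := &pool{name: "pool-1", buckets: map[string]bool{}} // newly added pool, buckets missing

	// Heal buckets up front; decommission of pool-0 may then start even
	// while drives elsewhere are still healing.
	healBuckets([]*pool{old, fresh}, []string{"photos", "logs"})
	fmt.Println("ok to start draining pool-0")
}

Note that in the diff below the HealBucket return value is not checked, so a failed bucket heal does not block the decommission from being queued.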

cmd/background-newdisks-heal-ops.go

@@ -94,7 +94,7 @@ func loadHealingTracker(ctx context.Context, disk StorageAPI) (*healingTracker,
 		return nil, err
 	}
 	b, err := disk.ReadAll(ctx, minioMetaBucket,
-		pathJoin(bucketMetaPrefix, slashSeparator, healingTrackerFilename))
+		pathJoin(bucketMetaPrefix, healingTrackerFilename))
 	if err != nil {
 		return nil, err
 	}
@@ -155,14 +155,14 @@
 	}
 	globalBackgroundHealState.updateHealStatus(h)
 	return h.disk.WriteAll(ctx, minioMetaBucket,
-		pathJoin(bucketMetaPrefix, slashSeparator, healingTrackerFilename),
+		pathJoin(bucketMetaPrefix, healingTrackerFilename),
 		htrackerBytes)
 }
 
 // delete the tracker on disk.
 func (h *healingTracker) delete(ctx context.Context) error {
 	return h.disk.Delete(ctx, minioMetaBucket,
-		pathJoin(bucketMetaPrefix, slashSeparator, healingTrackerFilename),
+		pathJoin(bucketMetaPrefix, healingTrackerFilename),
 		DeleteOptions{
 			Recursive: false,
 			Force:     false,
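
The two pathJoin hunks above drop a redundant slashSeparator argument. Assuming MinIO's pathJoin cleans paths the way the standard library's path.Join does, the stored key is identical either way; a quick check with path.Join and illustrative literals standing in for bucketMetaPrefix and healingTrackerFilename:

package main

import (
	"fmt"
	"path"
)

func main() {
	// Both spellings clean to the same key, so the extra separator
	// argument added nothing and could simply be dropped.
	fmt.Println(path.Join("buckets", "/", ".healing.bin")) // buckets/.healing.bin
	fmt.Println(path.Join("buckets", ".healing.bin"))      // buckets/.healing.bin
}
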
@@ -365,12 +365,15 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
 	}
 
 	// Start or resume healing of this erasure set
-	err = z.serverPools[poolIdx].sets[setIdx].healErasureSet(ctx, tracker.QueuedBuckets, tracker)
-	if err != nil {
+	if err = z.serverPools[poolIdx].sets[setIdx].healErasureSet(ctx, tracker.QueuedBuckets, tracker); err != nil {
 		return err
 	}
 
-	logger.Info("Healing disk '%s' is complete (healed: %d, failed: %d).", disk, tracker.ItemsHealed, tracker.ItemsFailed)
+	if tracker.ItemsFailed > 0 {
+		logger.Info("Healing disk '%s' failed (healed: %d, failed: %d).", disk, tracker.ItemsHealed, tracker.ItemsFailed)
+	} else {
+		logger.Info("Healing disk '%s' complete (healed: %d, failed: %d).", disk, tracker.ItemsHealed, tracker.ItemsFailed)
+	}
 
 	if serverDebugLog {
 		tracker.printTo(os.Stdout)
@@ -378,6 +381,7 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
 	}
 
 	logger.LogIf(ctx, tracker.delete(ctx))
+
 	return nil
 }
@@ -394,12 +398,11 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerPools) {
 		case <-ctx.Done():
 			return
 		case <-diskCheckTimer.C:
-
 			healDisks := globalBackgroundHealState.getHealLocalDiskEndpoints()
 			if len(healDisks) == 0 {
 				// Reset for next interval.
 				diskCheckTimer.Reset(defaultMonitorNewDiskInterval)
-				break
+				continue
 			}
 
 			// Reformat disks immediately
@@ -408,7 +411,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerPools) {
 				logger.LogIf(ctx, err)
 				// Reset for next interval.
 				diskCheckTimer.Reset(defaultMonitorNewDiskInterval)
-				break
+				continue
 			}
 
 			for _, disk := range healDisks {
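
The break-to-continue swaps are about intent rather than behavior: inside a select case, break only exits the select, so in a bare for { select { ... } } loop it already acts like continue; writing continue states that directly and stays correct if code is ever added after the select. A minimal, self-contained illustration (timings arbitrary):

package main

import (
	"fmt"
	"time"
)

func main() {
	tick := time.NewTicker(10 * time.Millisecond)
	defer tick.Stop()
	stop := time.After(100 * time.Millisecond)

	seen := 0
	for {
		select {
		case <-stop:
			fmt.Println("ticks seen:", seen)
			return
		case <-tick.C:
			seen++
			if seen%2 == 0 {
				// "break" here would only exit the select; since nothing
				// follows the select in this for loop, it is equivalent to
				// "continue", but "continue" states the intent directly.
				continue
			}
			fmt.Println("odd tick", seen)
		}
	}
}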

cmd/erasure-server-pool-decom.go

@@ -31,6 +31,7 @@ import (
 	"time"
 
 	"github.com/dustin/go-humanize"
+	"github.com/minio/madmin-go"
 	"github.com/minio/minio/internal/bucket/lifecycle"
 	"github.com/minio/minio/internal/hash"
 	"github.com/minio/minio/internal/logger"
@@ -967,15 +968,9 @@ func (z *erasureServerPools) getDecommissionPoolSpaceInfo(idx int) (pi poolSpace
 	if idx+1 > len(z.serverPools) {
 		return pi, errInvalidArgument
 	}
 
 	info, _ := z.serverPools[idx].StorageInfo(context.Background())
 	info.Backend = z.BackendInfo()
-	for _, disk := range info.Disks {
-		if disk.Healing {
-			return pi, decomError{
-				Err: fmt.Sprintf("%s drive is healing, decommission will not be started", disk.Endpoint),
-			}
-		}
-	}
+
 	usableTotal := int64(GetTotalUsableCapacity(info.Disks, info))
 	usableFree := int64(GetTotalUsableCapacityFree(info.Disks, info))
@@ -1112,6 +1107,13 @@ func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (er
 		return err
 	}
 
+	// Make sure to heal the buckets to ensure the new
+	// pool has the new buckets, this is to avoid
+	// failures later.
+	for _, bucket := range buckets {
+		z.HealBucket(ctx, bucket.Name, madmin.HealOpts{})
+	}
+
 	decomBuckets := make([]decomBucketInfo, len(buckets))
 	for i := range buckets {
 		decomBuckets[i] = decomBucketInfo{