fix: do not listAndHeal() inline with PutObject() (#17499)

slow drives can add latency to the overall PutObject() call,
leading to a large spike in latency.

this can happen when other parallel listObjects() calls hit
the same drive, in turn causing the calls to effectively
serialize against each other.

queueing the heal to the background MRF instead potentially
improves performance and makes PutObject() non-blocking.
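
Both write paths touched by this diff follow the same pattern: instead of running listAndHeal() inline, they record a partialOperation via globalMRFState.addPartialOp() and let the background healRoutine() pick it up later. The sketch below is a minimal, self-contained illustration of that queue-and-drain idea, not MinIO code; the healOp type, healQueue channel, and drainHeals() worker are made-up names standing in for partialOperation, opCh, and healRoutine().

```go
// Minimal sketch of the queue-and-drain pattern: the write path records a
// pending heal and returns immediately, while a background goroutine drains
// the queue and performs the expensive work. Names here are illustrative.
package main

import (
	"fmt"
	"time"
)

type healOp struct {
	bucket, object string
	allVersions    bool
	queued         time.Time
}

// healQueue plays the role of the bounded opCh channel in the MRF state.
var healQueue = make(chan healOp, 1024)

// addPartialOp enqueues without ever blocking the caller; if the queue is
// full, the op is simply dropped (a choice made for this sketch).
func addPartialOp(op healOp) {
	select {
	case healQueue <- op:
	default: // queue full; do not block the write path
	}
}

// drainHeals runs in the background and performs the slow listing/healing
// outside of the PutObject() call path.
func drainHeals() {
	for op := range healQueue {
		// stand-in for listAndHeal()/healObject(); deliberately slow.
		time.Sleep(50 * time.Millisecond)
		fmt.Printf("healed %s/%s (allVersions=%v)\n", op.bucket, op.object, op.allVersions)
	}
}

func main() {
	go drainHeals()

	// the "PutObject" path: enqueue and return without waiting for the heal.
	addPartialOp(healOp{bucket: "mybucket", object: "myobject", allVersions: true, queued: time.Now()})

	time.Sleep(200 * time.Millisecond) // give the background worker time to run in this demo
}
```

In the sketch the send is non-blocking and drops work when the queue is full; whatever the real addPartialOp() does on overflow, the key point is that the write path only ever pays the cost of an enqueue into a bounded queue (mrfOpsQueueSize in the diff), never the listing and healing itself.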
Harshavardhana 2023-06-24 19:31:04 -07:00 committed by GitHub
parent fcbed41cc3
commit 1f8b9b4bd5
5 changed files with 98 additions and 74 deletions

@@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"io"
"strings"
"sync"
"time"
@@ -66,6 +67,68 @@ func (fi FileInfo) DataShardFixed() bool {
return fi.Metadata[reservedMetadataPrefixLowerDataShardFix] == "true"
}
func (er erasureObjects) listAndHeal(bucket, prefix string, healEntry func(string, metaCacheEntry) error) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
disks, _ := er.getOnlineDisksWithHealing()
if len(disks) == 0 {
return errors.New("listAndHeal: No non-healing drives found")
}
// How to resolve partial results.
resolver := metadataResolutionParams{
dirQuorum: 1,
objQuorum: 1,
bucket: bucket,
strict: false, // Allow less strict matching.
}
path := baseDirFromPrefix(prefix)
filterPrefix := strings.Trim(strings.TrimPrefix(prefix, path), slashSeparator)
if path == prefix {
filterPrefix = ""
}
lopts := listPathRawOptions{
disks: disks,
bucket: bucket,
path: path,
filterPrefix: filterPrefix,
recursive: true,
forwardTo: "",
minDisks: 1,
reportNotFound: false,
agreed: func(entry metaCacheEntry) {
if err := healEntry(bucket, entry); err != nil {
logger.LogIf(ctx, err)
cancel()
}
},
partial: func(entries metaCacheEntries, _ []error) {
entry, ok := entries.resolve(&resolver)
if !ok {
// check if we can get one entry at least
// proceed to heal nonetheless.
entry, _ = entries.firstFound()
}
if err := healEntry(bucket, *entry); err != nil {
logger.LogIf(ctx, err)
cancel()
return
}
},
finished: nil,
}
if err := listPathRaw(ctx, lopts); err != nil {
return fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts)
}
return nil
}
// HealBucket heals a bucket if it doesn't exist on one of the disks, additionally
// also heals the missing entries for bucket metadata files
// `policy.json, notification.xml, listeners.json`.

@@ -1249,7 +1249,14 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
}
if !opts.Speedtest && versionsDisparity {
listAndHeal(ctx, bucket, object, &er, healObjectVersionsDisparity)
globalMRFState.addPartialOp(partialOperation{
bucket: bucket,
object: object,
queued: time.Now(),
allVersions: true,
setIndex: er.setIndex,
poolIndex: er.poolIndex,
})
}
// Check if there is any offline disk and add it to the MRF list

@@ -1349,7 +1349,14 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
}
if versionsDisparity {
listAndHeal(ctx, bucket, object, &er, healObjectVersionsDisparity)
globalMRFState.addPartialOp(partialOperation{
bucket: bucket,
object: object,
queued: time.Now(),
allVersions: true,
setIndex: er.setIndex,
poolIndex: er.poolIndex,
})
}
}

@@ -1905,68 +1905,6 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
// HealObjectFn closure function heals the object.
type HealObjectFn func(bucket, object, versionID string) error
func listAndHeal(ctx context.Context, bucket, prefix string, set *erasureObjects, healEntry func(string, metaCacheEntry) error) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
disks, _ := set.getOnlineDisksWithHealing()
if len(disks) == 0 {
return errors.New("listAndHeal: No non-healing drives found")
}
// How to resolve partial results.
resolver := metadataResolutionParams{
dirQuorum: 1,
objQuorum: 1,
bucket: bucket,
strict: false, // Allow less strict matching.
}
path := baseDirFromPrefix(prefix)
filterPrefix := strings.Trim(strings.TrimPrefix(prefix, path), slashSeparator)
if path == prefix {
filterPrefix = ""
}
lopts := listPathRawOptions{
disks: disks,
bucket: bucket,
path: path,
filterPrefix: filterPrefix,
recursive: true,
forwardTo: "",
minDisks: 1,
reportNotFound: false,
agreed: func(entry metaCacheEntry) {
if err := healEntry(bucket, entry); err != nil {
logger.LogIf(ctx, err)
cancel()
}
},
partial: func(entries metaCacheEntries, _ []error) {
entry, ok := entries.resolve(&resolver)
if !ok {
// check if we can get one entry atleast
// proceed to heal nonetheless.
entry, _ = entries.firstFound()
}
if err := healEntry(bucket, *entry); err != nil {
logger.LogIf(ctx, err)
cancel()
return
}
},
finished: nil,
}
if err := listPathRaw(ctx, lopts); err != nil {
return fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts)
}
return nil
}
func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObjectFn HealObjectFn) error {
healEntry := func(bucket string, entry metaCacheEntry) error {
if entry.isDir() {
@@ -2024,7 +1962,7 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
go func(idx int, set *erasureObjects) {
defer wg.Done()
errs[idx] = listAndHeal(ctx, bucket, prefix, set, healEntry)
errs[idx] = set.listAndHeal(bucket, prefix, healEntry)
}(idx, set)
}
wg.Wait()

@@ -32,17 +32,19 @@ const (
// partialOperation is a successful upload/delete of an object
// but not written in all disks (having quorum)
type partialOperation struct {
bucket string
object string
versionID string
queued time.Time
bucket string
object string
versionID string
allVersions bool
setIndex, poolIndex int
queued time.Time
}
// mrfState encapsulates all the information
// related to the global background MRF.
type mrfState struct {
ctx context.Context
objectAPI ObjectLayer
ctx context.Context
pools *erasureServerPools
mu sync.Mutex
opCh chan partialOperation
@@ -55,9 +57,12 @@ func (m *mrfState) init(ctx context.Context, objAPI ObjectLayer) {
m.ctx = ctx
m.opCh = make(chan partialOperation, mrfOpsQueueSize)
m.objectAPI = objAPI
go globalMRFState.healRoutine()
var ok bool
m.pools, ok = objAPI.(*erasureServerPools)
if ok {
go m.healRoutine()
}
}
// Add a partial S3 operation (put/delete) when one or more disks are offline.
@@ -101,7 +106,11 @@ func (m *mrfState) healRoutine() {
if u.object == "" {
healBucket(u.bucket, madmin.HealNormalScan)
} else {
healObject(u.bucket, u.object, u.versionID, madmin.HealNormalScan)
if u.allVersions {
m.pools.serverPools[u.poolIndex].sets[u.setIndex].listAndHeal(u.bucket, u.object, healObjectVersionsDisparity)
} else {
healObject(u.bucket, u.object, u.versionID, madmin.HealNormalScan)
}
}
wait()