diff --git a/cmd/data-usage-cache.go b/cmd/data-usage-cache.go
index d8b572978..102826dbd 100644
--- a/cmd/data-usage-cache.go
+++ b/cmd/data-usage-cache.go
@@ -23,6 +23,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"math/rand"
 	"net/http"
 	"path"
 	"path/filepath"
@@ -847,37 +848,54 @@ type objectIO interface {
 
 // load the cache content with name from minioMetaBackgroundOpsBucket.
 // Only backend errors are returned as errors.
+// The loader is optimistic and has no locking, but tries 5 times before giving up.
 // If the object is not found or unable to deserialize d is cleared and nil error is returned.
 func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string) error {
 	// Abandon if more than 5 minutes, so we don't hold up scanner.
 	ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
 	defer cancel()
 
-	r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, readLock, ObjectOptions{})
-	if err != nil {
-		switch err.(type) {
-		case ObjectNotFound:
-		case BucketNotFound:
-		case InsufficientReadQuorum:
-		case StorageErr:
-		default:
-			return toObjectErr(err, dataUsageBucket, name)
+	// Caches are read+written without locks,
+	retries := 0
+	for retries < 5 {
+		r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, noLock, ObjectOptions{NoLock: true})
+		if err != nil {
+			switch err.(type) {
+			case ObjectNotFound, BucketNotFound:
+			case InsufficientReadQuorum, StorageErr:
+				retries++
+				time.Sleep(time.Duration(rand.Int63n(int64(time.Second))))
+				continue
+			default:
+				return toObjectErr(err, dataUsageBucket, name)
+			}
+			*d = dataUsageCache{}
+			return nil
 		}
-		*d = dataUsageCache{}
+		if err := d.deserialize(r); err != nil {
+			r.Close()
+			retries++
+			time.Sleep(time.Duration(rand.Int63n(int64(time.Second))))
+			continue
+		}
+		r.Close()
 		return nil
 	}
-	defer r.Close()
-	if err := d.deserialize(r); err != nil {
-		*d = dataUsageCache{}
-		logger.LogOnceIf(ctx, err, err.Error())
-	}
+	*d = dataUsageCache{}
 	return nil
 }
 
+// Maximum running concurrent saves on server.
+var maxConcurrentScannerSaves = make(chan struct{}, 4)
+
 // save the content of the cache to minioMetaBackgroundOpsBucket with the provided name.
+// Note that no locking is done when saving.
 func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string) error {
 	var r io.Reader
-
+	maxConcurrentScannerSaves <- struct{}{}
+	defer func() {
+		<-maxConcurrentScannerSaves
+	}()
 	// If big, do streaming...
 	size := int64(-1)
 	if len(d.Cache) > 10000 {
@@ -909,7 +927,7 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string)
 		dataUsageBucket,
 		name,
 		NewPutObjReader(hr),
-		ObjectOptions{})
+		ObjectOptions{NoLock: true})
 	if isErrBucketNotFound(err) {
 		return nil
 	}
diff --git a/cmd/erasure.go b/cmd/erasure.go
index bd4926e6a..af7e8d3d0 100644
--- a/cmd/erasure.go
+++ b/cmd/erasure.go
@@ -449,6 +449,7 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa
 	// Start one scanner per disk
 	var wg sync.WaitGroup
 	wg.Add(len(disks))
+
 	for i := range disks {
 		go func(i int) {
 			defer wg.Done()
@@ -518,7 +519,6 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa
 			if r := cache.root(); r != nil {
 				root = cache.flatten(*r)
 			}
-			t := time.Now()
 			select {
 			case <-ctx.Done():
 				return
@@ -528,9 +528,7 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa
 				Entry: root,
 			}:
 			}
-			// We want to avoid synchronizing up all writes in case
-			// the results are piled up.
-			time.Sleep(time.Duration(float64(time.Since(t)) * rand.Float64()))
+			// Save cache
 			logger.LogIf(ctx, cache.save(ctx, er, cacheName))
 		}
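The sketch below is not MinIO code; it is a minimal, self-contained Go illustration of the two patterns this patch introduces: a jittered retry loop for the optimistic, lock-free cache read, and a buffered-channel semaphore that caps concurrent cache saves. All names (loadOptimistic, saveLimited, fetch, persist, errTransient) are hypothetical.

	package main

	import (
		"errors"
		"math/rand"
		"time"
	)

	// Cap concurrent saves at 4, mirroring maxConcurrentScannerSaves above.
	var saveSlots = make(chan struct{}, 4)

	// errTransient stands in for read-quorum/storage errors worth retrying.
	var errTransient = errors.New("transient read failure")

	// loadOptimistic retries a lock-free read up to 5 times, sleeping a random
	// duration below one second between attempts, like the patched load().
	func loadOptimistic(fetch func() ([]byte, error)) ([]byte, error) {
		for retries := 0; retries < 5; retries++ {
			b, err := fetch()
			if err == nil {
				return b, nil
			}
			if !errors.Is(err, errTransient) {
				return nil, err // only non-retryable backend errors surface
			}
			time.Sleep(time.Duration(rand.Int63n(int64(time.Second))))
		}
		return nil, nil // give up quietly, like the cleared cache in load()
	}

	// saveLimited acquires one of the 4 slots before persisting, so piled-up
	// scanner results cannot start an unbounded number of simultaneous writes.
	func saveLimited(persist func() error) error {
		saveSlots <- struct{}{}
		defer func() { <-saveSlots }()
		return persist()
	}

	func main() {
		_, _ = loadOptimistic(func() ([]byte, error) { return []byte("cache"), nil })
		_ = saveLimited(func() error { return nil })
	}

The semaphore replaces the removed per-result random sleep: instead of spreading writes out in time, the number of writes in flight is bounded directly.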