fix: CrawlAndGetDataUsage close pipe() before using a new one (#11600)

also additionally make sure errors during deserializer closes
the reader with right error type such that Write() end
actually see the final error, this avoids a waitGroup usage
and waiting.
This commit is contained in:
Harshavardhana 2021-02-22 10:04:32 -08:00 committed by GitHub
parent 8778828a03
commit c31d2c3fdc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 8 additions and 12 deletions

View file

@ -485,7 +485,7 @@ type objectIO interface {
// Only backend errors are returned as errors.
// If the object is not found or unable to deserialize d is cleared and nil error is returned.
func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string) error {
r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, readLock, ObjectOptions{})
r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, noLock, ObjectOptions{})
if err != nil {
switch err.(type) {
case ObjectNotFound:

View file

@ -420,9 +420,10 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
cache, err = disk.CrawlAndGetDataUsage(ctx, cache)
cache.Info.BloomFilter = nil
if err != nil {
logger.LogIf(ctx, err)
if cache.Info.LastUpdate.After(before) {
if !cache.Info.LastUpdate.IsZero() && cache.Info.LastUpdate.After(before) {
logger.LogIf(ctx, cache.save(ctx, er, cacheName))
} else {
logger.LogIf(ctx, err)
}
continue
}

View file

@ -175,30 +175,25 @@ func (client *storageRESTClient) CrawlAndGetDataUsage(ctx context.Context, cache
go func() {
pw.CloseWithError(cache.serializeTo(pw))
}()
defer pr.Close()
respBody, err := client.call(ctx, storageRESTMethodCrawlAndGetDataUsage, url.Values{}, pr, -1)
defer http.DrainBody(respBody)
if err != nil {
pr.Close()
return cache, err
}
pr.Close()
var wg sync.WaitGroup
var newCache dataUsageCache
var decErr error
pr, pw = io.Pipe()
wg.Add(1)
go func() {
defer wg.Done()
decErr = newCache.deserialize(pr)
pr.CloseWithError(err)
pr.CloseWithError(newCache.deserialize(pr))
}()
err = waitForHTTPStream(respBody, pw)
pw.CloseWithError(err)
if err != nil {
return cache, err
}
wg.Wait()
return newCache, decErr
return newCache, nil
}
func (client *storageRESTClient) GetDiskID() (string, error) {