diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 8200baea2..fb2d3fbdd 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -1,5 +1,5 @@ /* - * MinIO Cloud Storage, (C) 2018 MinIO, Inc. + * MinIO Cloud Storage, (C) 2018-2019 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,6 @@ import ( "hash/crc32" "io" "net/http" - "sort" "strings" "sync" "time" @@ -741,78 +740,6 @@ func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, destBucke return destSet.putObject(ctx, destBucket, destObject, srcInfo.PutObjReader, putOpts) } -// Returns function "listDir" of the type listDirFunc. -// disks - used for doing disk.ListDir(). Sets passes set of disks. -func listDirSetsFactory(ctx context.Context, sets ...*xlObjects) ListDirFunc { - listDirInternal := func(bucket, prefixDir, prefixEntry string, disks []StorageAPI) (mergedEntries []string) { - var diskEntries = make([][]string, len(disks)) - g := errgroup.WithNErrs(len(disks)) - for index, disk := range disks { - if disk == nil { - continue - } - index := index - g.Go(func() error { - var err error - diskEntries[index], err = disks[index].ListDir(bucket, prefixDir, -1, xlMetaJSONFile) - return err - }, index) - } - - for _, err := range g.Wait() { - if err != nil { - logger.LogIf(ctx, err) - } - } - - // Find elements in entries which are not in mergedEntries - for _, entries := range diskEntries { - var newEntries []string - - for _, entry := range entries { - idx := sort.SearchStrings(mergedEntries, entry) - // if entry is already present in mergedEntries don't add. - if idx < len(mergedEntries) && mergedEntries[idx] == entry { - continue - } - newEntries = append(newEntries, entry) - } - - if len(newEntries) > 0 { - // Merge the entries and sort it. - mergedEntries = append(mergedEntries, newEntries...) - sort.Strings(mergedEntries) - } - } - - return mergedEntries - } - - // listDir - lists all the entries at a given prefix and given entry in the prefix. - listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string) { - for _, set := range sets { - var newEntries []string - // Find elements in entries which are not in mergedEntries - for _, entry := range listDirInternal(bucket, prefixDir, prefixEntry, set.getLoadBalancedDisks()) { - idx := sort.SearchStrings(mergedEntries, entry) - // if entry is already present in mergedEntries don't add. - if idx < len(mergedEntries) && mergedEntries[idx] == entry { - continue - } - newEntries = append(newEntries, entry) - } - - if len(newEntries) > 0 { - // Merge the entries and sort it. - mergedEntries = append(mergedEntries, newEntries...) - sort.Strings(mergedEntries) - } - } - return filterMatchingPrefix(mergedEntries, prefixEntry) - } - return listDir -} - // FileInfoCh - file info channel type FileInfoCh struct { Ch chan FileInfo @@ -1711,23 +1638,37 @@ func (s *xlSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) { // HealObjects - Heal all objects recursively at a specified prefix, any // dangling objects deleted as well automatically. -func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObjectFn func(string, string) error) (err error) { - recursive := true +func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObjectFn func(string, string) error) error { - endWalkCh := make(chan struct{}) - listDir := listDirSetsFactory(ctx, s.sets...) - walkResultCh := startTreeWalk(ctx, bucket, prefix, "", recursive, listDir, endWalkCh) + marker := "" for { - walkResult, ok := <-walkResultCh - if !ok { - break - } - if err := healObjectFn(bucket, walkResult.entry); err != nil { - return toObjectErr(err, bucket, walkResult.entry) - } - if walkResult.end { + if globalHTTPServer != nil { + // Wait at max 10 minute for an inprogress request before proceeding to heal + waitCount := 600 + // Any requests in progress, delay the heal. + for (globalHTTPServer.GetRequestCount() >= int32(globalXLSetCount*globalXLSetDriveCount)) && + waitCount > 0 { + waitCount-- + time.Sleep(1 * time.Second) + } + } + + res, err := s.ListObjectsHeal(ctx, bucket, prefix, marker, "", 1000) + if err != nil { + continue + } + + for _, obj := range res.Objects { + if err = healObjectFn(bucket, obj.Name); err != nil { + return toObjectErr(err, bucket, obj.Name) + } + } + + if !res.IsTruncated { break } + + marker = res.NextMarker } return nil