fix: avoid undoing bucket creation and return the first err instead (#9578)

Anis Elleuch 2020-05-12 23:20:42 +01:00 committed by GitHub
parent 1756b7c6ff
commit c045ae15e7
5 changed files with 5 additions and 112 deletions
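In short, the commit removes the rollback helpers (undoMakeBucketSets, undoMakeBucket, undoMakeBucketZones and their test) and simply surfaces the first error from the parallel MakeBucket calls. Below is a minimal, self-contained sketch of the resulting pattern, written with plain goroutines instead of MinIO's internal errgroup package; the `set` type and `makeBucketOnSets` helper are hypothetical stand-ins, not the project's API.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// set is a hypothetical stand-in for one erasure set.
type set struct{ id int }

// makeBucket simulates creating the bucket on a single set; set 2 fails.
func (s *set) makeBucket(bucket string) error {
	if s.id == 2 {
		return errors.New("disk offline")
	}
	return nil
}

// makeBucketOnSets fans the call out to every set in parallel, waits for all
// of them, then returns the first error encountered. Note there is no attempt
// to delete the bucket from the sets that succeeded.
func makeBucketOnSets(sets []*set, bucket string) error {
	errs := make([]error, len(sets))
	var wg sync.WaitGroup
	for i := range sets {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			errs[i] = sets[i].makeBucket(bucket)
		}(i)
	}
	wg.Wait()

	// Return the first encountered error.
	for _, err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	sets := []*set{{id: 1}, {id: 2}, {id: 3}}
	fmt.Println(makeBucketOnSets(sets, "mybucket")) // prints "disk offline"
}
```

With the undo path gone, a partial failure leaves the bucket in place on the sets that succeeded; the caller simply sees the first error instead of a rolled-back state.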


@@ -507,9 +507,8 @@ func (s *xlSets) Shutdown(ctx context.Context) error {
     return nil
 }
-// MakeBucketLocation - creates a new bucket across all sets simultaneously
-// even if one of the sets fail to create buckets, we proceed to undo a
-// successful operation.
+// MakeBucketLocation - creates a new bucket across all sets simultaneously,
+// then return the first encountered error
 func (s *xlSets) MakeBucketWithLocation(ctx context.Context, bucket, location string, lockEnabled bool) error {
     g := errgroup.WithNErrs(len(s.sets))
@@ -522,11 +521,10 @@ func (s *xlSets) MakeBucketWithLocation(ctx context.Context, bucket, location st
     }
     errs := g.Wait()
-    // Upon any error we try to undo the make bucket operation if possible
-    // on all sets where it succeeded.
+    // Return the first encountered error
     for _, err := range errs {
         if err != nil {
-            undoMakeBucketSets(bucket, s.sets, errs)
             return err
         }
     }
@@ -535,25 +533,6 @@ func (s *xlSets) MakeBucketWithLocation(ctx context.Context, bucket, location st
     return nil
 }
-// This function is used to undo a successful MakeBucket operation.
-func undoMakeBucketSets(bucket string, sets []*xlObjects, errs []error) {
-    g := errgroup.WithNErrs(len(sets))
-    // Undo previous make bucket entry on all underlying sets.
-    for index := range sets {
-        index := index
-        g.Go(func() error {
-            if errs[index] == nil {
-                return sets[index].DeleteBucket(GlobalContext, bucket, false)
-            }
-            return nil
-        }, index)
-    }
-    // Wait for all delete bucket to finish.
-    g.Wait()
-}
 // hashes the key returning an integer based on the input algorithm.
 // This function currently supports
 // - CRCMOD
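The trailing context comment mentions CRCMOD key hashing, which is how an object key is mapped to one of the erasure sets. As a rough illustration (assumed behavior, not a verbatim copy of the file's actual function), CRC32 of the key reduced modulo the set count looks like this:

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// crcHashMod maps a key to one of `cardinality` sets. A cardinality <= 0
// signals an invalid configuration and returns -1.
func crcHashMod(key string, cardinality int) int {
	if cardinality <= 0 {
		return -1
	}
	keyCrc := crc32.Checksum([]byte(key), crc32.IEEETable)
	return int(keyCrc % uint32(cardinality))
}

func main() {
	// Every node computes the same set index for the same bucket/object key.
	fmt.Println(crcHashMod("mybucket/object.txt", 16))
}
```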


@@ -69,10 +69,6 @@ func (xl xlObjects) MakeBucketWithLocation(ctx context.Context, bucket, location
     writeQuorum := getWriteQuorum(len(storageDisks))
     err := reduceWriteQuorumErrs(ctx, g.Wait(), bucketOpIgnoredErrs, writeQuorum)
-    if err == errXLWriteQuorum {
-        // Purge successfully created buckets if we don't have writeQuorum.
-        undoMakeBucket(storageDisks, bucket)
-    }
     return toObjectErr(err, bucket)
 }
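The removed block keyed off errXLWriteQuorum, the error produced when reduceWriteQuorumErrs cannot find writeQuorum disks agreeing on an outcome. A rough, hypothetical sketch of that style of error reduction (names and details are assumptions, not MinIO's exact code):

```go
package main

import (
	"errors"
	"fmt"
)

var errWriteQuorum = errors.New("write quorum not met")

// reduceQuorumErrs returns the outcome (an error, or nil for success) that at
// least `quorum` disks agree on, or errWriteQuorum when no outcome reaches quorum.
func reduceQuorumErrs(errs []error, quorum int) error {
	counts := make(map[string]int)
	for _, err := range errs {
		key := "" // empty key stands for success (nil error)
		if err != nil {
			key = err.Error()
		}
		counts[key]++
	}
	for key, n := range counts {
		if n >= quorum {
			if key == "" {
				return nil
			}
			return errors.New(key)
		}
	}
	return errWriteQuorum
}

func main() {
	errs := []error{nil, nil, nil, errors.New("disk offline")}
	fmt.Println(reduceQuorumErrs(errs, 3)) // <nil>: success reached quorum
}
```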
@@ -94,25 +90,6 @@ func undoDeleteBucket(storageDisks []StorageAPI, bucket string) {
     g.Wait()
 }
-// undo make bucket operation upon quorum failure.
-func undoMakeBucket(storageDisks []StorageAPI, bucket string) {
-    g := errgroup.WithNErrs(len(storageDisks))
-    // Undo previous make bucket entry on all underlying storage disks.
-    for index := range storageDisks {
-        if storageDisks[index] == nil {
-            continue
-        }
-        index := index
-        g.Go(func() error {
-            _ = storageDisks[index].DeleteVol(bucket, false)
-            return nil
-        }, index)
-    }
-    // Wait for all make vol to finish.
-    g.Wait()
-}
 // getBucketInfo - returns the BucketInfo from one of the load balanced disks.
 func (xl xlObjects) getBucketInfo(ctx context.Context, bucketName string) (bucketInfo BucketInfo, err error) {
     var bucketErrs []error


@@ -150,10 +150,6 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, w
     reducedErr = reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, writeQuorum)
     if reducedErr != nil {
-        if reducedErr == errXLWriteQuorum {
-            // Purge successfully created buckets if we don't have writeQuorum.
-            undoMakeBucket(storageDisks, bucket)
-        }
         return res, reducedErr
     }


@@ -25,43 +25,6 @@ import (
     "github.com/minio/minio/pkg/madmin"
 )
-// Tests undoes and validates if the undoing completes successfully.
-func TestUndoMakeBucket(t *testing.T) {
-    ctx, cancel := context.WithCancel(context.Background())
-    defer cancel()
-    nDisks := 16
-    fsDirs, err := getRandomDisks(nDisks)
-    if err != nil {
-        t.Fatal(err)
-    }
-    defer removeRoots(fsDirs)
-    // Remove format.json on 16 disks.
-    obj, _, err := initObjectLayer(ctx, mustGetZoneEndpoints(fsDirs...))
-    if err != nil {
-        t.Fatal(err)
-    }
-    bucketName := getRandomBucketName()
-    if err = obj.MakeBucketWithLocation(ctx, bucketName, "", false); err != nil {
-        t.Fatal(err)
-    }
-    z := obj.(*xlZones)
-    xl := z.zones[0].sets[0]
-    undoMakeBucket(xl.getDisks(), bucketName)
-    // Validate if bucket was deleted properly.
-    _, err = obj.GetBucketInfo(ctx, bucketName)
-    if err != nil {
-        switch err.(type) {
-        case BucketNotFound:
-        default:
-            t.Fatal(err)
-        }
-    }
-}
 func TestHealObjectCorrupted(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()


@@ -315,25 +315,6 @@ func (z *xlZones) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, upd
     return firstErr
 }
-// This function is used to undo a successful MakeBucket operation.
-func undoMakeBucketZones(bucket string, zones []*xlSets, errs []error) {
-    g := errgroup.WithNErrs(len(zones))
-    // Undo previous make bucket entry on all underlying zones.
-    for index := range zones {
-        index := index
-        g.Go(func() error {
-            if errs[index] == nil {
-                return zones[index].DeleteBucket(GlobalContext, bucket, false)
-            }
-            return nil
-        }, index)
-    }
-    // Wait for all delete bucket to finish.
-    g.Wait()
-}
 // MakeBucketWithLocation - creates a new bucket across all zones simultaneously
 // even if one of the sets fail to create buckets, we proceed all the successful
 // operations.
@@ -363,12 +344,9 @@ func (z *xlZones) MakeBucketWithLocation(ctx context.Context, bucket, location s
     }
     errs := g.Wait()
-    // Upon even a single write quorum error we undo all previously created buckets.
+    // Return the first encountered error
     for _, err := range errs {
         if err != nil {
-            if _, ok := err.(InsufficientWriteQuorum); ok {
-                undoMakeBucketZones(bucket, z.zones, errs)
-            }
             return err
         }
     }