add force delete option of non-empty bucket (#9166)

passing HTTP header `x-minio-force-delete: true` would 
allow standard S3 API DeleteBucket to delete a non-empty
bucket forcefully.
This commit is contained in:
Bala FA 2020-03-28 04:52:59 +00:00 committed by GitHub
parent 7f8f1ad4e3
commit 2c3e34f001
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 177 additions and 70 deletions

View file

@ -528,7 +528,7 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
}
if err = globalDNSConfig.Put(bucket); err != nil {
objectAPI.DeleteBucket(ctx, bucket)
objectAPI.DeleteBucket(ctx, bucket, false)
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
@ -877,6 +877,18 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
vars := mux.Vars(r)
bucket := vars["bucket"]
forceDelete := false
if vs, found := r.Header[http.CanonicalHeaderKey("x-minio-force-delete")]; found {
switch strings.ToLower(strings.Join(vs, "")) {
case "true":
forceDelete = true
case "false":
default:
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL, guessIsBrowserReq(r))
return
}
}
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
@ -891,7 +903,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
deleteBucket := objectAPI.DeleteBucket
// Attempt to delete bucket.
if err := deleteBucket(ctx, bucket); err != nil {
if err := deleteBucket(ctx, bucket, forceDelete); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}

View file

@ -29,6 +29,51 @@ import (
"github.com/minio/minio/pkg/auth"
)
// Wrapper for calling RemoveBucket HTTP handler tests for both XL multiple disks and single node setup.
func TestRemoveBucketHandler(t *testing.T) {
ExecObjectLayerAPITest(t, testRemoveBucketHandler, []string{"RemoveBucket"})
}
func testRemoveBucketHandler(obj ObjectLayer, instanceType, bucketName string, apiRouter http.Handler,
credentials auth.Credentials, t *testing.T) {
_, err := obj.PutObject(context.Background(), bucketName, "test-object", mustGetPutObjReader(t, bytes.NewBuffer([]byte{}), int64(0), "", "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"), ObjectOptions{})
// if object upload fails stop the test.
if err != nil {
t.Fatalf("Error uploading object: <ERROR> %v", err)
}
// initialize httptest Recorder, this records any mutations to response writer inside the handler.
rec := httptest.NewRecorder()
// construct HTTP request for DELETE bucket.
req, err := newTestSignedRequestV4("DELETE", getBucketLocationURL("", bucketName), 0, nil, credentials.AccessKey, credentials.SecretKey, nil)
if err != nil {
t.Fatalf("Test %s: Failed to create HTTP request for RemoveBucketHandler: <ERROR> %v", instanceType, err)
}
// Since `apiRouter` satisfies `http.Handler` it has a ServeHTTP to execute the logic of the handler.
// Call the ServeHTTP to execute the handler.
apiRouter.ServeHTTP(rec, req)
switch rec.Code {
case http.StatusOK, http.StatusCreated, http.StatusAccepted, http.StatusNoContent:
t.Fatalf("Test %v: expected failure, but succeeded with %v", instanceType, rec.Code)
}
// Verify response of the V2 signed HTTP request.
// initialize HTTP NewRecorder, this records any mutations to response writer inside the handler.
recV2 := httptest.NewRecorder()
// construct HTTP request for DELETE bucket.
reqV2, err := newTestSignedRequestV2("DELETE", getBucketLocationURL("", bucketName), 0, nil, credentials.AccessKey, credentials.SecretKey, nil)
if err != nil {
t.Fatalf("Test %s: Failed to create HTTP request for RemoveBucketHandler: <ERROR> %v", instanceType, err)
}
// Since `apiRouter` satisfies `http.Handler` it has a ServeHTTP to execute the logic of the handler.
// Call the ServeHTTP to execute the handler.
apiRouter.ServeHTTP(recV2, reqV2)
switch recV2.Code {
case http.StatusOK, http.StatusCreated, http.StatusAccepted, http.StatusNoContent:
t.Fatalf("Test %v: expected failure, but succeeded with %v", instanceType, recV2.Code)
}
}
// Wrapper for calling GetBucketPolicy HTTP handler tests for both XL multiple disks and single node setup.
func TestGetBucketLocationHandler(t *testing.T) {
ExecObjectLayerAPITest(t, testGetBucketLocationHandler, []string{"GetBucketLocation"})

View file

@ -397,13 +397,15 @@ func (fs *FSObjects) ListBuckets(ctx context.Context) ([]BucketInfo, error) {
// DeleteBucket - delete a bucket and all the metadata associated
// with the bucket including pending multipart, object metadata.
func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string) error {
bucketLock := fs.NewNSLock(ctx, bucket, "")
if err := bucketLock.GetLock(globalObjectTimeout); err != nil {
logger.LogIf(ctx, err)
return err
func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
if !forceDelete {
bucketLock := fs.NewNSLock(ctx, bucket, "")
if err := bucketLock.GetLock(globalObjectTimeout); err != nil {
logger.LogIf(ctx, err)
return err
}
defer bucketLock.Unlock()
}
defer bucketLock.Unlock()
atomic.AddInt64(&fs.activeIOCount, 1)
defer func() {
@ -415,9 +417,20 @@ func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string) error {
return toObjectErr(err, bucket)
}
// Attempt to delete regular bucket.
if err = fsRemoveDir(ctx, bucketDir); err != nil {
return toObjectErr(err, bucket)
if !forceDelete {
// Attempt to delete regular bucket.
if err = fsRemoveDir(ctx, bucketDir); err != nil {
return toObjectErr(err, bucket)
}
} else {
tmpBucketPath := pathJoin(fs.fsPath, minioMetaTmpBucket, bucket+"."+mustGetUUID())
if err = fsSimpleRenameFile(ctx, bucketDir, tmpBucketPath); err != nil {
return toObjectErr(err, bucket)
}
go func() {
fsRemoveAll(ctx, tmpBucketPath) // ignore returned error if any.
}()
}
// Cleanup all the bucket metadata.

View file

@ -318,16 +318,16 @@ func TestFSDeleteBucket(t *testing.T) {
}
// Test with an invalid bucket name
if err = fs.DeleteBucket(context.Background(), "fo"); !isSameType(err, BucketNotFound{}) {
if err = fs.DeleteBucket(context.Background(), "fo", false); !isSameType(err, BucketNotFound{}) {
t.Fatal("Unexpected error: ", err)
}
// Test with an inexistant bucket
if err = fs.DeleteBucket(context.Background(), "foobucket"); !isSameType(err, BucketNotFound{}) {
if err = fs.DeleteBucket(context.Background(), "foobucket", false); !isSameType(err, BucketNotFound{}) {
t.Fatal("Unexpected error: ", err)
}
// Test with a valid case
if err = fs.DeleteBucket(context.Background(), bucketName); err != nil {
if err = fs.DeleteBucket(context.Background(), bucketName, false); err != nil {
t.Fatal("Unexpected error: ", err)
}
@ -335,7 +335,7 @@ func TestFSDeleteBucket(t *testing.T) {
// Delete bucket should get error disk not found.
os.RemoveAll(disk)
if err = fs.DeleteBucket(context.Background(), bucketName); err != nil {
if err = fs.DeleteBucket(context.Background(), bucketName, false); err != nil {
if !isSameType(err, BucketNotFound{}) {
t.Fatal("Unexpected error: ", err)
}

View file

@ -559,7 +559,7 @@ func (a *azureObjects) ListBuckets(ctx context.Context) (buckets []minio.BucketI
}
// DeleteBucket - delete a container on azure, uses Azure equivalent `ContainerURL.Delete`.
func (a *azureObjects) DeleteBucket(ctx context.Context, bucket string) error {
func (a *azureObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
containerURL := a.client.NewContainerURL(bucket)
_, err := containerURL.Delete(ctx, azblob.ContainerAccessConditions{})
return azureToObjectError(err, bucket)

View file

@ -317,7 +317,7 @@ func (l *b2Objects) ListBuckets(ctx context.Context) ([]minio.BucketInfo, error)
}
// DeleteBucket deletes a bucket on B2
func (l *b2Objects) DeleteBucket(ctx context.Context, bucket string) error {
func (l *b2Objects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return err

View file

@ -475,7 +475,7 @@ func (l *gcsGateway) ListBuckets(ctx context.Context) (buckets []minio.BucketInf
}
// DeleteBucket delete a bucket on GCS.
func (l *gcsGateway) DeleteBucket(ctx context.Context, bucket string) error {
func (l *gcsGateway) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
itObject := l.client.Bucket(bucket).Objects(ctx, &storage.Query{
Delimiter: minio.SlashSeparator,
Versions: false,

View file

@ -274,7 +274,7 @@ func hdfsIsValidBucketName(bucket string) bool {
return s3utils.CheckValidBucketNameStrict(bucket) == nil
}
func (n *hdfsObjects) DeleteBucket(ctx context.Context, bucket string) error {
func (n *hdfsObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
if !hdfsIsValidBucketName(bucket) {
return minio.BucketNameInvalid{Bucket: bucket}
}

View file

@ -398,7 +398,7 @@ func (l *ossObjects) ListBuckets(ctx context.Context) (buckets []minio.BucketInf
}
// DeleteBucket deletes a bucket on OSS.
func (l *ossObjects) DeleteBucket(ctx context.Context, bucket string) error {
func (l *ossObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
err := l.Client.DeleteBucket(bucket)
if err != nil {
logger.LogIf(ctx, err)

View file

@ -742,7 +742,7 @@ func (l *s3EncObjects) getStalePartsForBucket(ctx context.Context, bucket string
return
}
func (l *s3EncObjects) DeleteBucket(ctx context.Context, bucket string) error {
func (l *s3EncObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
var prefix, continuationToken, delimiter, startAfter string
expParts := make(map[string]string)

View file

@ -351,7 +351,7 @@ func (l *s3Objects) ListBuckets(ctx context.Context) ([]minio.BucketInfo, error)
}
// DeleteBucket deletes a bucket on S3
func (l *s3Objects) DeleteBucket(ctx context.Context, bucket string) error {
func (l *s3Objects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
err := l.Client.RemoveBucket(bucket)
if err != nil {
return minio.ErrorRespToObjectError(err, bucket)

View file

@ -124,11 +124,11 @@ func (d *naughtyDisk) StatVol(volume string) (volInfo VolInfo, err error) {
}
return d.disk.StatVol(volume)
}
func (d *naughtyDisk) DeleteVol(volume string) (err error) {
func (d *naughtyDisk) DeleteVol(volume string, forceDelete bool) (err error) {
if err := d.calcError(); err != nil {
return err
}
return d.disk.DeleteVol(volume)
return d.disk.DeleteVol(volume, forceDelete)
}
func (d *naughtyDisk) WalkSplunk(volume, path, marker string, endWalkCh <-chan struct{}) (chan FileInfo, error) {

View file

@ -66,7 +66,7 @@ type ObjectLayer interface {
MakeBucketWithLocation(ctx context.Context, bucket string, location string) error
GetBucketInfo(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error)
ListBuckets(ctx context.Context) (buckets []BucketInfo, err error)
DeleteBucket(ctx context.Context, bucket string) error
DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error
ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error)
ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error)
Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error

View file

@ -107,11 +107,11 @@ func (p *posixDiskIDCheck) StatVol(volume string) (vol VolInfo, err error) {
return p.storage.StatVol(volume)
}
func (p *posixDiskIDCheck) DeleteVol(volume string) (err error) {
func (p *posixDiskIDCheck) DeleteVol(volume string, forceDelete bool) (err error) {
if p.isDiskStale() {
return errDiskNotFound
}
return p.storage.DeleteVol(volume)
return p.storage.DeleteVol(volume, forceDelete)
}
func (p *posixDiskIDCheck) Walk(volume, dirPath string, marker string, recursive bool, leafFile string, readMetadataFn readMetadataFunc, endWalkCh <-chan struct{}) (chan FileInfo, error) {

View file

@ -620,7 +620,7 @@ func (s *posix) StatVol(volume string) (volInfo VolInfo, err error) {
}
// DeleteVol - delete a volume.
func (s *posix) DeleteVol(volume string) (err error) {
func (s *posix) DeleteVol(volume string, forceDelete bool) (err error) {
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
@ -631,7 +631,13 @@ func (s *posix) DeleteVol(volume string) (err error) {
if err != nil {
return err
}
err = os.Remove((volumeDir))
if forceDelete {
err = os.RemoveAll(volumeDir)
} else {
err = os.Remove(volumeDir)
}
if err != nil {
switch {
case os.IsNotExist(err):

View file

@ -508,7 +508,7 @@ func TestPosixDeleteVol(t *testing.T) {
if _, ok := posixStorage.(*posixDiskIDCheck); !ok {
t.Errorf("Expected the StorageAPI to be of type *posixDiskIDCheck")
}
if err = posixStorage.DeleteVol(testCase.volName); err != testCase.expectedErr {
if err = posixStorage.DeleteVol(testCase.volName, false); err != testCase.expectedErr {
t.Fatalf("TestPosix: %d, expected: %s, got: %s", i+1, testCase.expectedErr, err)
}
}
@ -547,7 +547,7 @@ func TestPosixDeleteVol(t *testing.T) {
t.Fatalf("Unable to change permission to temporary directory %v. %v", permDeniedDir, err)
}
if err = posixStorage.DeleteVol("mybucket"); err != errDiskAccessDenied {
if err = posixStorage.DeleteVol("mybucket", false); err != errDiskAccessDenied {
t.Fatalf("expected: Permission error, got: %s", err)
}
}
@ -561,7 +561,7 @@ func TestPosixDeleteVol(t *testing.T) {
// TestPosix for delete on an removed disk.
// should fail with disk not found.
err = posixDeletedStorage.DeleteVol("Del-Vol")
err = posixDeletedStorage.DeleteVol("Del-Vol", false)
if err != errDiskNotFound {
t.Errorf("Expected: \"Disk not found\", got \"%s\"", err)
}

View file

@ -41,7 +41,7 @@ type StorageAPI interface {
MakeVolBulk(volumes ...string) (err error)
ListVols() (vols []VolInfo, err error)
StatVol(volume string) (vol VolInfo, err error)
DeleteVol(volume string) (err error)
DeleteVol(volume string, forceDelete bool) (err error)
// Walk in sorted order directly on disk.
Walk(volume, dirPath string, marker string, recursive bool, leafFile string,

View file

@ -234,9 +234,12 @@ func (client *storageRESTClient) StatVol(volume string) (volInfo VolInfo, err er
}
// DeleteVol - Deletes a volume over the network.
func (client *storageRESTClient) DeleteVol(volume string) (err error) {
func (client *storageRESTClient) DeleteVol(volume string, forceDelete bool) (err error) {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
if forceDelete {
values.Set(storageRESTForceDelete, "true")
}
respBody, err := client.call(storageRESTMethodDeleteVol, values, nil, -1)
defer http.DrainBody(respBody)
return err

View file

@ -17,7 +17,7 @@
package cmd
const (
storageRESTVersion = "v16" // CrawlAndGetDataUsageHandler API change
storageRESTVersion = "v17" // RemoveBucket API change
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
storageRESTPrefix = minioReservedBucketPath + "/storage"
)
@ -49,22 +49,23 @@ const (
)
const (
storageRESTVolume = "volume"
storageRESTVolumes = "volumes"
storageRESTDirPath = "dir-path"
storageRESTFilePath = "file-path"
storageRESTSrcVolume = "source-volume"
storageRESTSrcPath = "source-path"
storageRESTDstVolume = "destination-volume"
storageRESTDstPath = "destination-path"
storageRESTOffset = "offset"
storageRESTLength = "length"
storageRESTShardSize = "shard-size"
storageRESTCount = "count"
storageRESTMarkerPath = "marker"
storageRESTLeafFile = "leaf-file"
storageRESTRecursive = "recursive"
storageRESTBitrotAlgo = "bitrot-algo"
storageRESTBitrotHash = "bitrot-hash"
storageRESTDiskID = "disk-id"
storageRESTVolume = "volume"
storageRESTVolumes = "volumes"
storageRESTDirPath = "dir-path"
storageRESTFilePath = "file-path"
storageRESTSrcVolume = "source-volume"
storageRESTSrcPath = "source-path"
storageRESTDstVolume = "destination-volume"
storageRESTDstPath = "destination-path"
storageRESTOffset = "offset"
storageRESTLength = "length"
storageRESTShardSize = "shard-size"
storageRESTCount = "count"
storageRESTMarkerPath = "marker"
storageRESTLeafFile = "leaf-file"
storageRESTRecursive = "recursive"
storageRESTBitrotAlgo = "bitrot-algo"
storageRESTBitrotHash = "bitrot-hash"
storageRESTDiskID = "disk-id"
storageRESTForceDelete = "force-delete"
)

View file

@ -227,7 +227,8 @@ func (s *storageRESTServer) DeleteVolHandler(w http.ResponseWriter, r *http.Requ
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
err := s.storage.DeleteVol(volume)
forceDelete := vars[storageRESTForceDelete] == "true"
err := s.storage.DeleteVol(volume, forceDelete)
if err != nil {
s.writeErrorResponse(w, err)
}

View file

@ -182,7 +182,7 @@ func testStorageAPIDeleteVol(t *testing.T, storage StorageAPI) {
}
for i, testCase := range testCases {
err := storage.DeleteVol(testCase.volumeName)
err := storage.DeleteVol(testCase.volumeName, false)
expectErr := (err != nil)
if expectErr != testCase.expectErr {

View file

@ -176,7 +176,7 @@ func (web *webAPIHandlers) MakeBucket(r *http.Request, args *MakeBucketArgs, rep
return toJSONError(ctx, err)
}
if err = globalDNSConfig.Put(args.BucketName); err != nil {
objectAPI.DeleteBucket(ctx, args.BucketName)
objectAPI.DeleteBucket(ctx, args.BucketName, false)
return toJSONError(ctx, err)
}
@ -254,7 +254,7 @@ func (web *webAPIHandlers) DeleteBucket(r *http.Request, args *RemoveBucketArgs,
deleteBucket := objectAPI.DeleteBucket
if err := deleteBucket(ctx, args.BucketName); err != nil {
if err := deleteBucket(ctx, args.BucketName, false); err != nil {
return toJSONError(ctx, err, args.BucketName)
}

View file

@ -528,7 +528,7 @@ func undoMakeBucketSets(bucket string, sets []*xlObjects, errs []error) {
index := index
g.Go(func() error {
if errs[index] == nil {
return sets[index].DeleteBucket(context.Background(), bucket)
return sets[index].DeleteBucket(context.Background(), bucket, false)
}
return nil
}, index)
@ -665,14 +665,14 @@ func (s *xlSets) IsCompressionSupported() bool {
// DeleteBucket - deletes a bucket on all sets simultaneously,
// even if one of the sets fail to delete buckets, we proceed to
// undo a successful operation.
func (s *xlSets) DeleteBucket(ctx context.Context, bucket string) error {
func (s *xlSets) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
g := errgroup.WithNErrs(len(s.sets))
// Delete buckets in parallel across all sets.
for index := range s.sets {
index := index
g.Go(func() error {
return s.sets[index].DeleteBucket(ctx, bucket)
return s.sets[index].DeleteBucket(ctx, bucket, forceDelete)
}, index)
}

View file

@ -102,7 +102,7 @@ func undoMakeBucket(storageDisks []StorageAPI, bucket string) {
}
index := index
g.Go(func() error {
_ = storageDisks[index].DeleteVol(bucket)
_ = storageDisks[index].DeleteVol(bucket, false)
return nil
}, index)
}
@ -209,10 +209,10 @@ func deleteDanglingBucket(ctx context.Context, storageDisks []StorageAPI, dErrs
for index, err := range dErrs {
if err == errVolumeNotEmpty {
// Attempt to delete bucket again.
if derr := storageDisks[index].DeleteVol(bucket); derr == errVolumeNotEmpty {
if derr := storageDisks[index].DeleteVol(bucket, false); derr == errVolumeNotEmpty {
_ = cleanupDir(ctx, storageDisks[index], bucket, "")
_ = storageDisks[index].DeleteVol(bucket)
_ = storageDisks[index].DeleteVol(bucket, false)
// Cleanup all the previously incomplete multiparts.
_ = cleanupDir(ctx, storageDisks[index], minioMetaMultipartBucket, bucket)
@ -222,7 +222,7 @@ func deleteDanglingBucket(ctx context.Context, storageDisks []StorageAPI, dErrs
}
// DeleteBucket - deletes a bucket.
func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string) error {
func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
// Collect if all disks report volume not found.
storageDisks := xl.getDisks()
@ -232,7 +232,7 @@ func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string) error {
index := index
g.Go(func() error {
if storageDisks[index] != nil {
if err := storageDisks[index].DeleteVol(bucket); err != nil {
if err := storageDisks[index].DeleteVol(bucket, forceDelete); err != nil {
return err
}
err := cleanupDir(ctx, storageDisks[index], minioMetaMultipartBucket, bucket)
@ -248,6 +248,17 @@ func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string) error {
// Wait for all the delete vols to finish.
dErrs := g.Wait()
if forceDelete {
for _, err := range dErrs {
if err != nil {
undoDeleteBucket(storageDisks, bucket)
return toObjectErr(err, bucket)
}
}
return nil
}
writeQuorum := len(storageDisks)/2 + 1
err := reduceWriteQuorumErrs(ctx, dErrs, bucketOpIgnoredErrs, writeQuorum)
if err == errXLWriteQuorum {

View file

@ -176,7 +176,7 @@ func TestListOnlineDisks(t *testing.T) {
// Cleanup from previous test.
obj.DeleteObject(context.Background(), bucket, object)
obj.DeleteBucket(context.Background(), bucket)
obj.DeleteBucket(context.Background(), bucket, false)
err = obj.MakeBucketWithLocation(context.Background(), "bucket", "")
if err != nil {

View file

@ -326,7 +326,7 @@ func undoMakeBucketZones(bucket string, zones []*xlSets, errs []error) {
index := index
g.Go(func() error {
if errs[index] == nil {
return zones[index].DeleteBucket(context.Background(), bucket)
return zones[index].DeleteBucket(context.Background(), bucket, false)
}
return nil
}, index)
@ -1232,9 +1232,9 @@ func (z *xlZones) IsCompressionSupported() bool {
// DeleteBucket - deletes a bucket on all zones simultaneously,
// even if one of the zones fail to delete buckets, we proceed to
// undo a successful operation.
func (z *xlZones) DeleteBucket(ctx context.Context, bucket string) error {
func (z *xlZones) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
if z.SingleZone() {
return z.zones[0].DeleteBucket(ctx, bucket)
return z.zones[0].DeleteBucket(ctx, bucket, forceDelete)
}
g := errgroup.WithNErrs(len(z.zones))
@ -1242,11 +1242,26 @@ func (z *xlZones) DeleteBucket(ctx context.Context, bucket string) error {
for index := range z.zones {
index := index
g.Go(func() error {
return z.zones[index].DeleteBucket(ctx, bucket)
return z.zones[index].DeleteBucket(ctx, bucket, forceDelete)
}, index)
}
errs := g.Wait()
if forceDelete {
for _, err := range errs {
if err != nil {
if _, ok := err.(InsufficientWriteQuorum); ok {
undoDeleteBucketZones(bucket, z.zones, errs)
}
return err
}
}
return nil
}
// For any write quorum failure, we undo all the delete buckets operation
// by creating all the buckets again.
for _, err := range errs {