fix: Delete() of bucket metadata should not parse the config (#15904)

This commit is contained in:
Harshavardhana 2022-10-19 17:55:09 -07:00 committed by GitHub
parent 3dbef72dc7
commit a8332efa94
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 52 additions and 25 deletions

View file

@ -204,7 +204,7 @@ func (api objectAPIHandlers) DeleteBucketEncryptionHandler(w http.ResponseWriter
} }
// Delete bucket encryption config from object layer // Delete bucket encryption config from object layer
updatedAt, err := globalBucketMetadataSys.Update(ctx, bucket, bucketSSEConfig, nil) updatedAt, err := globalBucketMetadataSys.Delete(ctx, bucket, bucketSSEConfig)
if err != nil { if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return return

View file

@ -1618,7 +1618,7 @@ func (api objectAPIHandlers) DeleteBucketTaggingHandler(w http.ResponseWriter, r
return return
} }
updatedAt, err := globalBucketMetadataSys.Update(ctx, bucket, bucketTaggingConfig, nil) updatedAt, err := globalBucketMetadataSys.Delete(ctx, bucket, bucketTaggingConfig)
if err != nil { if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return return

View file

@ -168,7 +168,7 @@ func (api objectAPIHandlers) DeleteBucketLifecycleHandler(w http.ResponseWriter,
return return
} }
if _, err := globalBucketMetadataSys.Update(ctx, bucket, bucketLifecycleConfig, nil); err != nil { if _, err := globalBucketMetadataSys.Delete(ctx, bucket, bucketLifecycleConfig); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return return
} }

View file

@ -81,9 +81,7 @@ func (sys *BucketMetadataSys) Set(bucket string, meta BucketMetadata) {
} }
} }
// Update update bucket metadata for the specified config file. func (sys *BucketMetadataSys) updateAndParse(ctx context.Context, bucket string, configFile string, configData []byte, parse bool) (updatedAt time.Time, err error) {
// The configData data should not be modified after being sent here.
func (sys *BucketMetadataSys) Update(ctx context.Context, bucket string, configFile string, configData []byte) (updatedAt time.Time, err error) {
objAPI := newObjectLayerFn() objAPI := newObjectLayerFn()
if objAPI == nil { if objAPI == nil {
return updatedAt, errServerNotInitialized return updatedAt, errServerNotInitialized
@ -107,7 +105,7 @@ func (sys *BucketMetadataSys) Update(ctx context.Context, bucket string, configF
return updatedAt, errInvalidArgument return updatedAt, errInvalidArgument
} }
meta, err := loadBucketMetadata(ctx, objAPI, bucket) meta, err := loadBucketMetadataParse(ctx, objAPI, bucket, parse)
if err != nil { if err != nil {
if !globalIsErasure && !globalIsDistErasure && errors.Is(err, errVolumeNotFound) { if !globalIsErasure && !globalIsDistErasure && errors.Is(err, errVolumeNotFound) {
// Only single drive mode needs this fallback. // Only single drive mode needs this fallback.
@ -165,6 +163,18 @@ func (sys *BucketMetadataSys) Update(ctx context.Context, bucket string, configF
return updatedAt, nil return updatedAt, nil
} }
// Delete delete the bucket metadata for the specified bucket.
// must be used by all callers instead of using Update() with nil configData.
func (sys *BucketMetadataSys) Delete(ctx context.Context, bucket string, configFile string) (updatedAt time.Time, err error) {
return sys.updateAndParse(ctx, bucket, configFile, nil, false)
}
// Update update bucket metadata for the specified bucket.
// The configData data should not be modified after being sent here.
func (sys *BucketMetadataSys) Update(ctx context.Context, bucket string, configFile string, configData []byte) (updatedAt time.Time, err error) {
return sys.updateAndParse(ctx, bucket, configFile, configData, true)
}
// Get metadata for a bucket. // Get metadata for a bucket.
// If no metadata exists errConfigNotFound is returned and a new metadata is returned. // If no metadata exists errConfigNotFound is returned and a new metadata is returned.
// Only a shallow copy is returned, so referenced data should not be modified, // Only a shallow copy is returned, so referenced data should not be modified,

View file

@ -162,8 +162,7 @@ func (b *BucketMetadata) Load(ctx context.Context, api ObjectLayer, name string)
return err return err
} }
// loadBucketMetadata loads and migrates to bucket metadata. func loadBucketMetadataParse(ctx context.Context, objectAPI ObjectLayer, bucket string, parse bool) (BucketMetadata, error) {
func loadBucketMetadata(ctx context.Context, objectAPI ObjectLayer, bucket string) (BucketMetadata, error) {
b := newBucketMetadata(bucket) b := newBucketMetadata(bucket)
err := b.Load(ctx, objectAPI, b.Name) err := b.Load(ctx, objectAPI, b.Name)
if err != nil && !errors.Is(err, errConfigNotFound) { if err != nil && !errors.Is(err, errConfigNotFound) {
@ -172,8 +171,22 @@ func loadBucketMetadata(ctx context.Context, objectAPI ObjectLayer, bucket strin
if err == nil { if err == nil {
b.defaultTimestamps() b.defaultTimestamps()
} }
// Old bucket without bucket metadata. Hence we migrate existing settings.
if err := b.convertLegacyConfigs(ctx, objectAPI); err != nil { configs, err := b.getAllLegacyConfigs(ctx, objectAPI)
if err != nil {
return b, err
}
if len(configs) == 0 {
if parse {
// nothing to update, parse and proceed.
err = b.parseAllConfigs(ctx, objectAPI)
}
} else {
// Old bucket without bucket metadata. Hence we migrate existing settings.
err = b.convertLegacyConfigs(ctx, objectAPI, configs)
}
if err != nil {
return b, err return b, err
} }
@ -185,6 +198,11 @@ func loadBucketMetadata(ctx context.Context, objectAPI ObjectLayer, bucket strin
return b, nil return b, nil
} }
// loadBucketMetadata loads and migrates to bucket metadata.
func loadBucketMetadata(ctx context.Context, objectAPI ObjectLayer, bucket string) (BucketMetadata, error) {
return loadBucketMetadataParse(ctx, objectAPI, bucket, true)
}
// parseAllConfigs will parse all configs and populate the private fields. // parseAllConfigs will parse all configs and populate the private fields.
// The first error encountered is returned. // The first error encountered is returned.
func (b *BucketMetadata) parseAllConfigs(ctx context.Context, objectAPI ObjectLayer) (err error) { func (b *BucketMetadata) parseAllConfigs(ctx context.Context, objectAPI ObjectLayer) (err error) {
@ -277,7 +295,7 @@ func (b *BucketMetadata) parseAllConfigs(ctx context.Context, objectAPI ObjectLa
return nil return nil
} }
func (b *BucketMetadata) convertLegacyConfigs(ctx context.Context, objectAPI ObjectLayer) error { func (b *BucketMetadata) getAllLegacyConfigs(ctx context.Context, objectAPI ObjectLayer) (map[string][]byte, error) {
legacyConfigs := []string{ legacyConfigs := []string{
legacyBucketObjectLockEnabledConfigFile, legacyBucketObjectLockEnabledConfigFile,
bucketPolicyConfig, bucketPolicyConfig,
@ -291,7 +309,7 @@ func (b *BucketMetadata) convertLegacyConfigs(ctx context.Context, objectAPI Obj
objectLockConfig, objectLockConfig,
} }
configs := make(map[string][]byte) configs := make(map[string][]byte, len(legacyConfigs))
// Handle migration from lockEnabled to newer format. // Handle migration from lockEnabled to newer format.
if b.LockEnabled { if b.LockEnabled {
@ -316,16 +334,15 @@ func (b *BucketMetadata) convertLegacyConfigs(ctx context.Context, objectAPI Obj
continue continue
} }
return err return nil, err
} }
configs[legacyFile] = configData configs[legacyFile] = configData
} }
if len(configs) == 0 { return configs, nil
// nothing to update, return right away. }
return b.parseAllConfigs(ctx, objectAPI)
}
func (b *BucketMetadata) convertLegacyConfigs(ctx context.Context, objectAPI ObjectLayer, configs map[string][]byte) error {
for legacyFile, configData := range configs { for legacyFile, configData := range configs {
switch legacyFile { switch legacyFile {
case legacyBucketObjectLockEnabledConfigFile: case legacyBucketObjectLockEnabledConfigFile:

View file

@ -149,7 +149,7 @@ func (api objectAPIHandlers) DeleteBucketPolicyHandler(w http.ResponseWriter, r
return return
} }
updatedAt, err := globalBucketMetadataSys.Update(ctx, bucket, bucketPolicyConfig, nil) updatedAt, err := globalBucketMetadataSys.Delete(ctx, bucket, bucketPolicyConfig)
if err != nil { if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return return

View file

@ -165,7 +165,7 @@ func (api objectAPIHandlers) DeleteBucketReplicationConfigHandler(w http.Respons
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrReplicationDenyEditError), r.URL) writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrReplicationDenyEditError), r.URL)
return return
} }
if _, err := globalBucketMetadataSys.Update(ctx, bucket, bucketReplicationConfig, nil); err != nil { if _, err := globalBucketMetadataSys.Delete(ctx, bucket, bucketReplicationConfig); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return return
} }

View file

@ -1361,7 +1361,7 @@ func (c *SiteReplicationSys) PeerBucketPolicyHandler(ctx context.Context, bucket
} }
// Delete the bucket policy // Delete the bucket policy
_, err := globalBucketMetadataSys.Update(ctx, bucket, bucketPolicyConfig, nil) _, err := globalBucketMetadataSys.Delete(ctx, bucket, bucketPolicyConfig)
if err != nil { if err != nil {
return wrapSRErr(err) return wrapSRErr(err)
} }
@ -1391,7 +1391,7 @@ func (c *SiteReplicationSys) PeerBucketTaggingHandler(ctx context.Context, bucke
} }
// Delete the tags // Delete the tags
_, err := globalBucketMetadataSys.Update(ctx, bucket, bucketTaggingConfig, nil) _, err := globalBucketMetadataSys.Delete(ctx, bucket, bucketTaggingConfig)
if err != nil { if err != nil {
return wrapSRErr(err) return wrapSRErr(err)
} }
@ -1445,7 +1445,7 @@ func (c *SiteReplicationSys) PeerBucketSSEConfigHandler(ctx context.Context, buc
} }
// Delete sse config // Delete sse config
_, err := globalBucketMetadataSys.Update(ctx, bucket, bucketSSEConfig, nil) _, err := globalBucketMetadataSys.Delete(ctx, bucket, bucketSSEConfig)
if err != nil { if err != nil {
return wrapSRErr(err) return wrapSRErr(err)
} }
@ -1475,7 +1475,7 @@ func (c *SiteReplicationSys) PeerBucketQuotaConfigHandler(ctx context.Context, b
} }
// Delete the bucket policy // Delete the bucket policy
_, err := globalBucketMetadataSys.Update(ctx, bucket, bucketQuotaConfigFile, nil) _, err := globalBucketMetadataSys.Delete(ctx, bucket, bucketQuotaConfigFile)
if err != nil { if err != nil {
return wrapSRErr(err) return wrapSRErr(err)
} }
@ -2234,7 +2234,7 @@ func (c *SiteReplicationSys) RemoveRemoteTargetsForEndpoint(ctx context.Context,
return err return err
} }
} else { } else {
if _, err := globalBucketMetadataSys.Update(ctx, b.Name, bucketReplicationConfig, nil); err != nil { if _, err := globalBucketMetadataSys.Delete(ctx, b.Name, bucketReplicationConfig); err != nil {
return err return err
} }
} }