Reload bucket targets when replication config is refreshed (#16684)

This commit is contained in:
Poorna 2023-02-21 21:18:49 -08:00 committed by GitHub
parent 59a5456091
commit 9202c6e26a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -202,7 +202,7 @@ func (sys *BucketMetadataSys) Get(bucket string) (BucketMetadata, error) {
// GetVersioningConfig returns configured versioning config
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetVersioningConfig(bucket string) (*versioning.Versioning, time.Time, error) {
meta, err := sys.GetConfig(GlobalContext, bucket)
meta, _, err := sys.GetConfig(GlobalContext, bucket)
if err != nil {
if errors.Is(err, errConfigNotFound) {
return &versioning.Versioning{XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/"}, meta.Created, nil
@ -215,7 +215,7 @@ func (sys *BucketMetadataSys) GetVersioningConfig(bucket string) (*versioning.Ve
// GetTaggingConfig returns configured tagging config
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetTaggingConfig(bucket string) (*tags.Tags, time.Time, error) {
meta, err := sys.GetConfig(GlobalContext, bucket)
meta, _, err := sys.GetConfig(GlobalContext, bucket)
if err != nil {
if errors.Is(err, errConfigNotFound) {
return nil, time.Time{}, BucketTaggingNotFound{Bucket: bucket}
@ -231,7 +231,7 @@ func (sys *BucketMetadataSys) GetTaggingConfig(bucket string) (*tags.Tags, time.
// GetObjectLockConfig returns configured object lock config
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetObjectLockConfig(bucket string) (*objectlock.Config, time.Time, error) {
meta, err := sys.GetConfig(GlobalContext, bucket)
meta, _, err := sys.GetConfig(GlobalContext, bucket)
if err != nil {
if errors.Is(err, errConfigNotFound) {
return nil, time.Time{}, BucketObjectLockConfigNotFound{Bucket: bucket}
@ -247,7 +247,7 @@ func (sys *BucketMetadataSys) GetObjectLockConfig(bucket string) (*objectlock.Co
// GetLifecycleConfig returns configured lifecycle config
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetLifecycleConfig(bucket string) (*lifecycle.Lifecycle, error) {
meta, err := sys.GetConfig(GlobalContext, bucket)
meta, _, err := sys.GetConfig(GlobalContext, bucket)
if err != nil {
if errors.Is(err, errConfigNotFound) {
return nil, BucketLifecycleNotFound{Bucket: bucket}
@ -263,7 +263,7 @@ func (sys *BucketMetadataSys) GetLifecycleConfig(bucket string) (*lifecycle.Life
// GetNotificationConfig returns configured notification config
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetNotificationConfig(bucket string) (*event.Config, error) {
meta, err := sys.GetConfig(GlobalContext, bucket)
meta, _, err := sys.GetConfig(GlobalContext, bucket)
if err != nil {
return nil, err
}
@ -273,7 +273,7 @@ func (sys *BucketMetadataSys) GetNotificationConfig(bucket string) (*event.Confi
// GetSSEConfig returns configured SSE config
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetSSEConfig(bucket string) (*bucketsse.BucketSSEConfig, time.Time, error) {
meta, err := sys.GetConfig(GlobalContext, bucket)
meta, _, err := sys.GetConfig(GlobalContext, bucket)
if err != nil {
if errors.Is(err, errConfigNotFound) {
return nil, time.Time{}, BucketSSEConfigNotFound{Bucket: bucket}
@ -288,7 +288,7 @@ func (sys *BucketMetadataSys) GetSSEConfig(bucket string) (*bucketsse.BucketSSEC
// CreatedAt returns the time of creation of bucket
func (sys *BucketMetadataSys) CreatedAt(bucket string) (time.Time, error) {
meta, err := sys.GetConfig(GlobalContext, bucket)
meta, _, err := sys.GetConfig(GlobalContext, bucket)
if err != nil {
return time.Time{}, err
}
@ -298,7 +298,7 @@ func (sys *BucketMetadataSys) CreatedAt(bucket string) (time.Time, error) {
// GetPolicyConfig returns configured bucket policy
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetPolicyConfig(bucket string) (*policy.Policy, time.Time, error) {
meta, err := sys.GetConfig(GlobalContext, bucket)
meta, _, err := sys.GetConfig(GlobalContext, bucket)
if err != nil {
if errors.Is(err, errConfigNotFound) {
return nil, time.Time{}, BucketPolicyNotFound{Bucket: bucket}
@ -314,7 +314,7 @@ func (sys *BucketMetadataSys) GetPolicyConfig(bucket string) (*policy.Policy, ti
// GetQuotaConfig returns configured bucket quota
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetQuotaConfig(ctx context.Context, bucket string) (*madmin.BucketQuota, time.Time, error) {
meta, err := sys.GetConfig(ctx, bucket)
meta, _, err := sys.GetConfig(ctx, bucket)
if err != nil {
if errors.Is(err, errConfigNotFound) {
return nil, time.Time{}, BucketQuotaConfigNotFound{Bucket: bucket}
@ -327,7 +327,7 @@ func (sys *BucketMetadataSys) GetQuotaConfig(ctx context.Context, bucket string)
// GetReplicationConfig returns configured bucket replication config
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetReplicationConfig(ctx context.Context, bucket string) (*replication.Config, time.Time, error) {
meta, err := sys.GetConfig(ctx, bucket)
meta, reloaded, err := sys.GetConfig(ctx, bucket)
if err != nil {
if errors.Is(err, errConfigNotFound) {
return nil, time.Time{}, BucketReplicationConfigNotFound{Bucket: bucket}
@ -338,13 +338,18 @@ func (sys *BucketMetadataSys) GetReplicationConfig(ctx context.Context, bucket s
if meta.replicationConfig == nil {
return nil, time.Time{}, BucketReplicationConfigNotFound{Bucket: bucket}
}
if reloaded {
globalBucketTargetSys.set(BucketInfo{
Name: bucket,
}, meta)
}
return meta.replicationConfig, meta.ReplicationConfigUpdatedAt, nil
}
// GetBucketTargetsConfig returns configured bucket targets for this bucket
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetBucketTargetsConfig(bucket string) (*madmin.BucketTargets, error) {
meta, err := sys.GetConfig(GlobalContext, bucket)
meta, reloaded, err := sys.GetConfig(GlobalContext, bucket)
if err != nil {
if errors.Is(err, errConfigNotFound) {
return nil, BucketRemoteTargetNotFound{Bucket: bucket}
@ -354,6 +359,11 @@ func (sys *BucketMetadataSys) GetBucketTargetsConfig(bucket string) (*madmin.Buc
if meta.bucketTargetConfig == nil {
return nil, BucketRemoteTargetNotFound{Bucket: bucket}
}
if reloaded {
globalBucketTargetSys.set(BucketInfo{
Name: bucket,
}, meta)
}
return meta.bucketTargetConfig, nil
}
@ -373,31 +383,32 @@ func (sys *BucketMetadataSys) GetConfigFromDisk(ctx context.Context, bucket stri
// GetConfig returns a specific configuration from the bucket metadata.
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetConfig(ctx context.Context, bucket string) (BucketMetadata, error) {
// reloaded will be true if metadata refreshed from disk
func (sys *BucketMetadataSys) GetConfig(ctx context.Context, bucket string) (meta BucketMetadata, reloaded bool, err error) {
objAPI := newObjectLayerFn()
if objAPI == nil {
return newBucketMetadata(bucket), errServerNotInitialized
return newBucketMetadata(bucket), reloaded, errServerNotInitialized
}
if isMinioMetaBucketName(bucket) {
return newBucketMetadata(bucket), errInvalidArgument
return newBucketMetadata(bucket), reloaded, errInvalidArgument
}
sys.RLock()
meta, ok := sys.metadataMap[bucket]
sys.RUnlock()
if ok {
return meta, nil
return meta, reloaded, nil
}
meta, err := loadBucketMetadata(ctx, objAPI, bucket)
meta, err = loadBucketMetadata(ctx, objAPI, bucket)
if err != nil {
return meta, err
return meta, reloaded, err
}
sys.Lock()
sys.metadataMap[bucket] = meta
sys.Unlock()
return meta, nil
return meta, true, nil
}
// Init - initializes bucket metadata system for all buckets.