minio/cmd/bucket-metadata-sys.go
Harshavardhana bd032d13ff
migrate all bucket metadata into a single file (#9586)
this is a major overhaul by migrating off all
bucket metadata related configs into a single
object '.metadata.bin' this allows us for faster
bootups across 1000's of buckets and as well
as keeps the code simple enough for future
work and additions.

Additionally also fixes #9396, #9394
2020-05-19 13:53:54 -07:00

246 lines
6.2 KiB
Go

/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"bytes"
"context"
"fmt"
"sync"
"github.com/minio/minio/pkg/bucket/policy"
)
// BucketMetadataSys captures all bucket metadata for a given cluster.
type BucketMetadataSys struct {
sync.RWMutex
metadataMap map[string]BucketMetadata
// Do fallback to old config when unable to find a bucket config
fallback bool
}
// Remove bucket metadata from memory.
func (sys *BucketMetadataSys) Remove(bucket string) {
if globalIsGateway {
return
}
sys.Lock()
delete(sys.metadataMap, bucket)
sys.Unlock()
}
// Set - sets a new metadata in-memory.
// Only a shallow copy is saved and fields with references
// cannot be modified without causing a race condition,
// so they should be replaced atomically and not appended to, etc.
// Data is not persisted to disk.
func (sys *BucketMetadataSys) Set(bucket string, meta BucketMetadata) {
if globalIsGateway {
return
}
if bucket != minioMetaBucket {
sys.Lock()
sys.metadataMap[bucket] = meta
sys.Unlock()
}
}
// Update update bucket metadata for the specified config file.
// The configData data should not be modified after being sent here.
func (sys *BucketMetadataSys) Update(bucket string, configFile string, configData []byte) error {
objAPI := newObjectLayerWithoutSafeModeFn()
if objAPI == nil {
return errServerNotInitialized
}
if globalIsGateway {
if configFile == bucketPolicyConfig {
config, err := policy.ParseConfig(bytes.NewReader(configData), bucket)
if err != nil {
return err
}
return objAPI.SetBucketPolicy(GlobalContext, bucket, config)
}
return NotImplemented{}
}
if bucket == minioMetaBucket {
return errInvalidArgument
}
defer globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket)
sys.Lock()
meta, ok := sys.metadataMap[bucket]
sys.Unlock()
if !ok {
return BucketNotFound{Bucket: bucket}
}
switch configFile {
case bucketPolicyConfig:
meta.PolicyConfigJSON = configData
case bucketNotificationConfig:
meta.NotificationXML = configData
case bucketLifecycleConfig:
meta.LifecycleConfigXML = configData
case bucketSSEConfig:
meta.EncryptionConfigXML = configData
case bucketTaggingConfigFile:
meta.TaggingConfigXML = configData
case objectLockConfig:
meta.ObjectLockConfigurationXML = configData
case bucketQuotaConfigFile:
meta.QuotaConfigJSON = configData
default:
return fmt.Errorf("Unknown bucket %s metadata update requested %s", bucket, configFile)
}
if err := meta.Save(GlobalContext, objAPI); err != nil {
return err
}
sys.Lock()
sys.metadataMap[bucket] = meta
sys.Unlock()
return nil
}
// Get metadata for a bucket.
// If no metadata exists errConfigNotFound is returned and a new metadata is returned.
// Only a shallow copy is returned, so referenced data should not be modified,
// but can be replaced atomically.
func (sys *BucketMetadataSys) Get(bucket string) (BucketMetadata, error) {
if globalIsGateway || bucket == minioMetaBucket {
return newBucketMetadata(bucket), errConfigNotFound
}
sys.RLock()
defer sys.RUnlock()
meta, ok := sys.metadataMap[bucket]
if !ok {
return newBucketMetadata(bucket), errConfigNotFound
}
return meta, nil
}
// GetConfig returns a specific configuration from the bucket metadata.
// The returned data should be copied before being modified.
func (sys *BucketMetadataSys) GetConfig(bucket string, configFile string) ([]byte, error) {
if globalIsGateway {
return nil, NotImplemented{}
}
if bucket == minioMetaBucket {
return nil, errInvalidArgument
}
sys.RLock()
defer sys.RUnlock()
meta, ok := sys.metadataMap[bucket]
if !ok {
if !sys.fallback {
return nil, errConfigNotFound
}
objAPI := newObjectLayerWithoutSafeModeFn()
if objAPI == nil {
return nil, errServerNotInitialized
}
var err error
meta, err = loadBucketMetadata(GlobalContext, objAPI, bucket)
if err != nil {
return nil, err
}
sys.metadataMap[bucket] = meta
}
var configData []byte
switch configFile {
case bucketPolicyConfig:
configData = meta.PolicyConfigJSON
case bucketNotificationConfig:
configData = meta.NotificationXML
case bucketLifecycleConfig:
configData = meta.LifecycleConfigXML
case bucketSSEConfig:
configData = meta.EncryptionConfigXML
case bucketTaggingConfigFile:
configData = meta.TaggingConfigXML
case objectLockConfig:
configData = meta.ObjectLockConfigurationXML
case bucketQuotaConfigFile:
configData = meta.QuotaConfigJSON
default:
return nil, fmt.Errorf("Unknown bucket %s metadata update requested %s", bucket, configFile)
}
if len(configData) == 0 {
return nil, errConfigNotFound
}
return configData, nil
}
// Init - initializes bucket metadata system for all buckets.
func (sys *BucketMetadataSys) Init(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error {
if objAPI == nil {
return errServerNotInitialized
}
// In gateway mode, we don't need to load the policies
// from the backend.
if globalIsGateway {
return nil
}
// Load PolicySys once during boot.
return sys.load(ctx, buckets, objAPI)
}
// Loads bucket metadata for all buckets into BucketMetadataSys.
// When if done successfully fallback will be disabled.
func (sys *BucketMetadataSys) load(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error {
sys.Lock()
defer sys.Unlock()
for _, bucket := range buckets {
meta, err := loadBucketMetadata(ctx, objAPI, bucket.Name)
if err != nil {
return err
}
sys.metadataMap[bucket.Name] = meta
}
sys.fallback = false
return nil
}
// NewBucketMetadataSys - creates new policy system.
func NewBucketMetadataSys() *BucketMetadataSys {
return &BucketMetadataSys{
metadataMap: make(map[string]BucketMetadata),
// Do fallback until all buckets have been loaded.
fallback: true,
}
}