minio/cmd/bucket-metadata-sys.go
Klaus Post ad511b0eb8
tests: Fix occasional data race (#11223)
CI tests could trigger a data race.

Servers are generally not expected to reinitialize, so tests could trigger data races when reinitializing and async operations are running.

We add the option to safely reset global vars instead of overwriting.

Fixes races like:

```
WARNING: DATA RACE
Read at 0x00000477ab18 by goroutine 1159:
  github.com/minio/minio/cmd.FileInfo.ToObjectInfo()
      /home/runner/work/minio/minio/cmd/erasure-metadata.go:105 +0x16d
  github.com/minio/minio/cmd.erasureObjects.putObject()
      /home/runner/work/minio/minio/cmd/erasure-object.go:748 +0x13f8
  github.com/minio/minio/cmd.(*erasureObjects).listPath.func3.2()
      /home/runner/work/minio/minio/cmd/metacache-set.go:682 +0x7d3
  github.com/minio/minio/cmd.newMetacacheBlockWriter.func1.2()
      /home/runner/work/minio/minio/cmd/metacache-stream.go:777 +0x1c4
  github.com/minio/minio/cmd.newMetacacheBlockWriter.func1()
      /home/runner/work/minio/minio/cmd/metacache-stream.go:806 +0x614

Previous write at 0x00000477ab18 by goroutine 1269:
  [failed to restore the stack]

Goroutine 1159 (running) created at:
  github.com/minio/minio/cmd.newMetacacheBlockWriter()
      /home/runner/work/minio/minio/cmd/metacache-stream.go:760 +0x112
  github.com/minio/minio/cmd.(*erasureObjects).listPath.func3()
      /home/runner/work/minio/minio/cmd/metacache-set.go:672 +0xe22

Goroutine 1269 (running) created at:
  testing.(*T).Run()
      /opt/hostedtoolcache/go/1.14.13/x64/src/testing/testing.go:1095 +0x537
  testing.runTests.func1()
      /opt/hostedtoolcache/go/1.14.13/x64/src/testing/testing.go:1339 +0xa6
  testing.tRunner()
      /opt/hostedtoolcache/go/1.14.13/x64/src/testing/testing.go:1050 +0x1eb
  testing.runTests()
      /opt/hostedtoolcache/go/1.14.13/x64/src/testing/testing.go:1337 +0x594
  testing.(*M).Run()
      /opt/hostedtoolcache/go/1.14.13/x64/src/testing/testing.go:1252 +0x2ff
  github.com/minio/minio/cmd.TestMain()
      /home/runner/work/minio/minio/cmd/test-utils_test.go:120 +0x44e
  main.main()
      _testmain.go:1408 +0x223
==================
==================
WARNING: DATA RACE
Read at 0x00000477aae8 by goroutine 1159:
  github.com/minio/minio/cmd.(*BucketVersioningSys).Enabled()
      /home/runner/work/minio/minio/cmd/bucket-versioning.go:26 +0x52
  github.com/minio/minio/cmd.FileInfo.ToObjectInfo()
      /home/runner/work/minio/minio/cmd/erasure-metadata.go:105 +0x197
  github.com/minio/minio/cmd.erasureObjects.putObject()
      /home/runner/work/minio/minio/cmd/erasure-object.go:748 +0x13f8
  github.com/minio/minio/cmd.(*erasureObjects).listPath.func3.2()
      /home/runner/work/minio/minio/cmd/metacache-set.go:682 +0x7d3
  github.com/minio/minio/cmd.newMetacacheBlockWriter.func1.2()
      /home/runner/work/minio/minio/cmd/metacache-stream.go:777 +0x1c4
  github.com/minio/minio/cmd.newMetacacheBlockWriter.func1()
      /home/runner/work/minio/minio/cmd/metacache-stream.go:806 +0x614

Previous write at 0x00000477aae8 by goroutine 1269:
  [failed to restore the stack]

Goroutine 1159 (running) created at:
  github.com/minio/minio/cmd.newMetacacheBlockWriter()
      /home/runner/work/minio/minio/cmd/metacache-stream.go:760 +0x112
  github.com/minio/minio/cmd.(*erasureObjects).listPath.func3()
      /home/runner/work/minio/minio/cmd/metacache-set.go:672 +0xe22

Goroutine 1269 (running) created at:
  testing.(*T).Run()
      /opt/hostedtoolcache/go/1.14.13/x64/src/testing/testing.go:1095 +0x537
  testing.runTests.func1()
      /opt/hostedtoolcache/go/1.14.13/x64/src/testing/testing.go:1339 +0xa6
  testing.tRunner()
      /opt/hostedtoolcache/go/1.14.13/x64/src/testing/testing.go:1050 +0x1eb
  testing.runTests()
      /opt/hostedtoolcache/go/1.14.13/x64/src/testing/testing.go:1337 +0x594
  testing.(*M).Run()
      /opt/hostedtoolcache/go/1.14.13/x64/src/testing/testing.go:1252 +0x2ff
  github.com/minio/minio/cmd.TestMain()
      /home/runner/work/minio/minio/cmd/test-utils_test.go:120 +0x44e
  main.main()
      _testmain.go:1408 +0x223
==================
```
2021-01-05 10:45:26 -08:00

494 lines
14 KiB
Go

/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/cmd/crypto"
"github.com/minio/minio/cmd/logger"
bucketsse "github.com/minio/minio/pkg/bucket/encryption"
"github.com/minio/minio/pkg/bucket/lifecycle"
objectlock "github.com/minio/minio/pkg/bucket/object/lock"
"github.com/minio/minio/pkg/bucket/policy"
"github.com/minio/minio/pkg/bucket/replication"
"github.com/minio/minio/pkg/bucket/versioning"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/madmin"
"github.com/minio/minio/pkg/sync/errgroup"
)
// BucketMetadataSys captures all bucket metadata for a given cluster.
type BucketMetadataSys struct {
sync.RWMutex
metadataMap map[string]BucketMetadata
}
// Remove bucket metadata from memory.
func (sys *BucketMetadataSys) Remove(bucket string) {
if globalIsGateway {
return
}
sys.Lock()
delete(sys.metadataMap, bucket)
globalBucketMonitor.DeleteBucket(bucket)
sys.Unlock()
}
// Set - sets a new metadata in-memory.
// Only a shallow copy is saved and fields with references
// cannot be modified without causing a race condition,
// so they should be replaced atomically and not appended to, etc.
// Data is not persisted to disk.
func (sys *BucketMetadataSys) Set(bucket string, meta BucketMetadata) {
if globalIsGateway {
return
}
if bucket != minioMetaBucket {
sys.Lock()
sys.metadataMap[bucket] = meta
sys.Unlock()
}
}
// Update update bucket metadata for the specified config file.
// The configData data should not be modified after being sent here.
func (sys *BucketMetadataSys) Update(bucket string, configFile string, configData []byte) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
if globalIsGateway {
// This code is needed only for gateway implementations.
switch configFile {
case bucketSSEConfig:
if globalGatewayName == NASBackendGateway {
meta, err := loadBucketMetadata(GlobalContext, objAPI, bucket)
if err != nil {
return err
}
meta.EncryptionConfigXML = configData
return meta.Save(GlobalContext, objAPI)
}
case bucketLifecycleConfig:
if globalGatewayName == NASBackendGateway {
meta, err := loadBucketMetadata(GlobalContext, objAPI, bucket)
if err != nil {
return err
}
meta.LifecycleConfigXML = configData
return meta.Save(GlobalContext, objAPI)
}
case bucketTaggingConfig:
if globalGatewayName == NASBackendGateway {
meta, err := loadBucketMetadata(GlobalContext, objAPI, bucket)
if err != nil {
return err
}
meta.TaggingConfigXML = configData
return meta.Save(GlobalContext, objAPI)
}
case bucketNotificationConfig:
if globalGatewayName == NASBackendGateway {
meta, err := loadBucketMetadata(GlobalContext, objAPI, bucket)
if err != nil {
return err
}
meta.NotificationConfigXML = configData
return meta.Save(GlobalContext, objAPI)
}
case bucketPolicyConfig:
if configData == nil {
return objAPI.DeleteBucketPolicy(GlobalContext, bucket)
}
config, err := policy.ParseConfig(bytes.NewReader(configData), bucket)
if err != nil {
return err
}
return objAPI.SetBucketPolicy(GlobalContext, bucket, config)
}
return NotImplemented{}
}
if bucket == minioMetaBucket {
return errInvalidArgument
}
meta, err := loadBucketMetadata(GlobalContext, objAPI, bucket)
if err != nil {
return err
}
switch configFile {
case bucketPolicyConfig:
meta.PolicyConfigJSON = configData
case bucketNotificationConfig:
meta.NotificationConfigXML = configData
case bucketLifecycleConfig:
meta.LifecycleConfigXML = configData
case bucketSSEConfig:
meta.EncryptionConfigXML = configData
case bucketTaggingConfig:
meta.TaggingConfigXML = configData
case bucketQuotaConfigFile:
meta.QuotaConfigJSON = configData
case objectLockConfig:
if !globalIsErasure && !globalIsDistErasure {
return NotImplemented{}
}
meta.ObjectLockConfigXML = configData
case bucketVersioningConfig:
if !globalIsErasure && !globalIsDistErasure {
return NotImplemented{}
}
meta.VersioningConfigXML = configData
case bucketReplicationConfig:
if !globalIsErasure && !globalIsDistErasure {
return NotImplemented{}
}
meta.ReplicationConfigXML = configData
case bucketTargetsFile:
meta.BucketTargetsConfigJSON, meta.BucketTargetsConfigMetaJSON, err = encryptBucketMetadata(meta.Name, configData, crypto.Context{bucket: meta.Name, bucketTargetsFile: bucketTargetsFile})
if err != nil {
return fmt.Errorf("Error encrypting bucket target metadata %w", err)
}
default:
return fmt.Errorf("Unknown bucket %s metadata update requested %s", bucket, configFile)
}
if err := meta.Save(GlobalContext, objAPI); err != nil {
return err
}
sys.Set(bucket, meta)
globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket)
return nil
}
// Get metadata for a bucket.
// If no metadata exists errConfigNotFound is returned and a new metadata is returned.
// Only a shallow copy is returned, so referenced data should not be modified,
// but can be replaced atomically.
//
// This function should only be used with
// - GetBucketInfo
// - ListBuckets
// For all other bucket specific metadata, use the relevant
// calls implemented specifically for each of those features.
func (sys *BucketMetadataSys) Get(bucket string) (BucketMetadata, error) {
if globalIsGateway || bucket == minioMetaBucket {
return newBucketMetadata(bucket), errConfigNotFound
}
sys.RLock()
defer sys.RUnlock()
meta, ok := sys.metadataMap[bucket]
if !ok {
return newBucketMetadata(bucket), errConfigNotFound
}
return meta, nil
}
// GetVersioningConfig returns configured versioning config
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetVersioningConfig(bucket string) (*versioning.Versioning, error) {
meta, err := sys.GetConfig(bucket)
if err != nil {
return nil, err
}
return meta.versioningConfig, nil
}
// GetTaggingConfig returns configured tagging config
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetTaggingConfig(bucket string) (*tags.Tags, error) {
meta, err := sys.GetConfig(bucket)
if err != nil {
if errors.Is(err, errConfigNotFound) {
return nil, BucketTaggingNotFound{Bucket: bucket}
}
return nil, err
}
if meta.taggingConfig == nil {
return nil, BucketTaggingNotFound{Bucket: bucket}
}
return meta.taggingConfig, nil
}
// GetObjectLockConfig returns configured object lock config
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetObjectLockConfig(bucket string) (*objectlock.Config, error) {
meta, err := sys.GetConfig(bucket)
if err != nil {
if errors.Is(err, errConfigNotFound) {
return nil, BucketObjectLockConfigNotFound{Bucket: bucket}
}
return nil, err
}
if meta.objectLockConfig == nil {
return nil, BucketObjectLockConfigNotFound{Bucket: bucket}
}
return meta.objectLockConfig, nil
}
// GetLifecycleConfig returns configured lifecycle config
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetLifecycleConfig(bucket string) (*lifecycle.Lifecycle, error) {
meta, err := sys.GetConfig(bucket)
if err != nil {
if errors.Is(err, errConfigNotFound) {
return nil, BucketLifecycleNotFound{Bucket: bucket}
}
return nil, err
}
if meta.lifecycleConfig == nil {
return nil, BucketLifecycleNotFound{Bucket: bucket}
}
return meta.lifecycleConfig, nil
}
// GetNotificationConfig returns configured notification config
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetNotificationConfig(bucket string) (*event.Config, error) {
if globalIsGateway && globalGatewayName == NASBackendGateway {
// Only needed in case of NAS gateway.
objAPI := newObjectLayerFn()
if objAPI == nil {
return nil, errServerNotInitialized
}
meta, err := loadBucketMetadata(GlobalContext, objAPI, bucket)
if err != nil {
return nil, err
}
return meta.notificationConfig, nil
}
meta, err := sys.GetConfig(bucket)
if err != nil {
return nil, err
}
return meta.notificationConfig, nil
}
// GetSSEConfig returns configured SSE config
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetSSEConfig(bucket string) (*bucketsse.BucketSSEConfig, error) {
meta, err := sys.GetConfig(bucket)
if err != nil {
if errors.Is(err, errConfigNotFound) {
return nil, BucketSSEConfigNotFound{Bucket: bucket}
}
return nil, err
}
if meta.sseConfig == nil {
return nil, BucketSSEConfigNotFound{Bucket: bucket}
}
return meta.sseConfig, nil
}
// GetPolicyConfig returns configured bucket policy
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetPolicyConfig(bucket string) (*policy.Policy, error) {
if globalIsGateway {
objAPI := newObjectLayerFn()
if objAPI == nil {
return nil, errServerNotInitialized
}
return objAPI.GetBucketPolicy(GlobalContext, bucket)
}
meta, err := sys.GetConfig(bucket)
if err != nil {
if errors.Is(err, errConfigNotFound) {
return nil, BucketPolicyNotFound{Bucket: bucket}
}
return nil, err
}
if meta.policyConfig == nil {
return nil, BucketPolicyNotFound{Bucket: bucket}
}
return meta.policyConfig, nil
}
// GetQuotaConfig returns configured bucket quota
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetQuotaConfig(bucket string) (*madmin.BucketQuota, error) {
meta, err := sys.GetConfig(bucket)
if err != nil {
return nil, err
}
return meta.quotaConfig, nil
}
// GetReplicationConfig returns configured bucket replication config
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetReplicationConfig(ctx context.Context, bucket string) (*replication.Config, error) {
meta, err := sys.GetConfig(bucket)
if err != nil {
if errors.Is(err, errConfigNotFound) {
return nil, BucketReplicationConfigNotFound{Bucket: bucket}
}
return nil, err
}
if meta.replicationConfig == nil {
return nil, BucketReplicationConfigNotFound{Bucket: bucket}
}
return meta.replicationConfig, nil
}
// GetBucketTargetsConfig returns configured bucket targets for this bucket
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetBucketTargetsConfig(bucket string) (*madmin.BucketTargets, error) {
meta, err := sys.GetConfig(bucket)
if err != nil {
return nil, err
}
if meta.bucketTargetConfig == nil {
return nil, BucketRemoteTargetNotFound{Bucket: bucket}
}
return meta.bucketTargetConfig, nil
}
// GetBucketTarget returns the target for the bucket and arn.
func (sys *BucketMetadataSys) GetBucketTarget(bucket string, arn string) (madmin.BucketTarget, error) {
targets, err := sys.GetBucketTargetsConfig(bucket)
if err != nil {
return madmin.BucketTarget{}, err
}
for _, t := range targets.Targets {
if t.Arn == arn {
return t, nil
}
}
return madmin.BucketTarget{}, errConfigNotFound
}
// GetConfig returns a specific configuration from the bucket metadata.
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetConfig(bucket string) (BucketMetadata, error) {
objAPI := newObjectLayerFn()
if objAPI == nil {
return newBucketMetadata(bucket), errServerNotInitialized
}
if globalIsGateway {
return newBucketMetadata(bucket), NotImplemented{}
}
if bucket == minioMetaBucket {
return newBucketMetadata(bucket), errInvalidArgument
}
sys.RLock()
meta, ok := sys.metadataMap[bucket]
sys.RUnlock()
if ok {
return meta, nil
}
meta, err := loadBucketMetadata(GlobalContext, objAPI, bucket)
if err != nil {
return meta, err
}
sys.Lock()
sys.metadataMap[bucket] = meta
sys.Unlock()
return meta, nil
}
// Init - initializes bucket metadata system for all buckets.
func (sys *BucketMetadataSys) Init(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error {
if objAPI == nil {
return errServerNotInitialized
}
// In gateway mode, we don't need to load the policies
// from the backend.
if globalIsGateway {
return nil
}
// Load bucket metadata sys in background
go sys.load(ctx, buckets, objAPI)
return nil
}
// concurrently load bucket metadata to speed up loading bucket metadata.
func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) {
g := errgroup.WithNErrs(len(buckets))
for index := range buckets {
index := index
g.Go(func() error {
_, _ = objAPI.HealBucket(ctx, buckets[index].Name, madmin.HealOpts{
// Ensure heal opts for bucket metadata be deep healed all the time.
ScanMode: madmin.HealDeepScan,
})
meta, err := loadBucketMetadata(ctx, objAPI, buckets[index].Name)
if err != nil {
return err
}
sys.Lock()
sys.metadataMap[buckets[index].Name] = meta
sys.Unlock()
return nil
}, index)
}
for _, err := range g.Wait() {
if err != nil {
logger.LogIf(ctx, err)
}
}
}
// Loads bucket metadata for all buckets into BucketMetadataSys.
func (sys *BucketMetadataSys) load(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) {
count := 100 // load 100 bucket metadata at a time.
for {
if len(buckets) < count {
sys.concurrentLoad(ctx, buckets, objAPI)
return
}
sys.concurrentLoad(ctx, buckets[:count], objAPI)
buckets = buckets[count:]
}
}
// Reset the state of the BucketMetadataSys.
func (sys *BucketMetadataSys) Reset() {
sys.Lock()
for k := range sys.metadataMap {
delete(sys.metadataMap, k)
}
sys.Unlock()
}
// NewBucketMetadataSys - creates new policy system.
func NewBucketMetadataSys() *BucketMetadataSys {
return &BucketMetadataSys{
metadataMap: make(map[string]BucketMetadata),
}
}