1
0
mirror of https://github.com/minio/minio synced 2024-07-09 04:05:53 +00:00

allow background IAM load to speed up startup (#9796)

Also fix healthcheck handler to run success
only if object layer has initialized fully
for S3 API access call.
This commit is contained in:
Harshavardhana 2020-06-09 19:19:03 -07:00 committed by GitHub
parent 342ade03f6
commit 4790868878
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 124 additions and 64 deletions

View File

@ -70,7 +70,7 @@ func prepareAdminXLTestBed(ctx context.Context) (*adminXLTestBed, error) {
newAllSubsystems()
initAllSubsystems(objLayer)
initAllSubsystems(ctx, objLayer)
// Setup admin mgmt REST API handlers.
adminRouter := mux.NewRouter()

View File

@ -274,7 +274,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
if enableIAMOps {
// Initialize IAM sys.
logger.FatalIf(globalIAMSys.Init(GlobalContext, newObject), "Unable to initialize IAM system")
startBackgroundIAMLoad(GlobalContext)
}
if globalCacheConfig.Enabled {

View File

@ -37,7 +37,7 @@ func ReadinessCheckHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(ctx, globalAPIConfig.getReadyDeadline())
defer cancel()
if !objLayer.IsReady(ctx) {
if !objLayer.IsReady(ctx) && newObjectLayerFn() == nil {
writeResponse(w, http.StatusServiceUnavailable, nil, mimeNone)
return
}

View File

@ -588,14 +588,6 @@ func listIAMConfigItems(ctx context.Context, objAPI ObjectLayer, pathPrefix stri
return
}
// Attempt a slow down load only when server is
// active and initialized.
if !globalSafeMode {
// Slow down listing and loading for config items to
// reduce load on the server
waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()))
}
marker = lo.NextMarker
lister := dirList(lo)
if !dirs {

View File

@ -21,8 +21,10 @@ import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/minio/minio-go/v6/pkg/set"
"github.com/minio/minio/cmd/config"
@ -30,6 +32,7 @@ import (
"github.com/minio/minio/pkg/auth"
iampolicy "github.com/minio/minio/pkg/iam/policy"
"github.com/minio/minio/pkg/madmin"
"github.com/minio/minio/pkg/retry"
)
// UsersSysType - defines the type of users and groups system that is
@ -403,10 +406,18 @@ func (sys *IAMSys) doIAMConfigMigration(ctx context.Context) error {
return sys.store.migrateBackendFormat(ctx)
}
// Loads IAM users and policies in background, any un-handled
// error means this code can potentially crash the server
// in such a situation manual intervention is necessary.
func startBackgroundIAMLoad(ctx context.Context) {
go globalIAMSys.Init(ctx, newObjectLayerWithoutSafeModeFn())
}
// Init - initializes config system from iam.json
func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) error {
func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
if objAPI == nil {
return errServerNotInitialized
logger.LogIf(ctx, errServerNotInitialized)
return
}
if globalEtcdClient == nil {
@ -419,18 +430,75 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) error {
sys.EnableLDAPSys()
}
// Migrate IAM configuration
if err := sys.doIAMConfigMigration(ctx); err != nil {
return err
retryCtx, cancel := context.WithCancel(ctx)
// Indicate to our routine to exit cleanly upon return.
defer cancel()
// Hold the lock for migration only.
txnLk := objAPI.NewNSLock(retryCtx, minioMetaBucket, minioConfigPrefix+"/iam.lock")
// Initializing IAM sub-system needs a retry mechanism for
// the following reasons:
// - Read quorum is lost just after the initialization
// of the object layer.
// - Write quorum not met when upgrading configuration
// version is needed, migration is needed etc.
rquorum := InsufficientReadQuorum{}
wquorum := InsufficientWriteQuorum{}
for range retry.NewTimerWithJitter(retryCtx, time.Second, 5*time.Second, retry.MaxJitter) {
// let one of the server acquire the lock, if not let them timeout.
// which shall be retried again by this loop.
if err := txnLk.GetLock(newDynamicTimeout(1*time.Second, 5*time.Second)); err != nil {
logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. trying to acquire lock")
continue
}
if globalEtcdClient != nil {
// **** WARNING ****
// Migrating to encrypted backend on etcd should happen before initialization of
// IAM sub-system, make sure that we do not move the above codeblock elsewhere.
if err := migrateIAMConfigsEtcdToEncrypted(ctx, globalEtcdClient); err != nil {
txnLk.Unlock()
logger.LogIf(ctx, fmt.Errorf("Unable to handle encrypted backend for iam and policies: %w", err))
return
}
}
// These messages only meant primarily for distributed setup, so only log during distributed setup.
if globalIsDistXL {
logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. lock acquired")
}
// Migrate IAM configuration
if err := sys.doIAMConfigMigration(ctx); err != nil {
txnLk.Unlock()
if errors.Is(err, errDiskNotFound) ||
errors.Is(err, errConfigNotFound) ||
errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) ||
errors.As(err, &rquorum) ||
errors.As(err, &wquorum) ||
isErrBucketNotFound(err) {
logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. possible cause (%v)", err)
continue
}
logger.LogIf(ctx, fmt.Errorf("Unable to migration IAM users and policies: %w", err))
return
}
// Successfully migrated
txnLk.Unlock()
break
}
err := sys.store.loadAll(ctx, sys)
logger.LogIf(ctx, sys.store.loadAll(ctx, sys))
// Invalidate the old cred after finishing IAM initialization
globalOldCred = auth.Credentials{}
go sys.store.watch(ctx, sys)
return err
}
// DeletePolicy - deletes a canned policy from backend or etcd.

View File

@ -677,14 +677,15 @@ func (client *peerRESTClient) BackgroundHealStatus() (madmin.BgHealState, error)
}
// GetLocalDiskIDs - get a peer's local disks' IDs.
func (client *peerRESTClient) GetLocalDiskIDs(ctx context.Context) []string {
func (client *peerRESTClient) GetLocalDiskIDs(ctx context.Context) (diskIDs []string) {
respBody, err := client.callWithContext(ctx, peerRESTMethodGetLocalDiskIDs, nil, nil, -1)
if err != nil {
logger.LogIf(ctx, err)
return nil
}
defer http.DrainBody(respBody)
var diskIDs []string
if err = gob.NewDecoder(respBody).Decode(&diskIDs); err != nil {
logger.LogIf(ctx, err)
return nil
}
return diskIDs

View File

@ -169,15 +169,19 @@ func newAllSubsystems() {
globalBucketQuotaSys = NewBucketQuotaSys()
}
func initSafeMode() (err error) {
newObject := newObjectLayerWithoutSafeModeFn()
func initSafeMode(ctx context.Context, newObject ObjectLayer) (err error) {
// Create cancel context to control 'newRetryTimer' go routine.
retryCtx, cancel := context.WithCancel(ctx)
// Indicate to our routine to exit cleanly upon return.
defer cancel()
// Make sure to hold lock for entire migration to avoid
// such that only one server should migrate the entire config
// at a given time, this big transaction lock ensures this
// appropriately. This is also true for rotation of encrypted
// content.
txnLk := newObject.NewNSLock(GlobalContext, minioMetaBucket, minioConfigPrefix+"/transaction.lock")
txnLk := newObject.NewNSLock(retryCtx, minioMetaBucket, minioConfigPrefix+"/transaction.lock")
defer func(txnLk RWLocker) {
txnLk.Unlock()
@ -203,11 +207,6 @@ func initSafeMode() (err error) {
// **** WARNING ****
// Migrating to encrypted backend should happen before initialization of any
// sub-systems, make sure that we do not move the above codeblock elsewhere.
// Create cancel context to control 'newRetryTimer' go routine.
retryCtx, cancel := context.WithCancel(GlobalContext)
// Indicate to our routine to exit cleanly upon return.
defer cancel()
// Initializing sub-systems needs a retry mechanism for
// the following reasons:
@ -220,7 +219,7 @@ func initSafeMode() (err error) {
for range retry.NewTimer(retryCtx) {
// let one of the server acquire the lock, if not let them timeout.
// which shall be retried again by this loop.
if err = txnLk.GetLock(newDynamicTimeout(5*time.Second, 30*time.Second)); err != nil {
if err = txnLk.GetLock(newDynamicTimeout(1*time.Second, 10*time.Second)); err != nil {
logger.Info("Waiting for all MinIO sub-systems to be initialized.. trying to acquire lock")
continue
}
@ -236,7 +235,7 @@ func initSafeMode() (err error) {
if err = handleEncryptedConfigBackend(newObject, true); err == nil {
// Upon success migrating the config, initialize all sub-systems
// if all sub-systems initialized successfully return right away
if err = initAllSubsystems(newObject); err == nil {
if err = initAllSubsystems(retryCtx, newObject); err == nil {
// All successful return.
if globalIsDistXL {
// These messages only meant primarily for distributed setup, so only log during distributed setup.
@ -272,7 +271,7 @@ func initSafeMode() (err error) {
return errors.New("Initializing sub-systems stopped gracefully")
}
func initAllSubsystems(newObject ObjectLayer) (err error) {
func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
// %w is used by all error returns here to make sure
// we wrap the underlying error, make sure when you
// are modifying this code that you do so, if and when
@ -281,7 +280,7 @@ func initAllSubsystems(newObject ObjectLayer) (err error) {
var buckets []BucketInfo
if globalIsDistXL || globalIsXL {
// List buckets to heal, and be re-used for loading configs.
buckets, err = newObject.ListBucketsHeal(GlobalContext)
buckets, err = newObject.ListBucketsHeal(ctx)
if err != nil {
return fmt.Errorf("Unable to list buckets to heal: %w", err)
}
@ -290,21 +289,21 @@ func initAllSubsystems(newObject ObjectLayer) (err error) {
wquorum := &InsufficientWriteQuorum{}
rquorum := &InsufficientReadQuorum{}
for _, bucket := range buckets {
if err = newObject.MakeBucketWithLocation(GlobalContext, bucket.Name, "", false); err != nil {
if err = newObject.MakeBucketWithLocation(ctx, bucket.Name, "", false); err != nil {
if errors.As(err, &wquorum) || errors.As(err, &rquorum) {
// Retrun the error upwards for the caller to retry.
// Return the error upwards for the caller to retry.
return fmt.Errorf("Unable to heal bucket: %w", err)
}
if _, ok := err.(BucketExists); !ok {
// ignore any other error and log for investigation.
logger.LogIf(GlobalContext, err)
logger.LogIf(ctx, err)
continue
}
// Bucket already exists, nothing that needs to be done.
}
}
} else {
buckets, err = newObject.ListBuckets(GlobalContext)
buckets, err = newObject.ListBuckets(ctx)
if err != nil {
return fmt.Errorf("Unable to list buckets: %w", err)
}
@ -315,21 +314,14 @@ func initAllSubsystems(newObject ObjectLayer) (err error) {
return fmt.Errorf("Unable to initialize config system: %w", err)
}
if globalEtcdClient != nil {
// **** WARNING ****
// Migrating to encrypted backend on etcd should happen before initialization of
// IAM sub-systems, make sure that we do not move the above codeblock elsewhere.
if err = migrateIAMConfigsEtcdToEncrypted(GlobalContext, globalEtcdClient); err != nil {
return fmt.Errorf("Unable to handle encrypted backend for iam and policies: %w", err)
}
}
if err = globalIAMSys.Init(GlobalContext, newObject); err != nil {
return fmt.Errorf("Unable to initialize IAM system: %w", err)
// Populate existing buckets to the etcd backend
if globalDNSConfig != nil {
// Background this operation.
go initFederatorBackend(buckets, newObject)
}
// Initialize bucket metadata sub-system.
if err = globalBucketMetadataSys.Init(GlobalContext, buckets, newObject); err != nil {
if err := globalBucketMetadataSys.Init(ctx, buckets, newObject); err != nil {
return fmt.Errorf("Unable to initialize bucket metadata sub-system: %w", err)
}
@ -338,11 +330,6 @@ func initAllSubsystems(newObject ObjectLayer) (err error) {
return fmt.Errorf("Unable to initialize notification system: %w", err)
}
// Populate existing buckets to the etcd backend
if globalDNSConfig != nil {
initFederatorBackend(buckets, newObject)
}
return nil
}
@ -523,7 +510,10 @@ func serverMain(ctx *cli.Context) {
go startBackgroundOps(GlobalContext, newObject)
logger.FatalIf(initSafeMode(), "Unable to initialize server switching into safe-mode")
logger.FatalIf(initSafeMode(GlobalContext, newObject), "Unable to initialize server switching into safe-mode")
// Initialize users credentials and policies in background.
go startBackgroundIAMLoad(GlobalContext)
if globalCacheConfig.Enabled {
// initialize the new disk cache objects.

View File

@ -331,7 +331,7 @@ func UnstartedTestServer(t TestErrHandler, instanceType string) TestServer {
newAllSubsystems()
initAllSubsystems(objLayer)
initAllSubsystems(ctx, objLayer)
return testServer
}
@ -1556,7 +1556,7 @@ func newTestObjectLayer(ctx context.Context, endpointZones EndpointZones) (newOb
newAllSubsystems()
initAllSubsystems(z)
initAllSubsystems(ctx, z)
return z, nil
}
@ -1602,7 +1602,7 @@ func removeDiskN(disks []string, n int) {
func initAPIHandlerTest(obj ObjectLayer, endpoints []string) (string, http.Handler, error) {
newAllSubsystems()
initAllSubsystems(obj)
initAllSubsystems(context.Background(), obj)
// get random bucket name.
bucketName := getRandomBucketName()
@ -1878,7 +1878,7 @@ func ExecObjectLayerTest(t TestErrHandler, objTest objTestType) {
t.Fatal("Unexpected error", err)
}
initAllSubsystems(objLayer)
initAllSubsystems(ctx, objLayer)
// Executing the object layer tests for single node setup.
objTest(objLayer, FSTestStr, t)
@ -1890,7 +1890,7 @@ func ExecObjectLayerTest(t TestErrHandler, objTest objTestType) {
t.Fatalf("Initialization of object layer failed for XL setup: %s", err)
}
initAllSubsystems(objLayer)
initAllSubsystems(ctx, objLayer)
defer removeRoots(append(fsDirs, fsDir))
// Executing the object layer tests for XL.

View File

@ -160,7 +160,10 @@ func (xl xlObjects) newMultipartUpload(ctx context.Context, bucket string, objec
// we now know the number of blocks this object needs for data and parity.
// establish the writeQuorum using this data
writeQuorum := dataBlocks + 1
writeQuorum := dataBlocks
if dataBlocks == parityBlocks {
writeQuorum = dataBlocks + 1
}
if meta["content-type"] == "" {
contentType := mimedb.TypeByExtension(path.Ext(object))

View File

@ -500,7 +500,10 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
// we now know the number of blocks this object needs for data and parity.
// writeQuorum is dataBlocks + 1
writeQuorum := dataDrives + 1
writeQuorum := dataDrives
if dataDrives == parityDrives {
writeQuorum = dataDrives + 1
}
// Delete temporary object in the event of failure.
// If PutObject succeeded there would be no temporary

2
go.mod
View File

@ -67,7 +67,7 @@ require (
github.com/miekg/dns v1.1.8
github.com/minio/cli v1.22.0
github.com/minio/highwayhash v1.0.0
github.com/minio/minio-go/v6 v6.0.56
github.com/minio/minio-go/v6 v6.0.57
github.com/minio/parquet-go v0.0.0-20200414234858-838cfa8aae61
github.com/minio/sha256-simd v0.1.1
github.com/minio/simdjson-go v0.1.5-0.20200303142138-b17fe061ea37

7
go.sum
View File

@ -232,6 +232,7 @@ github.com/klauspost/compress v1.10.3 h1:OP96hzwJVBIHYU52pVTI6CczrxPvrGfgqF9N5eT
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/cpuid v1.2.2 h1:1xAgYebNnsb9LKCdLOvFWtAxGU/33mjJtyOVbmUa0Us=
github.com/klauspost/cpuid v1.2.2/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.2.4 h1:EBfaK0SWSwk+fgk6efYFWdzl8MwRWoOO1gkmiaTXPW4=
github.com/klauspost/cpuid v1.2.4/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/pgzip v1.2.1 h1:oIPZROsWuPHpOdMVWLuJZXwgjhrW8r1yEX8UqMyeNHM=
@ -273,9 +274,11 @@ github.com/minio/cli v1.22.0 h1:VTQm7lmXm3quxO917X3p+el1l0Ca5X3S4PM2ruUYO68=
github.com/minio/cli v1.22.0/go.mod h1:bYxnK0uS629N3Bq+AOZZ+6lwF77Sodk4+UL9vNuXhOY=
github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA=
github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc=
github.com/minio/md5-simd v1.1.0 h1:QPfiOqlZH+Cj9teu0t9b1nTBfPbyTl16Of5MeuShdK4=
github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
github.com/minio/minio-go/v6 v6.0.53/go.mod h1:DIvC/IApeHX8q1BAMVCXSXwpmrmM+I+iBvhvztQorfI=
github.com/minio/minio-go/v6 v6.0.56 h1:H4+v6UFV1V7VkEf1HjL15W9OvTL1Gy8EbMmjQZHqEbg=
github.com/minio/minio-go/v6 v6.0.56/go.mod h1:KQMM+/44DSlSGSQWSfRrAZ12FVMmpWNuX37i2AX0jfI=
github.com/minio/minio-go/v6 v6.0.57 h1:ixPkbKkyD7IhnluRgQpGSpHdpvNVaW6OD5R9IAO/9Tw=
github.com/minio/minio-go/v6 v6.0.57/go.mod h1:5+R/nM9Pwrh0vqF+HbYYDQ84wdUFPyXHkrdT4AIkifM=
github.com/minio/parquet-go v0.0.0-20200414234858-838cfa8aae61 h1:pUSI/WKPdd77gcuoJkSzhJ4wdS8OMDOsOu99MtpXEQA=
github.com/minio/parquet-go v0.0.0-20200414234858-838cfa8aae61/go.mod h1:4trzEJ7N1nBTd5Tt7OCZT5SEin+WiAXpdJ/WgPkESA8=
github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU=