Add support for tier-remove and tier-verify (#14382)

* Add tier remove support only if it's empty
* Add support for tier verify
This commit is contained in:
Krishnan Parthasarathi 2022-02-23 13:34:25 -08:00 committed by GitHub
parent 9d7648f02f
commit 27f64dd9a4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 103 additions and 1 deletions

View file

@ -182,6 +182,12 @@ func toAdminAPIErr(ctx context.Context, err error) APIError {
Description: err.Error(),
HTTPStatusCode: http.StatusConflict,
}
case errors.Is(err, errTierBackendNotEmpty):
apiErr = APIError{
Code: "XMinioAdminTierBackendNotEmpty",
Description: err.Error(),
HTTPStatusCode: http.StatusBadRequest,
}
case errors.Is(err, errTierInsufficientCreds):
apiErr = APIError{
Code: "XMinioAdminTierInsufficientCreds",

View file

@ -191,7 +191,8 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) {
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/tier").HandlerFunc(gz(httpTraceHdrs(adminAPI.AddTierHandler)))
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/tier/{tier}").HandlerFunc(gz(httpTraceHdrs(adminAPI.EditTierHandler)))
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/tier").HandlerFunc(gz(httpTraceHdrs(adminAPI.ListTierHandler)))
adminRouter.Methods(http.MethodDelete).Path(adminVersion + "/tier/{tier}").HandlerFunc(gz(httpTraceHdrs(adminAPI.RemoveTierHandler)))
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/tier/{tier}").HandlerFunc(gz(httpTraceHdrs(adminAPI.VerifyTierHandler)))
// Tier stats
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/tier-stats").HandlerFunc(gz(httpTraceHdrs(adminAPI.TierStatsHandler)))

View file

@ -335,6 +335,8 @@ func ErrorRespToObjectError(err error, params ...string) error {
switch minioErr.StatusCode {
case http.StatusMethodNotAllowed:
err = toObjectErr(errMethodNotAllowed, bucket, object)
case http.StatusBadGateway:
return BackendDown{Err: err.Error()}
}
return err
}

View file

@ -205,6 +205,69 @@ func (api adminAPIHandlers) EditTierHandler(w http.ResponseWriter, r *http.Reque
writeSuccessNoContent(w)
}
func (api adminAPIHandlers) RemoveTierHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "RemoveTier")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
if !globalIsErasure {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
objAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SetTierAction)
if objAPI == nil || globalNotificationSys == nil || globalTierConfigMgr == nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
vars := mux.Vars(r)
tier := vars["tier"]
if err := globalTierConfigMgr.Reload(ctx, objAPI); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
if err := globalTierConfigMgr.Remove(ctx, tier); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
if err := globalTierConfigMgr.Save(ctx, objAPI); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
globalNotificationSys.LoadTransitionTierConfig(ctx)
writeSuccessNoContent(w)
}
func (api adminAPIHandlers) VerifyTierHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "VerifyTier")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
if !globalIsErasure {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
objAPI, _ := validateAdminReq(ctx, w, r, iampolicy.ListTierAction)
if objAPI == nil || globalNotificationSys == nil || globalTierConfigMgr == nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
vars := mux.Vars(r)
tier := vars["tier"]
if err := globalTierConfigMgr.Verify(ctx, tier); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
writeSuccessNoContent(w)
}
func (api adminAPIHandlers) TierStatsHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "TierStats")

View file

@ -40,6 +40,7 @@ var (
errTierInsufficientCreds = errors.New("insufficient tier credentials supplied")
errTierBackendInUse = errors.New("remote tier backend already in use")
errTierTypeUnsupported = errors.New("unsupported tier type")
errTierBackendNotEmpty = errors.New("remote tier not empty")
)
const (
@ -116,6 +117,35 @@ func (config *TierConfigMgr) Add(ctx context.Context, tier madmin.TierConfig) er
return nil
}
// Remove removes tier if it is empty.
func (config *TierConfigMgr) Remove(ctx context.Context, tier string) error {
d, err := config.getDriver(tier)
if err != nil {
return err
}
if inuse, err := d.InUse(ctx); err != nil {
return err
} else if inuse {
return errTierBackendNotEmpty
} else {
config.Lock()
delete(config.Tiers, tier)
delete(config.drivercache, tier)
config.Unlock()
}
return nil
}
// Verify verifies if tier's config is valid by performing all supported
// operations on the corresponding warmbackend.
func (config *TierConfigMgr) Verify(ctx context.Context, tier string) error {
d, err := config.getDriver(tier)
if err != nil {
return err
}
return checkWarmBackend(ctx, d)
}
// Empty returns if tier targets are empty
func (config *TierConfigMgr) Empty() bool {
return len(config.ListTiers()) == 0