config: Add api requests max & deadline configs (#9273)

Add two new configuration entries, api.requests-max and
api.requests-deadline which have the same role of
MINIO_API_REQUESTS_MAX and MINIO_API_REQUESTS_DEADLINE.
This commit is contained in:
Anis Elleuch 2020-04-14 20:46:37 +01:00 committed by GitHub
parent ec11e99667
commit 8a94aebdb8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 342 additions and 125 deletions

View file

@ -18,14 +18,9 @@ package cmd
import (
"net/http"
"strconv"
"time"
"github.com/gorilla/mux"
"github.com/minio/minio/cmd/config"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/env"
)
func newHTTPServerFn() *xhttp.Server {
@ -92,229 +87,200 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool)
}
routers = append(routers, apiRouter.PathPrefix("/{bucket}").Subrouter())
requestsMax, _ := strconv.Atoi(env.Get(config.EnvAPIRequestsMax, "0"))
if requestsMax < 0 {
requestsMax = 0 // 0 means unlimited
}
var (
enabled bool
requestsMaxPerServer int
requestsDeadline time.Duration
)
if len(globalEndpoints.Hosts()) == 0 {
requestsMaxPerServer = requestsMax
} else {
requestsMaxPerServer = requestsMax / len(globalEndpoints.Hosts())
}
enabled = requestsMaxPerServer > 0
if enabled {
var err error
requestsDeadline, err = time.ParseDuration(env.Get(config.EnvAPIRequestsDeadline, "10s"))
if err != nil {
logger.Info("(%s) parsing environment value MINIO_API_REQUESTS_DEADLINE, defaulting to 10 seconds", err)
requestsDeadline = 10 * time.Second
}
}
requestsMaxCh := make(chan struct{}, requestsMaxPerServer)
for _, bucket := range routers {
// Object operations
// HeadObject
bucket.Methods(http.MethodHead).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("headobject", httpTraceAll(api.HeadObjectHandler)), enabled, requestsMaxCh, requestsDeadline))
maxClients(collectAPIStats("headobject", httpTraceAll(api.HeadObjectHandler))))
// CopyObjectPart
bucket.Methods(http.MethodPut).Path("/{object:.+}").HeadersRegexp(xhttp.AmzCopySource, ".*?(\\/|%2F).*?").HandlerFunc(maxClients(collectAPIStats("copyobjectpart", httpTraceAll(api.CopyObjectPartHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
bucket.Methods(http.MethodPut).Path("/{object:.+}").HeadersRegexp(xhttp.AmzCopySource, ".*?(\\/|%2F).*?").HandlerFunc(maxClients(collectAPIStats("copyobjectpart", httpTraceAll(api.CopyObjectPartHandler)))).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
// PutObjectPart
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("putobjectpart", httpTraceHdrs(api.PutObjectPartHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
maxClients(collectAPIStats("putobjectpart", httpTraceHdrs(api.PutObjectPartHandler)))).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
// ListObjectParts
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("listobjectparts", httpTraceAll(api.ListObjectPartsHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("uploadId", "{uploadId:.*}")
maxClients(collectAPIStats("listobjectparts", httpTraceAll(api.ListObjectPartsHandler)))).Queries("uploadId", "{uploadId:.*}")
// CompleteMultipartUpload
bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("completemutipartupload", httpTraceAll(api.CompleteMultipartUploadHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("uploadId", "{uploadId:.*}")
maxClients(collectAPIStats("completemutipartupload", httpTraceAll(api.CompleteMultipartUploadHandler)))).Queries("uploadId", "{uploadId:.*}")
// NewMultipartUpload
bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("newmultipartupload", httpTraceAll(api.NewMultipartUploadHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("uploads", "")
maxClients(collectAPIStats("newmultipartupload", httpTraceAll(api.NewMultipartUploadHandler)))).Queries("uploads", "")
// AbortMultipartUpload
bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("abortmultipartupload", httpTraceAll(api.AbortMultipartUploadHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("uploadId", "{uploadId:.*}")
maxClients(collectAPIStats("abortmultipartupload", httpTraceAll(api.AbortMultipartUploadHandler)))).Queries("uploadId", "{uploadId:.*}")
// GetObjectACL - this is a dummy call.
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("getobjectacl", httpTraceHdrs(api.GetObjectACLHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("acl", "")
maxClients(collectAPIStats("getobjectacl", httpTraceHdrs(api.GetObjectACLHandler)))).Queries("acl", "")
// PutObjectACL - this is a dummy call.
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("putobjectacl", httpTraceHdrs(api.PutObjectACLHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("acl", "")
maxClients(collectAPIStats("putobjectacl", httpTraceHdrs(api.PutObjectACLHandler)))).Queries("acl", "")
// GetObjectTagging
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("getobjecttagging", httpTraceHdrs(api.GetObjectTaggingHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("tagging", "")
maxClients(collectAPIStats("getobjecttagging", httpTraceHdrs(api.GetObjectTaggingHandler)))).Queries("tagging", "")
// PutObjectTagging
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("putobjecttagging", httpTraceHdrs(api.PutObjectTaggingHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("tagging", "")
maxClients(collectAPIStats("putobjecttagging", httpTraceHdrs(api.PutObjectTaggingHandler)))).Queries("tagging", "")
// DeleteObjectTagging
bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("deleteobjecttagging", httpTraceHdrs(api.DeleteObjectTaggingHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("tagging", "")
maxClients(collectAPIStats("deleteobjecttagging", httpTraceHdrs(api.DeleteObjectTaggingHandler)))).Queries("tagging", "")
// SelectObjectContent
bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("selectobjectcontent", httpTraceHdrs(api.SelectObjectContentHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("select", "").Queries("select-type", "2")
maxClients(collectAPIStats("selectobjectcontent", httpTraceHdrs(api.SelectObjectContentHandler)))).Queries("select", "").Queries("select-type", "2")
// GetObjectRetention
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("getobjectretention", httpTraceAll(api.GetObjectRetentionHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("retention", "")
maxClients(collectAPIStats("getobjectretention", httpTraceAll(api.GetObjectRetentionHandler)))).Queries("retention", "")
// GetObjectLegalHold
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("getobjectlegalhold", httpTraceAll(api.GetObjectLegalHoldHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("legal-hold", "")
maxClients(collectAPIStats("getobjectlegalhold", httpTraceAll(api.GetObjectLegalHoldHandler)))).Queries("legal-hold", "")
// GetObject
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("getobject", httpTraceHdrs(api.GetObjectHandler)), enabled, requestsMaxCh, requestsDeadline))
maxClients(collectAPIStats("getobject", httpTraceHdrs(api.GetObjectHandler))))
// CopyObject
bucket.Methods(http.MethodPut).Path("/{object:.+}").HeadersRegexp(xhttp.AmzCopySource, ".*?(\\/|%2F).*?").HandlerFunc(maxClients(collectAPIStats("copyobject", httpTraceAll(api.CopyObjectHandler)), enabled, requestsMaxCh, requestsDeadline))
bucket.Methods(http.MethodPut).Path("/{object:.+}").HeadersRegexp(xhttp.AmzCopySource, ".*?(\\/|%2F).*?").HandlerFunc(maxClients(collectAPIStats("copyobject", httpTraceAll(api.CopyObjectHandler))))
// PutObjectRetention
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("putobjectretention", httpTraceAll(api.PutObjectRetentionHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("retention", "")
maxClients(collectAPIStats("putobjectretention", httpTraceAll(api.PutObjectRetentionHandler)))).Queries("retention", "")
// PutObjectLegalHold
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("putobjectlegalhold", httpTraceAll(api.PutObjectLegalHoldHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("legal-hold", "")
maxClients(collectAPIStats("putobjectlegalhold", httpTraceAll(api.PutObjectLegalHoldHandler)))).Queries("legal-hold", "")
// PutObject
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("putobject", httpTraceHdrs(api.PutObjectHandler)), enabled, requestsMaxCh, requestsDeadline))
maxClients(collectAPIStats("putobject", httpTraceHdrs(api.PutObjectHandler))))
// DeleteObject
bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("deleteobject", httpTraceAll(api.DeleteObjectHandler)), enabled, requestsMaxCh, requestsDeadline))
maxClients(collectAPIStats("deleteobject", httpTraceAll(api.DeleteObjectHandler))))
/// Bucket operations
// GetBucketLocation
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketlocation", httpTraceAll(api.GetBucketLocationHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("location", "")
maxClients(collectAPIStats("getbucketlocation", httpTraceAll(api.GetBucketLocationHandler)))).Queries("location", "")
// GetBucketPolicy
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketpolicy", httpTraceAll(api.GetBucketPolicyHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("policy", "")
maxClients(collectAPIStats("getbucketpolicy", httpTraceAll(api.GetBucketPolicyHandler)))).Queries("policy", "")
// GetBucketLifecycle
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketlifecycle", httpTraceAll(api.GetBucketLifecycleHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("lifecycle", "")
maxClients(collectAPIStats("getbucketlifecycle", httpTraceAll(api.GetBucketLifecycleHandler)))).Queries("lifecycle", "")
// GetBucketEncryption
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketencryption", httpTraceAll(api.GetBucketEncryptionHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("encryption", "")
maxClients(collectAPIStats("getbucketencryption", httpTraceAll(api.GetBucketEncryptionHandler)))).Queries("encryption", "")
// Dummy Bucket Calls
// GetBucketACL -- this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketacl", httpTraceAll(api.GetBucketACLHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("acl", "")
maxClients(collectAPIStats("getbucketacl", httpTraceAll(api.GetBucketACLHandler)))).Queries("acl", "")
// PutBucketACL -- this is a dummy call.
bucket.Methods(http.MethodPut).HandlerFunc(
maxClients(collectAPIStats("putbucketacl", httpTraceAll(api.PutBucketACLHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("acl", "")
maxClients(collectAPIStats("putbucketacl", httpTraceAll(api.PutBucketACLHandler)))).Queries("acl", "")
// GetBucketCors - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketcors", httpTraceAll(api.GetBucketCorsHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("cors", "")
maxClients(collectAPIStats("getbucketcors", httpTraceAll(api.GetBucketCorsHandler)))).Queries("cors", "")
// GetBucketWebsiteHandler - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketwebsite", httpTraceAll(api.GetBucketWebsiteHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("website", "")
maxClients(collectAPIStats("getbucketwebsite", httpTraceAll(api.GetBucketWebsiteHandler)))).Queries("website", "")
// GetBucketAccelerateHandler - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketaccelerate", httpTraceAll(api.GetBucketAccelerateHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("accelerate", "")
maxClients(collectAPIStats("getbucketaccelerate", httpTraceAll(api.GetBucketAccelerateHandler)))).Queries("accelerate", "")
// GetBucketRequestPaymentHandler - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketrequestpayment", httpTraceAll(api.GetBucketRequestPaymentHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("requestPayment", "")
maxClients(collectAPIStats("getbucketrequestpayment", httpTraceAll(api.GetBucketRequestPaymentHandler)))).Queries("requestPayment", "")
// GetBucketLoggingHandler - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketlogging", httpTraceAll(api.GetBucketLoggingHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("logging", "")
maxClients(collectAPIStats("getbucketlogging", httpTraceAll(api.GetBucketLoggingHandler)))).Queries("logging", "")
// GetBucketLifecycleHandler - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketlifecycle", httpTraceAll(api.GetBucketLifecycleHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("lifecycle", "")
maxClients(collectAPIStats("getbucketlifecycle", httpTraceAll(api.GetBucketLifecycleHandler)))).Queries("lifecycle", "")
// GetBucketReplicationHandler - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketreplication", httpTraceAll(api.GetBucketReplicationHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("replication", "")
maxClients(collectAPIStats("getbucketreplication", httpTraceAll(api.GetBucketReplicationHandler)))).Queries("replication", "")
// GetBucketTaggingHandler - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbuckettagging", httpTraceAll(api.GetBucketTaggingHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("tagging", "")
maxClients(collectAPIStats("getbuckettagging", httpTraceAll(api.GetBucketTaggingHandler)))).Queries("tagging", "")
//DeleteBucketWebsiteHandler
bucket.Methods(http.MethodDelete).HandlerFunc(
maxClients(collectAPIStats("deletebucketwebsite", httpTraceAll(api.DeleteBucketWebsiteHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("website", "")
maxClients(collectAPIStats("deletebucketwebsite", httpTraceAll(api.DeleteBucketWebsiteHandler)))).Queries("website", "")
// DeleteBucketTaggingHandler
bucket.Methods(http.MethodDelete).HandlerFunc(
maxClients(collectAPIStats("deletebuckettagging", httpTraceAll(api.DeleteBucketTaggingHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("tagging", "")
maxClients(collectAPIStats("deletebuckettagging", httpTraceAll(api.DeleteBucketTaggingHandler)))).Queries("tagging", "")
// GetBucketObjectLockConfig
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketobjectlockconfiguration", httpTraceAll(api.GetBucketObjectLockConfigHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("object-lock", "")
maxClients(collectAPIStats("getbucketobjectlockconfiguration", httpTraceAll(api.GetBucketObjectLockConfigHandler)))).Queries("object-lock", "")
// GetBucketVersioning
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketversioning", httpTraceAll(api.GetBucketVersioningHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("versioning", "")
maxClients(collectAPIStats("getbucketversioning", httpTraceAll(api.GetBucketVersioningHandler)))).Queries("versioning", "")
// GetBucketNotification
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketnotification", httpTraceAll(api.GetBucketNotificationHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("notification", "")
maxClients(collectAPIStats("getbucketnotification", httpTraceAll(api.GetBucketNotificationHandler)))).Queries("notification", "")
// ListenBucketNotification
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listenbucketnotification", httpTraceAll(api.ListenBucketNotificationHandler))).Queries("events", "{events:.*}")
// ListMultipartUploads
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("listmultipartuploads", httpTraceAll(api.ListMultipartUploadsHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("uploads", "")
maxClients(collectAPIStats("listmultipartuploads", httpTraceAll(api.ListMultipartUploadsHandler)))).Queries("uploads", "")
// ListObjectsV2M
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("listobjectsv2M", httpTraceAll(api.ListObjectsV2MHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("list-type", "2", "metadata", "true")
maxClients(collectAPIStats("listobjectsv2M", httpTraceAll(api.ListObjectsV2MHandler)))).Queries("list-type", "2", "metadata", "true")
// ListObjectsV2
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("listobjectsv2", httpTraceAll(api.ListObjectsV2Handler)), enabled, requestsMaxCh, requestsDeadline)).Queries("list-type", "2")
maxClients(collectAPIStats("listobjectsv2", httpTraceAll(api.ListObjectsV2Handler)))).Queries("list-type", "2")
// ListBucketVersions
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("listbucketversions", httpTraceAll(api.ListBucketObjectVersionsHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("versions", "")
maxClients(collectAPIStats("listbucketversions", httpTraceAll(api.ListBucketObjectVersionsHandler)))).Queries("versions", "")
// ListObjectsV1 (Legacy)
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("listobjectsv1", httpTraceAll(api.ListObjectsV1Handler)), enabled, requestsMaxCh, requestsDeadline))
maxClients(collectAPIStats("listobjectsv1", httpTraceAll(api.ListObjectsV1Handler))))
// PutBucketLifecycle
bucket.Methods(http.MethodPut).HandlerFunc(
maxClients(collectAPIStats("putbucketlifecycle", httpTraceAll(api.PutBucketLifecycleHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("lifecycle", "")
maxClients(collectAPIStats("putbucketlifecycle", httpTraceAll(api.PutBucketLifecycleHandler)))).Queries("lifecycle", "")
// PutBucketEncryption
bucket.Methods(http.MethodPut).HandlerFunc(
maxClients(collectAPIStats("putbucketencryption", httpTraceAll(api.PutBucketEncryptionHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("encryption", "")
maxClients(collectAPIStats("putbucketencryption", httpTraceAll(api.PutBucketEncryptionHandler)))).Queries("encryption", "")
// PutBucketPolicy
bucket.Methods(http.MethodPut).HandlerFunc(
maxClients(collectAPIStats("putbucketpolicy", httpTraceAll(api.PutBucketPolicyHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("policy", "")
maxClients(collectAPIStats("putbucketpolicy", httpTraceAll(api.PutBucketPolicyHandler)))).Queries("policy", "")
// PutBucketObjectLockConfig
bucket.Methods(http.MethodPut).HandlerFunc(
maxClients(collectAPIStats("putbucketobjectlockconfig", httpTraceAll(api.PutBucketObjectLockConfigHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("object-lock", "")
maxClients(collectAPIStats("putbucketobjectlockconfig", httpTraceAll(api.PutBucketObjectLockConfigHandler)))).Queries("object-lock", "")
// PutBucketVersioning
bucket.Methods(http.MethodPut).HandlerFunc(
maxClients(collectAPIStats("putbucketversioning", httpTraceAll(api.PutBucketVersioningHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("versioning", "")
maxClients(collectAPIStats("putbucketversioning", httpTraceAll(api.PutBucketVersioningHandler)))).Queries("versioning", "")
// PutBucketNotification
bucket.Methods(http.MethodPut).HandlerFunc(
maxClients(collectAPIStats("putbucketnotification", httpTraceAll(api.PutBucketNotificationHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("notification", "")
maxClients(collectAPIStats("putbucketnotification", httpTraceAll(api.PutBucketNotificationHandler)))).Queries("notification", "")
// PutBucket
bucket.Methods(http.MethodPut).HandlerFunc(
maxClients(collectAPIStats("putbucket", httpTraceAll(api.PutBucketHandler)), enabled, requestsMaxCh, requestsDeadline))
maxClients(collectAPIStats("putbucket", httpTraceAll(api.PutBucketHandler))))
// HeadBucket
bucket.Methods(http.MethodHead).HandlerFunc(
maxClients(collectAPIStats("headbucket", httpTraceAll(api.HeadBucketHandler)), enabled, requestsMaxCh, requestsDeadline))
maxClients(collectAPIStats("headbucket", httpTraceAll(api.HeadBucketHandler))))
// PostPolicy
bucket.Methods(http.MethodPost).HeadersRegexp(xhttp.ContentType, "multipart/form-data*").HandlerFunc(
maxClients(collectAPIStats("postpolicybucket", httpTraceHdrs(api.PostPolicyBucketHandler)), enabled, requestsMaxCh, requestsDeadline))
maxClients(collectAPIStats("postpolicybucket", httpTraceHdrs(api.PostPolicyBucketHandler))))
// DeleteMultipleObjects
bucket.Methods(http.MethodPost).HandlerFunc(
maxClients(collectAPIStats("deletemultipleobjects", httpTraceAll(api.DeleteMultipleObjectsHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("delete", "")
maxClients(collectAPIStats("deletemultipleobjects", httpTraceAll(api.DeleteMultipleObjectsHandler)))).Queries("delete", "")
// DeleteBucketPolicy
bucket.Methods(http.MethodDelete).HandlerFunc(
maxClients(collectAPIStats("deletebucketpolicy", httpTraceAll(api.DeleteBucketPolicyHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("policy", "")
maxClients(collectAPIStats("deletebucketpolicy", httpTraceAll(api.DeleteBucketPolicyHandler)))).Queries("policy", "")
// DeleteBucketLifecycle
bucket.Methods(http.MethodDelete).HandlerFunc(
maxClients(collectAPIStats("deletebucketlifecycle", httpTraceAll(api.DeleteBucketLifecycleHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("lifecycle", "")
maxClients(collectAPIStats("deletebucketlifecycle", httpTraceAll(api.DeleteBucketLifecycleHandler)))).Queries("lifecycle", "")
// DeleteBucketEncryption
bucket.Methods(http.MethodDelete).HandlerFunc(
maxClients(collectAPIStats("deletebucketencryption", httpTraceAll(api.DeleteBucketEncryptionHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("encryption", "")
maxClients(collectAPIStats("deletebucketencryption", httpTraceAll(api.DeleteBucketEncryptionHandler)))).Queries("encryption", "")
// DeleteBucket
bucket.Methods(http.MethodDelete).HandlerFunc(
maxClients(collectAPIStats("deletebucket", httpTraceAll(api.DeleteBucketHandler)), enabled, requestsMaxCh, requestsDeadline))
maxClients(collectAPIStats("deletebucket", httpTraceAll(api.DeleteBucketHandler))))
}
/// Root operation
// ListBuckets
apiRouter.Methods(http.MethodGet).Path(SlashSeparator).HandlerFunc(
maxClients(collectAPIStats("listbuckets", httpTraceAll(api.ListBucketsHandler)), enabled, requestsMaxCh, requestsDeadline))
maxClients(collectAPIStats("listbuckets", httpTraceAll(api.ListBucketsHandler))))
// If none of the routes match add default error handler routes
apiRouter.NotFoundHandler = http.HandlerFunc(collectAPIStats("notfound", httpTraceAll(errorResponseHandler)))

View file

@ -22,6 +22,7 @@ import (
"sync"
"github.com/minio/minio/cmd/config"
"github.com/minio/minio/cmd/config/api"
"github.com/minio/minio/cmd/config/cache"
"github.com/minio/minio/cmd/config/compress"
"github.com/minio/minio/cmd/config/etcd"
@ -48,6 +49,7 @@ func initHelp() {
config.IdentityOpenIDSubSys: openid.DefaultKVS,
config.PolicyOPASubSys: opa.DefaultKVS,
config.RegionSubSys: config.DefaultRegionKVS,
config.APISubSys: api.DefaultKVS,
config.CredentialsSubSys: config.DefaultCredentialKVS,
config.KmsVaultSubSys: crypto.DefaultVaultKVS,
config.KmsKesSubSys: crypto.DefaultKesKVS,
@ -100,6 +102,10 @@ func initHelp() {
Key: config.KmsKesSubSys,
Description: "enable external MinIO key encryption service",
},
config.HelpKV{
Key: config.APISubSys,
Description: "manage global HTTP API call specific features, such as throttling, authentication types, etc.",
},
config.HelpKV{
Key: config.LoggerWebhookSubSys,
Description: "send server logs to webhook endpoints",
@ -174,6 +180,7 @@ func initHelp() {
var helpMap = map[string]config.HelpKVS{
"": helpSubSys, // Help for all sub-systems.
config.RegionSubSys: config.RegionHelp,
config.APISubSys: api.Help,
config.StorageClassSubSys: storageclass.Help,
config.EtcdSubSys: etcd.Help,
config.CacheSubSys: cache.Help,
@ -221,6 +228,10 @@ func validateConfig(s config.Config) error {
return err
}
if _, err := api.LookupConfig(s[config.APISubSys][config.Default]); err != nil {
return err
}
if globalIsXL {
if _, err := storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default],
globalXLSetDriveCount); err != nil {
@ -349,6 +360,18 @@ func lookupConfigs(s config.Config) {
logger.LogIf(ctx, fmt.Errorf("Invalid region configuration: %w", err))
}
apiConfig, err := api.LookupConfig(s[config.APISubSys][config.Default])
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Invalid api configuration: %w", err))
}
apiRequestsMax := apiConfig.APIRequestsMax
if len(globalEndpoints.Hosts()) > 0 {
apiRequestsMax /= len(globalEndpoints.Hosts())
}
globalAPIThrottling.init(apiRequestsMax, apiConfig.APIRequestsDeadline)
if globalIsXL {
globalStorageClass, err = storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default],
globalXLSetDriveCount)

91
cmd/config/api/api.go Normal file
View file

@ -0,0 +1,91 @@
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package api
import (
"encoding/json"
"errors"
"strconv"
"time"
"github.com/minio/minio/cmd/config"
"github.com/minio/minio/pkg/env"
)
const (
apiRequestsMax = "requests_max"
apiRequestsDeadline = "requests_deadline"
)
// DefaultKVS - default storage class config
var (
DefaultKVS = config.KVS{
config.KV{
Key: apiRequestsMax,
Value: "0",
},
config.KV{
Key: apiRequestsDeadline,
Value: "10s",
},
}
)
// Config storage class configuration
type Config struct {
APIRequestsMax int `json:"requests_max"`
APIRequestsDeadline time.Duration `json:"requests_deadline"`
}
// UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON.
func (sCfg *Config) UnmarshalJSON(data []byte) error {
type Alias Config
aux := &struct {
*Alias
}{
Alias: (*Alias)(sCfg),
}
return json.Unmarshal(data, &aux)
}
// LookupConfig - lookup api config and override with valid environment settings if any.
func LookupConfig(kvs config.KVS) (cfg Config, err error) {
if err = config.CheckValidKeys(config.APISubSys, kvs, DefaultKVS); err != nil {
return cfg, err
}
// Check environment variables parameters
requestsMax, err := strconv.Atoi(env.Get(config.EnvAPIRequestsMax, kvs.Get(apiRequestsMax)))
if err != nil {
return cfg, err
}
if requestsMax < 0 {
return cfg, errors.New("invalid API max requests value")
}
requestsDeadline, err := time.ParseDuration(env.Get(config.EnvAPIRequestsDeadline, kvs.Get(apiRequestsDeadline)))
if err != nil {
return cfg, err
}
cfg = Config{
APIRequestsMax: requestsMax,
APIRequestsDeadline: requestsDeadline,
}
return cfg, nil
}

37
cmd/config/api/help.go Normal file
View file

@ -0,0 +1,37 @@
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package api
import "github.com/minio/minio/cmd/config"
// Help template for storageclass feature.
var (
Help = config.HelpKVS{
config.HelpKV{
Key: apiRequestsMax,
Description: `set the maximum number of concurrent requests, e.g. "1600"`,
Optional: true,
Type: "number",
},
config.HelpKV{
Key: apiRequestsDeadline,
Description: `set the deadline for API requests waiting to be processed e.g. "1m"`,
Optional: true,
Type: "duration",
},
}
)

View file

@ -71,6 +71,7 @@ const (
RegionSubSys = "region"
EtcdSubSys = "etcd"
StorageClassSubSys = "storage_class"
APISubSys = "api"
CompressionSubSys = "compression"
KmsVaultSubSys = "kms_vault"
KmsKesSubSys = "kms_kes"
@ -102,6 +103,7 @@ var SubSystems = set.CreateStringSet([]string{
RegionSubSys,
EtcdSubSys,
CacheSubSys,
APISubSys,
StorageClassSubSys,
CompressionSubSys,
KmsVaultSubSys,
@ -129,6 +131,7 @@ var SubSystemsSingleTargets = set.CreateStringSet([]string{
RegionSubSys,
EtcdSubSys,
CacheSubSys,
APISubSys,
StorageClassSubSys,
CompressionSubSys,
KmsVaultSubSys,

View file

@ -153,6 +153,10 @@ var (
globalLifecycleSys *LifecycleSys
globalBucketSSEConfigSys *BucketSSEConfigSys
// globalAPIThrottling controls S3 requests throttling when
// enabled in the config or in the shell environment.
globalAPIThrottling apiThrottling
globalStorageClass storageclass.Config
globalLDAPConfig xldap.Config
globalOpenIDConfig openid.Config

View file

@ -28,7 +28,6 @@ import (
"net/url"
"regexp"
"strings"
"time"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
@ -359,31 +358,6 @@ func httpTraceHdrs(f http.HandlerFunc) http.HandlerFunc {
}
}
// maxClients throttles the S3 API calls
func maxClients(f http.HandlerFunc, enabled bool, requestsMaxCh chan struct{}, requestsDeadline time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !enabled {
f.ServeHTTP(w, r)
return
}
select {
case requestsMaxCh <- struct{}{}:
defer func() { <-requestsMaxCh }()
f.ServeHTTP(w, r)
case <-time.NewTimer(requestsDeadline).C:
// Send a http timeout message
writeErrorResponse(r.Context(), w,
errorCodes.ToAPIErr(ErrOperationMaxedOut),
r.URL, guessIsBrowserReq(r))
return
case <-r.Context().Done():
return
}
}
}
func collectAPIStats(api string, f http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {

80
cmd/throttling.go Normal file
View file

@ -0,0 +1,80 @@
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"net/http"
"sync"
"time"
)
type apiThrottling struct {
mu sync.RWMutex
enabled bool
requestsDeadline time.Duration
requestsPool chan struct{}
}
func (t *apiThrottling) init(max int, deadline time.Duration) {
if max <= 0 {
return
}
t.mu.Lock()
defer t.mu.Unlock()
t.requestsPool = make(chan struct{}, max)
t.requestsDeadline = deadline
t.enabled = true
}
func (t *apiThrottling) get() (chan struct{}, <-chan time.Time) {
t.mu.RLock()
defer t.mu.RUnlock()
if !t.enabled {
return nil, nil
}
return t.requestsPool, time.NewTimer(t.requestsDeadline).C
}
// maxClients throttles the S3 API calls
func maxClients(f http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
pool, deadlineTimer := globalAPIThrottling.get()
if pool == nil {
f.ServeHTTP(w, r)
return
}
select {
case pool <- struct{}{}:
defer func() { <-pool }()
f.ServeHTTP(w, r)
case <-deadlineTimer:
// Send a http timeout message
writeErrorResponse(r.Context(), w,
errorCodes.ToAPIErr(ErrOperationMaxedOut),
r.URL, guessIsBrowserReq(r))
return
case <-r.Context().Done():
return
}
}
}

View file

@ -171,6 +171,27 @@ MINIO_ETCD_CLIENT_CERT_KEY (path) client cert key for mTLS authentication
MINIO_ETCD_COMMENT (sentence) optionally add a comment to this setting
```
### API
By default, there is no limitation on the number of concurrents requests that a server/cluster processes at the same time. However, it is possible to impose such limitation using the API subsystem. Read more about throttling limitation in MinIO server [here](https://github.com/minio/minio/blob/master/docs/throttle/README.md).
```
KEY:
api manage global HTTP API call specific features, such as throttling, authentication types, etc.
ARGS:
requests_max (number) set the maximum number of concurrent requests
requests_deadline (duration) set the deadline for API requests waiting to be processed
```
or environment variables
```
MINIO_API_REQUESTS_MAX (number) set the maximum number of concurrent requests
MINIO_API_REQUESTS_DEADLINE (duration) set the deadline for API requests waiting to be processed
```
#### Notifications
Notification targets supported by MinIO are in the following list. To configure individual targets please refer to more detailed documentation [here](https://docs.min.io/docs/minio-bucket-notification-guide.html)

View file

@ -5,12 +5,14 @@ MinIO server allows to throttle incoming requests:
- limit the number of active requests allowed across the cluster
- limit the wait duration for each request in the queue
These values are enabled using environment variables *only*.
These values are enabled using server's configuration or environment variables.
## Configuring connection limit
## Examples
### Configuring connection limit
If you have traditional spinning (hdd) drives, some applications with high concurrency might require MinIO cluster to be tuned such that to avoid random I/O on the drives. The way to convert high concurrent I/O into a sequential I/O is by reducing the number of concurrent operations allowed per cluster. This allows MinIO cluster to be operationally resilient to such workloads, while also making sure the drives are at optimal efficiency and responsive.
Example: Limit a MinIO cluster to accept at max 1600 simultaneous S3 API requests across 8 servers.
Example: Limit a MinIO cluster to accept at max 1600 simultaneous S3 API requests across all nodes of the cluster.
```sh
export MINIO_API_REQUESTS_MAX=1600
export MINIO_ACCESS_KEY=your-access-key
@ -18,14 +20,22 @@ export MINIO_SECRET_KEY=your-secret-key
minio server http://server{1...8}/mnt/hdd{1...16}
```
> NOTE: Setting MINIO_API_REQUESTS_MAX=0 means unlimited and that is the default behavior. These values need to be set based on your deployment requirements and application.
or
## Configuring connection (wait) deadline
```sh
mc admin config set myminio/ api requests_max=1600
mc admin service restart myminio/
```
> NOTE: A zero value of `requests_max` means unlimited and that is the default behavior.
### Configuring connection (wait) deadline
This value works in conjunction with max connection setting, setting this value allows for long waiting requests to quickly time out when there is no slot available to perform the request.
This will reduce the pileup of waiting requests when clients are not configured with timeouts. Default wait time is *10 seconds* if *MINIO_API_REQUESTS_MAX* is enabled. This may need to be tuned to your application needs.
Example: Limit a MinIO cluster to accept at max 1600 simultaneous S3 API requests across 8 servers, and set the wait deadline of *2 minutes* per API operation.
```sh
export MINIO_API_REQUESTS_MAX=1600
export MINIO_API_REQUESTS_DEADLINE=2m
@ -33,3 +43,11 @@ export MINIO_ACCESS_KEY=your-access-key
export MINIO_SECRET_KEY=your-secret-key
minio server http://server{1...8}/mnt/hdd{1...16}
```
or
```sh
mc admin config set myminio/ api requests_max=1600 requests_deadline=2m
mc admin service restart myminio/
```