From e316873f8477de72ec2c4a1c3f5fcacc6aae42ca Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 13 Jul 2021 09:39:13 -0700 Subject: [PATCH] feat: Add support for kakfa audit logger target (#12678) --- cmd/config-current.go | 54 ++-- cmd/config-migrate.go | 7 +- cmd/consolelogger.go | 4 +- cmd/data-update-tracker_test.go | 2 +- docs/logging/README.md | 81 +++++- internal/config/config.go | 19 ++ internal/config/notify/parse.go | 35 +-- internal/logger/config.go | 272 +++++++++++++++--- internal/logger/help.go | 98 ++++++- internal/logger/legacy.go | 9 +- internal/logger/target/http/http.go | 134 +++------ internal/logger/target/kafka/kafka.go | 208 ++++++++++++++ .../kafka/kafka_scram_client_contrib.go | 84 ++++++ internal/logger/targets.go | 6 +- 14 files changed, 811 insertions(+), 202 deletions(-) create mode 100644 internal/logger/target/kafka/kafka.go create mode 100644 internal/logger/target/kafka/kafka_scram_client_contrib.go diff --git a/cmd/config-current.go b/cmd/config-current.go index 08c7c73b6..23366bbcc 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -43,6 +43,7 @@ import ( "github.com/minio/minio/internal/kms" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger/target/http" + "github.com/minio/minio/internal/logger/target/kafka" "github.com/minio/pkg/env" ) @@ -58,7 +59,8 @@ func initHelp() { config.APISubSys: api.DefaultKVS, config.CredentialsSubSys: config.DefaultCredentialKVS, config.LoggerWebhookSubSys: logger.DefaultKVS, - config.AuditWebhookSubSys: logger.DefaultAuditKVS, + config.AuditWebhookSubSys: logger.DefaultAuditWebhookKVS, + config.AuditKafkaSubSys: logger.DefaultAuditKafkaKVS, config.HealSubSys: heal.DefaultKVS, config.ScannerSubSys: scanner.DefaultKVS, } @@ -122,6 +124,11 @@ func initHelp() { Description: "send audit logs to webhook endpoints", MultipleTargets: true, }, + config.HelpKV{ + Key: config.AuditKafkaSubSys, + Description: "send audit logs to kafka endpoints", + MultipleTargets: true, + }, config.HelpKV{ Key: config.NotifyWebhookSubSys, Description: "publish bucket notifications to webhook endpoints", @@ -197,7 +204,8 @@ func initHelp() { config.IdentityLDAPSubSys: xldap.Help, config.PolicyOPASubSys: opa.Help, config.LoggerWebhookSubSys: logger.Help, - config.AuditWebhookSubSys: logger.HelpAudit, + config.AuditWebhookSubSys: logger.HelpWebhook, + config.AuditKafkaSubSys: logger.HelpKafka, config.NotifyAMQPSubSys: notify.HelpAMQP, config.NotifyKafkaSubSys: notify.HelpKafka, config.NotifyMQTTSubSys: notify.HelpMQTT, @@ -478,42 +486,40 @@ func lookupConfigs(s config.Config, setDriveCounts []int) { logger.LogIf(ctx, fmt.Errorf("Unable to initialize logger: %w", err)) } - for k, l := range loggerCfg.HTTP { + for _, l := range loggerCfg.HTTP { if l.Enabled { + l.LogOnce = logger.LogOnceIf + l.UserAgent = loggerUserAgent + l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) // Enable http logging - if err = logger.AddTarget( - http.New( - http.WithTargetName(k), - http.WithEndpoint(l.Endpoint), - http.WithAuthToken(l.AuthToken), - http.WithUserAgent(loggerUserAgent), - http.WithLogKind(string(logger.All)), - http.WithTransport(NewGatewayHTTPTransport()), - ), - ); err != nil { + if err = logger.AddTarget(http.New(l)); err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to initialize console HTTP target: %w", err)) } } } - for k, l := range loggerCfg.Audit { + for _, l := range loggerCfg.AuditWebhook { if l.Enabled { + l.LogOnce = logger.LogOnceIf + l.UserAgent = loggerUserAgent + l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) // Enable http audit logging - if err = logger.AddAuditTarget( - http.New( - http.WithTargetName(k), - http.WithEndpoint(l.Endpoint), - http.WithAuthToken(l.AuthToken), - http.WithUserAgent(loggerUserAgent), - http.WithLogKind(string(logger.All)), - http.WithTransport(NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey)), - ), - ); err != nil { + if err = logger.AddAuditTarget(http.New(l)); err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to initialize audit HTTP target: %w", err)) } } } + for _, l := range loggerCfg.AuditKafka { + if l.Enabled { + l.LogOnce = logger.LogOnceIf + // Enable Kafka audit logging + if err = logger.AddAuditTarget(kafka.New(l)); err != nil { + logger.LogIf(ctx, fmt.Errorf("Unable to initialize audit Kafka target: %w", err)) + } + } + } + globalConfigTargetList, err = notify.GetNotificationTargets(GlobalContext, s, NewGatewayHTTPTransport(), false) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to initialize notification target(s): %w", err)) diff --git a/cmd/config-migrate.go b/cmd/config-migrate.go index 9045a40c3..0e0231291 100644 --- a/cmd/config-migrate.go +++ b/cmd/config-migrate.go @@ -41,6 +41,7 @@ import ( "github.com/minio/minio/internal/event/target" "github.com/minio/minio/internal/kms" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/logger/target/http" xnet "github.com/minio/pkg/net" "github.com/minio/pkg/quick" ) @@ -2383,8 +2384,8 @@ func migrateV26ToV27() error { // Enable console logging by default to avoid breaking users // current deployments srvConfig.Logger.Console.Enabled = true - srvConfig.Logger.HTTP = make(map[string]logger.HTTP) - srvConfig.Logger.HTTP["1"] = logger.HTTP{} + srvConfig.Logger.HTTP = make(map[string]http.Config) + srvConfig.Logger.HTTP["1"] = http.Config{} if err = quick.SaveConfig(srvConfig, configFile, globalEtcdClient); err != nil { return fmt.Errorf("Failed to migrate config from ‘26’ to ‘27’. %w", err) @@ -2748,7 +2749,7 @@ func migrateMinioSysConfigToKV(objAPI ObjectLayer) error { for k, loggerArgs := range cfg.Logger.HTTP { logger.SetLoggerHTTP(newCfg, k, loggerArgs) } - for k, auditArgs := range cfg.Logger.Audit { + for k, auditArgs := range cfg.Logger.AuditWebhook { logger.SetLoggerHTTPAudit(newCfg, k, auditArgs) } diff --git a/cmd/consolelogger.go b/cmd/consolelogger.go index 5eed65eaa..04f54c8f2 100644 --- a/cmd/consolelogger.go +++ b/cmd/consolelogger.go @@ -117,8 +117,8 @@ func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan interface{}, doneCh <-chan sys.pubsub.Subscribe(subCh, doneCh, filter) } -// Validate if HTTPConsoleLoggerSys is valid, always returns nil right now -func (sys *HTTPConsoleLoggerSys) Validate() error { +// Init if HTTPConsoleLoggerSys is valid, always returns nil right now +func (sys *HTTPConsoleLoggerSys) Init() error { return nil } diff --git a/cmd/data-update-tracker_test.go b/cmd/data-update-tracker_test.go index 4868bd3de..e4b5a032e 100644 --- a/cmd/data-update-tracker_test.go +++ b/cmd/data-update-tracker_test.go @@ -51,7 +51,7 @@ func (t *testingLogger) String() string { return "" } -func (t *testingLogger) Validate() error { +func (t *testingLogger) Init() error { return nil } diff --git a/docs/logging/README.md b/docs/logging/README.md index 8988a2b41..adcbabca9 100644 --- a/docs/logging/README.md +++ b/docs/logging/README.md @@ -36,9 +36,11 @@ minio server /mnt/data ## Audit Targets Assuming `mc` is already [configured](https://docs.min.io/docs/minio-client-quickstart-guide.html) + +### HTTP Target ``` mc admin config get myminio/ audit_webhook -audit_webhook:name1 enable=off endpoint= auth_token= client_cert= client_key= +audit_webhook:name1 enable=off endpoint= auth_token= client_cert= client_key= ``` ``` @@ -119,6 +121,83 @@ NOTE: } ``` +### Kafka Target +Assuming that you already have Apache Kafka configured and running. +``` +mc admin config set myminio/ audit_kafka +KEY: +audit_kafka[:name] send audit logs to kafka endpoints + +ARGS: +brokers* (csv) comma separated list of Kafka broker addresses +topic (string) Kafka topic used for bucket notifications +sasl_username (string) username for SASL/PLAIN or SASL/SCRAM authentication +sasl_password (string) password for SASL/PLAIN or SASL/SCRAM authentication +sasl_mechanism (string) sasl authentication mechanism, default 'plain' +tls_client_auth (string) clientAuth determines the Kafka server's policy for TLS client auth +sasl (on|off) set to 'on' to enable SASL authentication +tls (on|off) set to 'on' to enable TLS +tls_skip_verify (on|off) trust server TLS without verification, defaults to "on" (verify) +client_tls_cert (path) path to client certificate for mTLS auth +client_tls_key (path) path to client key for mTLS auth +version (string) specify the version of the Kafka cluster +comment (sentence) optionally add a comment to this setting +``` + +Configure MinIO to send audit logs to locally running Kafka brokers +``` +mc admin config set myminio/ audit_kafka:target1 brokers=localhost:29092 topic=auditlog +mc admin service restart myminio/ +``` + +On another terminal assuming you have `kafkacat` installed + +``` +kafkacat -b localhost:29092 -t auditlog -C + +{"version":"1","deploymentid":"8a1d8091-b874-45df-b9ea-e044eede6ace","time":"2021-07-13T02:00:47.020547414Z","trigger":"incoming","api":{"name":"ListBuckets","status":"OK","statusCode":200,"timeToFirstByte":"261795ns","timeToResponse":"312490ns"},"remotehost":"127.0.0.1","requestID":"16913736591C237F","userAgent":"MinIO (linux; amd64) minio-go/v7.0.11 mc/DEVELOPMENT.2021-07-09T02-22-26Z","requestHeader":{"Authorization":"AWS4-HMAC-SHA256 Credential=minio/20210713/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=7fe65c5467e05ca21de64094688da43f96f34fec82e8955612827079f4600527","User-Agent":"MinIO (linux; amd64) minio-go/v7.0.11 mc/DEVELOPMENT.2021-07-09T02-22-26Z","X-Amz-Content-Sha256":"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855","X-Amz-Date":"20210713T020047Z"},"responseHeader":{"Accept-Ranges":"bytes","Content-Length":"547","Content-Security-Policy":"block-all-mixed-content","Content-Type":"application/xml","Server":"MinIO","Vary":"Origin,Accept-Encoding","X-Amz-Request-Id":"16913736591C237F","X-Xss-Protection":"1; mode=block"}} +``` + +MinIO also honors environment variable for Kafka target Audit logging as shown below, this setting will override the endpoint settings in the MinIO server config. + +``` +mc admin config set myminio/ audit_kafka --env +KEY: +audit_kafka[:name] send audit logs to kafka endpoints + +ARGS: +MINIO_AUDIT_KAFKA_ENABLE* (on|off) enable audit_kafka target, default is 'off' +MINIO_AUDIT_KAFKA_BROKERS* (csv) comma separated list of Kafka broker addresses +MINIO_AUDIT_KAFKA_TOPIC (string) Kafka topic used for bucket notifications +MINIO_AUDIT_KAFKA_SASL_USERNAME (string) username for SASL/PLAIN or SASL/SCRAM authentication +MINIO_AUDIT_KAFKA_SASL_PASSWORD (string) password for SASL/PLAIN or SASL/SCRAM authentication +MINIO_AUDIT_KAFKA_SASL_MECHANISM (string) sasl authentication mechanism, default 'plain' +MINIO_AUDIT_KAFKA_TLS_CLIENT_AUTH (string) clientAuth determines the Kafka server's policy for TLS client auth +MINIO_AUDIT_KAFKA_SASL (on|off) set to 'on' to enable SASL authentication +MINIO_AUDIT_KAFKA_TLS (on|off) set to 'on' to enable TLS +MINIO_AUDIT_KAFKA_TLS_SKIP_VERIFY (on|off) trust server TLS without verification, defaults to "on" (verify) +MINIO_AUDIT_KAFKA_CLIENT_TLS_CERT (path) path to client certificate for mTLS auth +MINIO_AUDIT_KAFKA_CLIENT_TLS_KEY (path) path to client key for mTLS auth +MINIO_AUDIT_KAFKA_VERSION (string) specify the version of the Kafka cluster +MINIO_AUDIT_KAFKA_COMMENT (sentence) optionally add a comment to this setting +``` + +``` +export MINIO_AUDIT_KAFKA_ENABLE_target1="on" +export MINIO_AUDIT_KAFKA_BROKERS_target1="localhost:29092" +export MINIO_AUDIT_KAFKA_TOPIC_target1="auditlog" +minio server /mnt/data +``` + +Setting this environment variable automatically enables audit logging to the Kafka target. The audit logging is in JSON format as described below. + +NOTE: +- `timeToFirstByte` and `timeToResponse` will be expressed in Nanoseconds. +- Additionally in the case of the erasure coded setup `tags.objectErasureMap` provides per object details about + - Pool number the object operation was performed on. + - Set number the object operation was performed on. + - The list of disks participating in this operation belong to the set. + ## Explore Further * [MinIO Quickstart Guide](https://docs.min.io/docs/minio-quickstart-guide) * [Configure MinIO Server with TLS](https://docs.min.io/docs/how-to-secure-access-to-minio-server-with-tls) diff --git a/internal/config/config.go b/internal/config/config.go index 67b75286a..50a60d10c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -74,6 +74,7 @@ const ( CompressionSubSys = "compression" LoggerWebhookSubSys = "logger_webhook" AuditWebhookSubSys = "audit_webhook" + AuditKafkaSubSys = "audit_kafka" HealSubSys = "heal" ScannerSubSys = "scanner" CrawlerSubSys = "crawler" @@ -108,6 +109,7 @@ var SubSystems = set.CreateStringSet( CompressionSubSys, LoggerWebhookSubSys, AuditWebhookSubSys, + AuditKafkaSubSys, PolicyOPASubSys, IdentityLDAPSubSys, IdentityOpenIDSubSys, @@ -247,6 +249,23 @@ func (kvs KVS) String() string { return s.String() } +// Merge environment values with on disk KVS, environment values overrides +// anything on the disk. +func Merge(cfgKVS map[string]KVS, envname string, defaultKVS KVS) map[string]KVS { + newCfgKVS := make(map[string]KVS) + for _, e := range env.List(envname) { + tgt := strings.TrimPrefix(e, envname+Default) + if tgt == envname { + tgt = Default + } + newCfgKVS[tgt] = defaultKVS + } + for tgt, kv := range cfgKVS { + newCfgKVS[tgt] = kv + } + return newCfgKVS +} + // Set sets a value, if not sets a default value. func (kvs *KVS) Set(key, value string) { for i, kv := range *kvs { diff --git a/internal/config/notify/parse.go b/internal/config/notify/parse.go index cc2a8cb3f..e23cf4aaf 100644 --- a/internal/config/notify/parse.go +++ b/internal/config/notify/parse.go @@ -410,21 +410,6 @@ func checkValidNotificationKeys(cfg config.Config) error { return nil } -func mergeTargets(cfgTargets map[string]config.KVS, envname string, defaultKVS config.KVS) map[string]config.KVS { - newCfgTargets := make(map[string]config.KVS) - for _, e := range env.List(envname) { - tgt := strings.TrimPrefix(e, envname+config.Default) - if tgt == envname { - tgt = config.Default - } - newCfgTargets[tgt] = defaultKVS - } - for tgt, kv := range cfgTargets { - newCfgTargets[tgt] = kv - } - return newCfgTargets -} - // DefaultKakfaKVS - default KV for kafka target var ( DefaultKafkaKVS = config.KVS{ @@ -494,7 +479,7 @@ var ( // GetNotifyKafka - returns a map of registered notification 'kafka' targets func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs, error) { kafkaTargets := make(map[string]target.KafkaArgs) - for k, kv := range mergeTargets(kafkaKVS, target.EnvKafkaEnable, DefaultKafkaKVS) { + for k, kv := range config.Merge(kafkaKVS, target.EnvKafkaEnable, DefaultKafkaKVS) { enableEnv := target.EnvKafkaEnable if k != config.Default { enableEnv = enableEnv + config.Default + k @@ -675,7 +660,7 @@ var ( // GetNotifyMQTT - returns a map of registered notification 'mqtt' targets func GetNotifyMQTT(mqttKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[string]target.MQTTArgs, error) { mqttTargets := make(map[string]target.MQTTArgs) - for k, kv := range mergeTargets(mqttKVS, target.EnvMQTTEnable, DefaultMQTTKVS) { + for k, kv := range config.Merge(mqttKVS, target.EnvMQTTEnable, DefaultMQTTKVS) { enableEnv := target.EnvMQTTEnable if k != config.Default { enableEnv = enableEnv + config.Default + k @@ -818,7 +803,7 @@ var ( // GetNotifyMySQL - returns a map of registered notification 'mysql' targets func GetNotifyMySQL(mysqlKVS map[string]config.KVS) (map[string]target.MySQLArgs, error) { mysqlTargets := make(map[string]target.MySQLArgs) - for k, kv := range mergeTargets(mysqlKVS, target.EnvMySQLEnable, DefaultMySQLKVS) { + for k, kv := range config.Merge(mysqlKVS, target.EnvMySQLEnable, DefaultMySQLKVS) { enableEnv := target.EnvMySQLEnable if k != config.Default { enableEnv = enableEnv + config.Default + k @@ -969,7 +954,7 @@ var ( // GetNotifyNATS - returns a map of registered notification 'nats' targets func GetNotifyNATS(natsKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[string]target.NATSArgs, error) { natsTargets := make(map[string]target.NATSArgs) - for k, kv := range mergeTargets(natsKVS, target.EnvNATSEnable, DefaultNATSKVS) { + for k, kv := range config.Merge(natsKVS, target.EnvNATSEnable, DefaultNATSKVS) { enableEnv := target.EnvNATSEnable if k != config.Default { enableEnv = enableEnv + config.Default + k @@ -1157,7 +1142,7 @@ var ( // GetNotifyNSQ - returns a map of registered notification 'nsq' targets func GetNotifyNSQ(nsqKVS map[string]config.KVS) (map[string]target.NSQArgs, error) { nsqTargets := make(map[string]target.NSQArgs) - for k, kv := range mergeTargets(nsqKVS, target.EnvNSQEnable, DefaultNSQKVS) { + for k, kv := range config.Merge(nsqKVS, target.EnvNSQEnable, DefaultNSQKVS) { enableEnv := target.EnvNSQEnable if k != config.Default { enableEnv = enableEnv + config.Default + k @@ -1262,7 +1247,7 @@ var ( // GetNotifyPostgres - returns a map of registered notification 'postgres' targets func GetNotifyPostgres(postgresKVS map[string]config.KVS) (map[string]target.PostgreSQLArgs, error) { psqlTargets := make(map[string]target.PostgreSQLArgs) - for k, kv := range mergeTargets(postgresKVS, target.EnvPostgresEnable, DefaultPostgresKVS) { + for k, kv := range config.Merge(postgresKVS, target.EnvPostgresEnable, DefaultPostgresKVS) { enableEnv := target.EnvPostgresEnable if k != config.Default { enableEnv = enableEnv + config.Default + k @@ -1371,7 +1356,7 @@ var ( // GetNotifyRedis - returns a map of registered notification 'redis' targets func GetNotifyRedis(redisKVS map[string]config.KVS) (map[string]target.RedisArgs, error) { redisTargets := make(map[string]target.RedisArgs) - for k, kv := range mergeTargets(redisKVS, target.EnvRedisEnable, DefaultRedisKVS) { + for k, kv := range config.Merge(redisKVS, target.EnvRedisEnable, DefaultRedisKVS) { enableEnv := target.EnvRedisEnable if k != config.Default { enableEnv = enableEnv + config.Default + k @@ -1472,7 +1457,7 @@ var ( func GetNotifyWebhook(webhookKVS map[string]config.KVS, transport *http.Transport) ( map[string]target.WebhookArgs, error) { webhookTargets := make(map[string]target.WebhookArgs) - for k, kv := range mergeTargets(webhookKVS, target.EnvWebhookEnable, DefaultWebhookKVS) { + for k, kv := range config.Merge(webhookKVS, target.EnvWebhookEnable, DefaultWebhookKVS) { enableEnv := target.EnvWebhookEnable if k != config.Default { enableEnv = enableEnv + config.Default + k @@ -1577,7 +1562,7 @@ var ( // GetNotifyES - returns a map of registered notification 'elasticsearch' targets func GetNotifyES(esKVS map[string]config.KVS, transport *http.Transport) (map[string]target.ElasticsearchArgs, error) { esTargets := make(map[string]target.ElasticsearchArgs) - for k, kv := range mergeTargets(esKVS, target.EnvElasticEnable, DefaultESKVS) { + for k, kv := range config.Merge(esKVS, target.EnvElasticEnable, DefaultESKVS) { enableEnv := target.EnvElasticEnable if k != config.Default { enableEnv = enableEnv + config.Default + k @@ -1719,7 +1704,7 @@ var ( // GetNotifyAMQP - returns a map of registered notification 'amqp' targets func GetNotifyAMQP(amqpKVS map[string]config.KVS) (map[string]target.AMQPArgs, error) { amqpTargets := make(map[string]target.AMQPArgs) - for k, kv := range mergeTargets(amqpKVS, target.EnvAMQPEnable, DefaultAMQPKVS) { + for k, kv := range config.Merge(amqpKVS, target.EnvAMQPEnable, DefaultAMQPKVS) { enableEnv := target.EnvAMQPEnable if k != config.Default { enableEnv = enableEnv + config.Default + k diff --git a/internal/logger/config.go b/internal/logger/config.go index 300e43c8f..8a9508354 100644 --- a/internal/logger/config.go +++ b/internal/logger/config.go @@ -18,10 +18,16 @@ package logger import ( + "crypto/tls" + "strconv" "strings" - "github.com/minio/minio/internal/config" "github.com/minio/pkg/env" + xnet "github.com/minio/pkg/net" + + "github.com/minio/minio/internal/config" + "github.com/minio/minio/internal/logger/target/http" + "github.com/minio/minio/internal/logger/target/kafka" ) // Console logger target @@ -29,29 +35,26 @@ type Console struct { Enabled bool `json:"enabled"` } -// HTTP logger target -type HTTP struct { - Enabled bool `json:"enabled"` - Endpoint string `json:"endpoint"` - AuthToken string `json:"authToken"` - ClientCert string `json:"clientCert"` - ClientKey string `json:"clientKey"` -} - -// Config console and http logger targets -type Config struct { - Console Console `json:"console"` - HTTP map[string]HTTP `json:"http"` - Audit map[string]HTTP `json:"audit"` -} - -// HTTP endpoint logger +// Audit/Logger constants const ( Endpoint = "endpoint" AuthToken = "auth_token" ClientCert = "client_cert" ClientKey = "client_key" + KafkaBrokers = "brokers" + KafkaTopic = "topic" + KafkaTLS = "tls" + KafkaTLSSkipVerify = "tls_skip_verify" + KafkaTLSClientAuth = "tls_client_auth" + KafkaSASL = "sasl" + KafkaSASLUsername = "sasl_username" + KafkaSASLPassword = "sasl_password" + KafkaSASLMechanism = "sasl_mechanism" + KafkaClientTLSCert = "client_tls_cert" + KafkaClientTLSKey = "client_tls_key" + KafkaVersion = "version" + EnvLoggerWebhookEnable = "MINIO_LOGGER_WEBHOOK_ENABLE" EnvLoggerWebhookEndpoint = "MINIO_LOGGER_WEBHOOK_ENDPOINT" EnvLoggerWebhookAuthToken = "MINIO_LOGGER_WEBHOOK_AUTH_TOKEN" @@ -61,6 +64,20 @@ const ( EnvAuditWebhookAuthToken = "MINIO_AUDIT_WEBHOOK_AUTH_TOKEN" EnvAuditWebhookClientCert = "MINIO_AUDIT_WEBHOOK_CLIENT_CERT" EnvAuditWebhookClientKey = "MINIO_AUDIT_WEBHOOK_CLIENT_KEY" + + EnvKafkaEnable = "MINIO_AUDIT_KAFKA_ENABLE" + EnvKafkaBrokers = "MINIO_AUDIT_KAFKA_BROKERS" + EnvKafkaTopic = "MINIO_AUDIT_KAFKA_TOPIC" + EnvKafkaTLS = "MINIO_AUDIT_KAFKA_TLS" + EnvKafkaTLSSkipVerify = "MINIO_AUDIT_KAFKA_TLS_SKIP_VERIFY" + EnvKafkaTLSClientAuth = "MINIO_AUDIT_KAFKA_TLS_CLIENT_AUTH" + EnvKafkaSASLEnable = "MINIO_AUDIT_KAFKA_SASL" + EnvKafkaSASLUsername = "MINIO_AUDIT_KAFKA_SASL_USERNAME" + EnvKafkaSASLPassword = "MINIO_AUDIT_KAFKA_SASL_PASSWORD" + EnvKafkaSASLMechanism = "MINIO_AUDIT_KAFKA_SASL_MECHANISM" + EnvKafkaClientTLSCert = "MINIO_AUDIT_KAFKA_CLIENT_TLS_CERT" + EnvKafkaClientTLSKey = "MINIO_AUDIT_KAFKA_CLIENT_TLS_KEY" + EnvKafkaVersion = "MINIO_AUDIT_KAFKA_VERSION" ) // Default KVS for loggerHTTP and loggerAuditHTTP @@ -79,7 +96,8 @@ var ( Value: "", }, } - DefaultAuditKVS = config.KVS{ + + DefaultAuditWebhookKVS = config.KVS{ config.KV{ Key: config.Enable, Value: config.EnableOff, @@ -101,8 +119,71 @@ var ( Value: "", }, } + + DefaultAuditKafkaKVS = config.KVS{ + config.KV{ + Key: config.Enable, + Value: config.EnableOff, + }, + config.KV{ + Key: KafkaTopic, + Value: "", + }, + config.KV{ + Key: KafkaBrokers, + Value: "", + }, + config.KV{ + Key: KafkaSASLUsername, + Value: "", + }, + config.KV{ + Key: KafkaSASLPassword, + Value: "", + }, + config.KV{ + Key: KafkaSASLMechanism, + Value: "plain", + }, + config.KV{ + Key: KafkaClientTLSCert, + Value: "", + }, + config.KV{ + Key: KafkaClientTLSKey, + Value: "", + }, + config.KV{ + Key: KafkaTLSClientAuth, + Value: "0", + }, + config.KV{ + Key: KafkaSASL, + Value: config.EnableOff, + }, + config.KV{ + Key: KafkaTLS, + Value: config.EnableOff, + }, + config.KV{ + Key: KafkaTLSSkipVerify, + Value: config.EnableOff, + }, + config.KV{ + Key: KafkaVersion, + Value: "", + }, + } ) +// Config console and http logger targets +type Config struct { + Console Console `json:"console"` + HTTP map[string]http.Config `json:"http"` + AuditWebhook map[string]http.Config `json:"audit"` + AuditKafka map[string]kafka.Config `json:"audit_kafka"` +} + // NewConfig - initialize new logger config. func NewConfig() Config { cfg := Config{ @@ -110,18 +191,9 @@ func NewConfig() Config { Console: Console{ Enabled: true, }, - HTTP: make(map[string]HTTP), - Audit: make(map[string]HTTP), - } - - // Create an example HTTP logger - cfg.HTTP[config.Default] = HTTP{ - Endpoint: "https://username:password@example.com/api", - } - - // Create an example Audit logger - cfg.Audit[config.Default] = HTTP{ - Endpoint: "https://username:password@example.com/api/audit", + HTTP: make(map[string]http.Config), + AuditWebhook: make(map[string]http.Config), + AuditKafka: make(map[string]kafka.Config), } return cfg @@ -150,7 +222,7 @@ func lookupLegacyConfig() (Config, error) { if endpoint == "" { continue } - cfg.HTTP[target] = HTTP{ + cfg.HTTP[target] = http.Config{ Enabled: true, Endpoint: endpoint, } @@ -176,7 +248,7 @@ func lookupLegacyConfig() (Config, error) { if endpoint == "" { continue } - cfg.Audit[target] = HTTP{ + cfg.AuditWebhook[target] = http.Config{ Enabled: true, Endpoint: endpoint, } @@ -186,6 +258,121 @@ func lookupLegacyConfig() (Config, error) { } +// GetAuditKafka - returns a map of registered notification 'kafka' targets +func GetAuditKafka(kafkaKVS map[string]config.KVS) (map[string]kafka.Config, error) { + kafkaTargets := make(map[string]kafka.Config) + for k, kv := range config.Merge(kafkaKVS, EnvKafkaEnable, DefaultAuditKafkaKVS) { + enableEnv := EnvKafkaEnable + if k != config.Default { + enableEnv = enableEnv + config.Default + k + } + enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable))) + if err != nil { + return nil, err + } + if !enabled { + continue + } + var brokers []xnet.Host + brokersEnv := EnvKafkaBrokers + if k != config.Default { + brokersEnv = brokersEnv + config.Default + k + } + kafkaBrokers := env.Get(brokersEnv, kv.Get(KafkaBrokers)) + if len(kafkaBrokers) == 0 { + return nil, config.Errorf("kafka 'brokers' cannot be empty") + } + for _, s := range strings.Split(kafkaBrokers, config.ValueSeparator) { + var host *xnet.Host + host, err = xnet.ParseHost(s) + if err != nil { + break + } + brokers = append(brokers, *host) + } + if err != nil { + return nil, err + } + + clientAuthEnv := EnvKafkaTLSClientAuth + if k != config.Default { + clientAuthEnv = clientAuthEnv + config.Default + k + } + clientAuth, err := strconv.Atoi(env.Get(clientAuthEnv, kv.Get(KafkaTLSClientAuth))) + if err != nil { + return nil, err + } + + topicEnv := EnvKafkaTopic + if k != config.Default { + topicEnv = topicEnv + config.Default + k + } + + versionEnv := EnvKafkaVersion + if k != config.Default { + versionEnv = versionEnv + config.Default + k + } + + kafkaArgs := kafka.Config{ + Enabled: enabled, + Brokers: brokers, + Topic: env.Get(topicEnv, kv.Get(KafkaTopic)), + Version: env.Get(versionEnv, kv.Get(KafkaVersion)), + } + + tlsEnableEnv := EnvKafkaTLS + if k != config.Default { + tlsEnableEnv = tlsEnableEnv + config.Default + k + } + tlsSkipVerifyEnv := EnvKafkaTLSSkipVerify + if k != config.Default { + tlsSkipVerifyEnv = tlsSkipVerifyEnv + config.Default + k + } + + tlsClientTLSCertEnv := EnvKafkaClientTLSCert + if k != config.Default { + tlsClientTLSCertEnv = tlsClientTLSCertEnv + config.Default + k + } + + tlsClientTLSKeyEnv := EnvKafkaClientTLSKey + if k != config.Default { + tlsClientTLSKeyEnv = tlsClientTLSKeyEnv + config.Default + k + } + + kafkaArgs.TLS.Enable = env.Get(tlsEnableEnv, kv.Get(KafkaTLS)) == config.EnableOn + kafkaArgs.TLS.SkipVerify = env.Get(tlsSkipVerifyEnv, kv.Get(KafkaTLSSkipVerify)) == config.EnableOn + kafkaArgs.TLS.ClientAuth = tls.ClientAuthType(clientAuth) + + kafkaArgs.TLS.ClientTLSCert = env.Get(tlsClientTLSCertEnv, kv.Get(KafkaClientTLSCert)) + kafkaArgs.TLS.ClientTLSKey = env.Get(tlsClientTLSKeyEnv, kv.Get(KafkaClientTLSKey)) + + saslEnableEnv := EnvKafkaSASLEnable + if k != config.Default { + saslEnableEnv = saslEnableEnv + config.Default + k + } + saslUsernameEnv := EnvKafkaSASLUsername + if k != config.Default { + saslUsernameEnv = saslUsernameEnv + config.Default + k + } + saslPasswordEnv := EnvKafkaSASLPassword + if k != config.Default { + saslPasswordEnv = saslPasswordEnv + config.Default + k + } + saslMechanismEnv := EnvKafkaSASLMechanism + if k != config.Default { + saslMechanismEnv = saslMechanismEnv + config.Default + k + } + kafkaArgs.SASL.Enable = env.Get(saslEnableEnv, kv.Get(KafkaSASL)) == config.EnableOn + kafkaArgs.SASL.User = env.Get(saslUsernameEnv, kv.Get(KafkaSASLUsername)) + kafkaArgs.SASL.Password = env.Get(saslPasswordEnv, kv.Get(KafkaSASLPassword)) + kafkaArgs.SASL.Mechanism = env.Get(saslMechanismEnv, kv.Get(KafkaSASLMechanism)) + + kafkaTargets[k] = kafkaArgs + } + + return kafkaTargets, nil +} + // LookupConfig - lookup logger config, override with ENVs if set. func LookupConfig(scfg config.Config) (Config, error) { // Lookup for legacy environment variables first @@ -237,7 +424,7 @@ func LookupConfig(scfg config.Config) (Config, error) { if target != config.Default { authTokenEnv = EnvLoggerWebhookAuthToken + config.Default + target } - cfg.HTTP[target] = HTTP{ + cfg.HTTP[target] = http.Config{ Enabled: true, Endpoint: env.Get(endpointEnv, ""), AuthToken: env.Get(authTokenEnv, ""), @@ -245,7 +432,7 @@ func LookupConfig(scfg config.Config) (Config, error) { } for _, target := range loggerAuditTargets { - if v, ok := cfg.Audit[target]; ok && v.Enabled { + if v, ok := cfg.AuditWebhook[target]; ok && v.Enabled { // This target is already enabled using the // legacy environment variables, ignore. continue @@ -278,7 +465,7 @@ func LookupConfig(scfg config.Config) (Config, error) { if err != nil { return cfg, err } - cfg.Audit[target] = HTTP{ + cfg.AuditWebhook[target] = http.Config{ Enabled: true, Endpoint: env.Get(endpointEnv, ""), AuthToken: env.Get(authTokenEnv, ""), @@ -308,7 +495,7 @@ func LookupConfig(scfg config.Config) (Config, error) { if !enabled { continue } - cfg.HTTP[starget] = HTTP{ + cfg.HTTP[starget] = http.Config{ Enabled: true, Endpoint: kv.Get(Endpoint), AuthToken: kv.Get(AuthToken), @@ -316,7 +503,7 @@ func LookupConfig(scfg config.Config) (Config, error) { } for starget, kv := range scfg[config.AuditWebhookSubSys] { - if l, ok := cfg.Audit[starget]; ok && l.Enabled { + if l, ok := cfg.AuditWebhook[starget]; ok && l.Enabled { // Ignore this audit config since another target // with the same name is already loaded and enabled // in the shell environment. @@ -326,7 +513,7 @@ func LookupConfig(scfg config.Config) (Config, error) { if starget != config.Default { subSysTarget = config.AuditWebhookSubSys + config.SubSystemSeparator + starget } - if err := config.CheckValidKeys(subSysTarget, kv, DefaultAuditKVS); err != nil { + if err := config.CheckValidKeys(subSysTarget, kv, DefaultAuditWebhookKVS); err != nil { return cfg, err } enabled, err := config.ParseBool(kv.Get(config.Enable)) @@ -340,7 +527,7 @@ func LookupConfig(scfg config.Config) (Config, error) { if err != nil { return cfg, err } - cfg.Audit[starget] = HTTP{ + cfg.AuditWebhook[starget] = http.Config{ Enabled: true, Endpoint: kv.Get(Endpoint), AuthToken: kv.Get(AuthToken), @@ -349,5 +536,10 @@ func LookupConfig(scfg config.Config) (Config, error) { } } + cfg.AuditKafka, err = GetAuditKafka(scfg[config.AuditKafkaSubSys]) + if err != nil { + return cfg, err + } + return cfg, nil } diff --git a/internal/logger/help.go b/internal/logger/help.go index 99281cfba..dd9b1cc1b 100644 --- a/internal/logger/help.go +++ b/internal/logger/help.go @@ -45,7 +45,7 @@ var ( }, } - HelpAudit = config.HelpKVS{ + HelpWebhook = config.HelpKVS{ config.HelpKV{ Key: Endpoint, Description: `HTTP(s) endpoint e.g. "http://localhost:8080/minio/logs/audit"`, @@ -59,12 +59,6 @@ var ( Type: "string", Sensitive: true, }, - config.HelpKV{ - Key: config.Comment, - Description: config.DefaultComment, - Optional: true, - Type: "sentence", - }, config.HelpKV{ Key: ClientCert, Description: "mTLS certificate for Audit Webhook authentication", @@ -79,5 +73,95 @@ var ( Type: "string", Sensitive: true, }, + config.HelpKV{ + Key: config.Comment, + Description: config.DefaultComment, + Optional: true, + Type: "sentence", + }, + } + + HelpKafka = config.HelpKVS{ + config.HelpKV{ + Key: KafkaBrokers, + Description: "comma separated list of Kafka broker addresses", + Type: "csv", + }, + config.HelpKV{ + Key: KafkaTopic, + Description: "Kafka topic used for bucket notifications", + Optional: true, + Type: "string", + }, + config.HelpKV{ + Key: KafkaSASLUsername, + Description: "username for SASL/PLAIN or SASL/SCRAM authentication", + Optional: true, + Type: "string", + Sensitive: true, + }, + config.HelpKV{ + Key: KafkaSASLPassword, + Description: "password for SASL/PLAIN or SASL/SCRAM authentication", + Optional: true, + Type: "string", + Sensitive: true, + }, + config.HelpKV{ + Key: KafkaSASLMechanism, + Description: "sasl authentication mechanism, default 'plain'", + Optional: true, + Type: "string", + }, + config.HelpKV{ + Key: KafkaTLSClientAuth, + Description: "clientAuth determines the Kafka server's policy for TLS client auth", + Optional: true, + Type: "string", + }, + config.HelpKV{ + Key: KafkaSASL, + Description: "set to 'on' to enable SASL authentication", + Optional: true, + Type: "on|off", + }, + config.HelpKV{ + Key: KafkaTLS, + Description: "set to 'on' to enable TLS", + Optional: true, + Type: "on|off", + }, + config.HelpKV{ + Key: KafkaTLSSkipVerify, + Description: `trust server TLS without verification, defaults to "on" (verify)`, + Optional: true, + Type: "on|off", + }, + config.HelpKV{ + Key: KafkaClientTLSCert, + Description: "path to client certificate for mTLS auth", + Optional: true, + Type: "path", + Sensitive: true, + }, + config.HelpKV{ + Key: KafkaClientTLSKey, + Description: "path to client key for mTLS auth", + Optional: true, + Type: "path", + Sensitive: true, + }, + config.HelpKV{ + Key: KafkaVersion, + Description: "specify the version of the Kafka cluster", + Optional: true, + Type: "string", + }, + config.HelpKV{ + Key: config.Comment, + Description: config.DefaultComment, + Optional: true, + Type: "sentence", + }, } ) diff --git a/internal/logger/legacy.go b/internal/logger/legacy.go index 55a278c6e..e9b870c4a 100644 --- a/internal/logger/legacy.go +++ b/internal/logger/legacy.go @@ -17,7 +17,10 @@ package logger -import "github.com/minio/minio/internal/config" +import ( + "github.com/minio/minio/internal/config" + "github.com/minio/minio/internal/logger/target/http" +) // Legacy envs const ( @@ -26,7 +29,7 @@ const ( ) // SetLoggerHTTPAudit - helper for migrating older config to newer KV format. -func SetLoggerHTTPAudit(scfg config.Config, k string, args HTTP) { +func SetLoggerHTTPAudit(scfg config.Config, k string, args http.Config) { if !args.Enabled { // Do not enable audit targets, if not enabled return @@ -48,7 +51,7 @@ func SetLoggerHTTPAudit(scfg config.Config, k string, args HTTP) { } // SetLoggerHTTP helper for migrating older config to newer KV format. -func SetLoggerHTTP(scfg config.Config, k string, args HTTP) { +func SetLoggerHTTP(scfg config.Config, k string, args http.Config) { if !args.Enabled { // Do not enable logger http targets, if not enabled return diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index 706e6226b..6084ae025 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -28,12 +28,26 @@ import ( "time" xhttp "github.com/minio/minio/internal/http" - "github.com/minio/minio/internal/logger" ) // Timeout for the webhook http call const webhookCallTimeout = 5 * time.Second +// Config http logger target +type Config struct { + Enabled bool `json:"enabled"` + Name string `json:"name"` + UserAgent string `json:"userAgent"` + Endpoint string `json:"endpoint"` + AuthToken string `json:"authToken"` + ClientCert string `json:"clientCert"` + ClientKey string `json:"clientKey"` + Transport http.RoundTripper `json:"-"` + + // Custom logger + LogOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) `json:"-"` +} + // Target implements logger.Target and sends the json // format of a log entry to the configured http endpoint. // An internal buffer of logs is maintained but when the @@ -43,32 +57,24 @@ type Target struct { // Channel of log entries logCh chan interface{} - name string - // HTTP(s) endpoint - endpoint string - // Authorization token for `endpoint` - authToken string - // User-Agent to be set on each log to `endpoint` - userAgent string - logKind string - client http.Client + config Config } // Endpoint returns the backend endpoint func (h *Target) Endpoint() string { - return h.endpoint + return h.config.Endpoint } func (h *Target) String() string { - return h.name + return h.config.Name } -// Validate validate the http target -func (h *Target) Validate() error { +// Init validate and initialize the http target +func (h *Target) Init() error { ctx, cancel := context.WithTimeout(context.Background(), 2*webhookCallTimeout) defer cancel() - req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.endpoint, strings.NewReader(`{}`)) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.config.Endpoint, strings.NewReader(`{}`)) if err != nil { return err } @@ -77,13 +83,14 @@ func (h *Target) Validate() error { // Set user-agent to indicate MinIO release // version to the configured log endpoint - req.Header.Set("User-Agent", h.userAgent) + req.Header.Set("User-Agent", h.config.UserAgent) - if h.authToken != "" { - req.Header.Set("Authorization", h.authToken) + if h.config.AuthToken != "" { + req.Header.Set("Authorization", h.config.AuthToken) } - resp, err := h.client.Do(req) + client := http.Client{Transport: h.config.Transport} + resp, err := client.Do(req) if err != nil { return err } @@ -95,12 +102,13 @@ func (h *Target) Validate() error { switch resp.StatusCode { case http.StatusForbidden: return fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", - h.endpoint, resp.Status) + h.config.Endpoint, resp.Status) } return fmt.Errorf("%s returned '%s', please check your endpoint configuration", - h.endpoint, resp.Status) + h.config.Endpoint, resp.Status) } + go h.startHTTPLogger() return nil } @@ -116,7 +124,7 @@ func (h *Target) startHTTPLogger() { ctx, cancel := context.WithTimeout(context.Background(), webhookCallTimeout) req, err := http.NewRequestWithContext(ctx, http.MethodPost, - h.endpoint, bytes.NewReader(logJSON)) + h.config.Endpoint, bytes.NewReader(logJSON)) if err != nil { cancel() continue @@ -125,17 +133,17 @@ func (h *Target) startHTTPLogger() { // Set user-agent to indicate MinIO release // version to the configured log endpoint - req.Header.Set("User-Agent", h.userAgent) + req.Header.Set("User-Agent", h.config.UserAgent) - if h.authToken != "" { - req.Header.Set("Authorization", h.authToken) + if h.config.AuthToken != "" { + req.Header.Set("Authorization", h.config.AuthToken) } - resp, err := h.client.Do(req) + client := http.Client{Transport: h.config.Transport} + resp, err := client.Do(req) cancel() if err != nil { - logger.LogOnceIf(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", - h.endpoint, err), h.endpoint) + h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint) continue } @@ -145,88 +153,28 @@ func (h *Target) startHTTPLogger() { if resp.StatusCode != http.StatusOK { switch resp.StatusCode { case http.StatusForbidden: - logger.LogOnceIf(ctx, fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", - h.endpoint, resp.Status), h.endpoint) + h.config.LogOnce(ctx, fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.config.Endpoint, resp.Status), h.config.Endpoint) default: - logger.LogOnceIf(ctx, fmt.Errorf("%s returned '%s', please check your endpoint configuration", - h.endpoint, resp.Status), h.endpoint) + h.config.LogOnce(ctx, fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.config.Endpoint, resp.Status), h.config.Endpoint) } } } }() } -// Option is a function type that accepts a pointer Target -type Option func(*Target) - -// WithTargetName target name -func WithTargetName(name string) Option { - return func(t *Target) { - t.name = name - } -} - -// WithEndpoint adds a new endpoint -func WithEndpoint(endpoint string) Option { - return func(t *Target) { - t.endpoint = endpoint - } -} - -// WithLogKind adds a log type for this target -func WithLogKind(logKind string) Option { - return func(t *Target) { - t.logKind = strings.ToUpper(logKind) - } -} - -// WithUserAgent adds a custom user-agent sent to the target. -func WithUserAgent(userAgent string) Option { - return func(t *Target) { - t.userAgent = userAgent - } -} - -// WithAuthToken adds a new authorization header to be sent to target. -func WithAuthToken(authToken string) Option { - return func(t *Target) { - t.authToken = authToken - } -} - -// WithTransport adds a custom transport with custom timeouts and tuning. -func WithTransport(transport *http.Transport) Option { - return func(t *Target) { - t.client = http.Client{ - Transport: transport, - } - } -} - // New initializes a new logger target which // sends log over http to the specified endpoint -func New(opts ...Option) *Target { +func New(config Config) *Target { h := &Target{ - logCh: make(chan interface{}, 10000), + logCh: make(chan interface{}, 10000), + config: config, } - // Loop through each option - for _, opt := range opts { - // Call the option giving the instantiated - // *Target as the argument - opt(h) - } - - h.startHTTPLogger() return h } // Send log message 'e' to http target. func (h *Target) Send(entry interface{}, errKind string) error { - if h.logKind != errKind && h.logKind != "ALL" { - return nil - } - select { case h.logCh <- entry: default: diff --git a/internal/logger/target/kafka/kafka.go b/internal/logger/target/kafka/kafka.go new file mode 100644 index 000000000..52d42da17 --- /dev/null +++ b/internal/logger/target/kafka/kafka.go @@ -0,0 +1,208 @@ +// Copyright (c) 2015-2021 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package kafka + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "errors" + "net" + + sarama "github.com/Shopify/sarama" + saramatls "github.com/Shopify/sarama/tools/tls" + + "github.com/minio/minio/internal/logger/message/audit" + xnet "github.com/minio/pkg/net" +) + +// Target - Kafka target. +type Target struct { + // Channel of log entries + logCh chan interface{} + + producer sarama.SyncProducer + kconfig Config + config *sarama.Config +} + +// Send log message 'e' to kafka target. +func (h *Target) Send(entry interface{}, errKind string) error { + select { + case h.logCh <- entry: + default: + // log channel is full, do not wait and return + // an error immediately to the caller + return errors.New("log buffer full") + } + + return nil +} + +func (h *Target) startKakfaLogger() { + // Create a routine which sends json logs received + // from an internal channel. + go func() { + for entry := range h.logCh { + logJSON, err := json.Marshal(&entry) + if err != nil { + continue + } + + ae, ok := entry.(audit.Entry) + if ok { + msg := sarama.ProducerMessage{ + Topic: h.kconfig.Topic, + Key: sarama.StringEncoder(ae.RequestID), + Value: sarama.ByteEncoder(logJSON), + } + + _, _, err = h.producer.SendMessage(&msg) + if err != nil { + h.kconfig.LogOnce(context.Background(), err, h.kconfig.Topic) + continue + } + } + } + }() +} + +// Config - kafka target arguments. +type Config struct { + Enabled bool `json:"enable"` + Brokers []xnet.Host `json:"brokers"` + Topic string `json:"topic"` + Version string `json:"version"` + TLS struct { + Enable bool `json:"enable"` + RootCAs *x509.CertPool `json:"-"` + SkipVerify bool `json:"skipVerify"` + ClientAuth tls.ClientAuthType `json:"clientAuth"` + ClientTLSCert string `json:"clientTLSCert"` + ClientTLSKey string `json:"clientTLSKey"` + } `json:"tls"` + SASL struct { + Enable bool `json:"enable"` + User string `json:"username"` + Password string `json:"password"` + Mechanism string `json:"mechanism"` + } `json:"sasl"` + + // Custom logger + LogOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) `json:"-"` +} + +// Check if atleast one broker in cluster is active +func (k Config) pingBrokers() error { + var err error + for _, broker := range k.Brokers { + _, err1 := net.Dial("tcp", broker.String()) + if err1 != nil { + if err == nil { + // Set first error + err = err1 + } + } + } + return err +} + +// Endpoint - return kafka target +func (h *Target) Endpoint() string { + return "kafka" +} + +// String - kafka string +func (h *Target) String() string { + return "kafka" +} + +// Init initialize kafka target +func (h *Target) Init() error { + if !h.kconfig.Enabled { + return nil + } + if len(h.kconfig.Brokers) == 0 { + return errors.New("no broker address found") + } + for _, b := range h.kconfig.Brokers { + if _, err := xnet.ParseHost(b.String()); err != nil { + return err + } + } + if err := h.kconfig.pingBrokers(); err != nil { + return err + } + + sconfig := sarama.NewConfig() + if h.kconfig.Version != "" { + kafkaVersion, err := sarama.ParseKafkaVersion(h.kconfig.Version) + if err != nil { + return err + } + sconfig.Version = kafkaVersion + } + + sconfig.Net.SASL.User = h.kconfig.SASL.User + sconfig.Net.SASL.Password = h.kconfig.SASL.Password + initScramClient(h.kconfig, sconfig) // initializes configured scram client. + sconfig.Net.SASL.Enable = h.kconfig.SASL.Enable + + tlsConfig, err := saramatls.NewConfig(h.kconfig.TLS.ClientTLSCert, h.kconfig.TLS.ClientTLSKey) + if err != nil { + return err + } + + sconfig.Net.TLS.Enable = h.kconfig.TLS.Enable + sconfig.Net.TLS.Config = tlsConfig + sconfig.Net.TLS.Config.InsecureSkipVerify = h.kconfig.TLS.SkipVerify + sconfig.Net.TLS.Config.ClientAuth = h.kconfig.TLS.ClientAuth + sconfig.Net.TLS.Config.RootCAs = h.kconfig.TLS.RootCAs + + sconfig.Producer.RequiredAcks = sarama.WaitForAll + sconfig.Producer.Retry.Max = 10 + sconfig.Producer.Return.Successes = true + + h.config = sconfig + + var brokers []string + for _, broker := range h.kconfig.Brokers { + brokers = append(brokers, broker.String()) + } + + producer, err := sarama.NewSyncProducer(brokers, sconfig) + if err != nil { + return err + } + + h.producer = producer + + go h.startKakfaLogger() + return nil +} + +// New initializes a new logger target which +// sends log over http to the specified endpoint +func New(config Config) *Target { + target := &Target{ + logCh: make(chan interface{}, 10000), + kconfig: config, + } + return target +} diff --git a/internal/logger/target/kafka/kafka_scram_client_contrib.go b/internal/logger/target/kafka/kafka_scram_client_contrib.go new file mode 100644 index 000000000..e8e78e62a --- /dev/null +++ b/internal/logger/target/kafka/kafka_scram_client_contrib.go @@ -0,0 +1,84 @@ +// Copyright (c) 2015-2021 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package kafka + +import ( + "crypto/sha256" + "crypto/sha512" + + "github.com/Shopify/sarama" + "github.com/xdg/scram" +) + +func initScramClient(cfg Config, config *sarama.Config) { + if cfg.SASL.Mechanism == "sha512" { + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: KafkaSHA512} } + config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512) + } else if cfg.SASL.Mechanism == "sha256" { + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: KafkaSHA256} } + config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256) + } else { + // default to PLAIN + config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext) + } +} + +// KafkaSHA256 is a function that returns a crypto/sha256 hasher and should be used +// to create Client objects configured for SHA-256 hashing. +var KafkaSHA256 scram.HashGeneratorFcn = sha256.New + +// KafkaSHA512 is a function that returns a crypto/sha512 hasher and should be used +// to create Client objects configured for SHA-512 hashing. +var KafkaSHA512 scram.HashGeneratorFcn = sha512.New + +// XDGSCRAMClient implements the client-side of an authentication +// conversation with a server. A new conversation must be created for +// each authentication attempt. +type XDGSCRAMClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +// Begin constructs a SCRAM client component based on a given hash.Hash +// factory receiver. This constructor will normalize the username, password +// and authzID via the SASLprep algorithm, as recommended by RFC-5802. If +// SASLprep fails, the method returns an error. +func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) { + x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + x.ClientConversation = x.Client.NewConversation() + return nil +} + +// Step takes a string provided from a server (or just an empty string for the +// very first conversation step) and attempts to move the authentication +// conversation forward. It returns a string to be sent to the server or an +// error if the server message is invalid. Calling Step after a conversation +// completes is also an error. +func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) { + response, err = x.ClientConversation.Step(challenge) + return +} + +// Done returns true if the conversation is completed or has errored. +func (x *XDGSCRAMClient) Done() bool { + return x.ClientConversation.Done() +} diff --git a/internal/logger/targets.go b/internal/logger/targets.go index a3253d7cc..5aff7bff2 100644 --- a/internal/logger/targets.go +++ b/internal/logger/targets.go @@ -23,7 +23,7 @@ package logger type Target interface { String() string Endpoint() string - Validate() error + Init() error Send(entry interface{}, errKind string) error } @@ -36,7 +36,7 @@ var AuditTargets = []Target{} // AddAuditTarget adds a new audit logger target to the // list of enabled loggers func AddAuditTarget(t Target) error { - if err := t.Validate(); err != nil { + if err := t.Init(); err != nil { return err } @@ -47,7 +47,7 @@ func AddAuditTarget(t Target) error { // AddTarget adds a new logger target to the // list of enabled loggers func AddTarget(t Target) error { - if err := t.Validate(); err != nil { + if err := t.Init(); err != nil { return err } Targets = append(Targets, t)