From 5e763b71dcc72c146e566a8e35fd55308b94c313 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 27 Jul 2022 09:44:59 -0700 Subject: [PATCH] use logger.LogOnce to reduce printing disconnection logs (#15408) fixes #15334 - re-use net/url parsed value for http.Request{} - remove gosimple, structcheck and unusued due to https://github.com/golangci/golangci-lint/issues/2649 - unwrapErrs upto leafErr to ensure that we store exactly the correct errors --- .golangci.yml | 4 - cmd/bucket-replication.go | 6 +- cmd/metrics-v2.go | 2 +- cmd/notification.go | 4 +- docs/site-replication/run-multi-site-ldap.sh | 15 +- .../run-multi-site-minio-idp.sh | 17 ++- docs/site-replication/run-multi-site-oidc.sh | 15 +- internal/event/target/amqp.go | 19 +-- internal/event/target/elasticsearch.go | 25 ++- internal/event/target/kafka.go | 13 +- internal/event/target/mqtt.go | 9 +- internal/event/target/mysql.go | 13 +- internal/event/target/nats.go | 13 +- internal/event/target/nsq.go | 11 +- internal/event/target/postgresql.go | 11 +- internal/event/target/redis.go | 41 ++--- internal/event/target/store.go | 31 ++-- internal/event/target/webhook.go | 9 +- internal/logger/logonce.go | 76 +++++++--- internal/logger/target/http/http.go | 2 +- internal/logger/target/kafka/kafka.go | 2 +- internal/rest/client.go | 143 +++++++++++++++--- 22 files changed, 312 insertions(+), 169 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 4079a5918..0ae9b6d30 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -14,13 +14,9 @@ linters: - govet - revive - ineffassign - - gosimple - deadcode - - structcheck - gomodguard - gofmt - - unused - - structcheck - unconvert - varcheck - gocritic diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 767998a67..a20e6b975 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -1492,7 +1492,7 @@ func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) { }) case p.mrfReplicaCh <- ri: default: - logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up retrying failed replication - we recommend increasing number of replication failed workers with `mc admin config set api replication_failed_workers=%d`", p.suggestedWorkers(true)), replicationSubsystem) + logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up retrying failed replication - we recommend increasing number of replication failed workers with `mc admin config set api replication_failed_workers=%d`", p.suggestedWorkers(true)), string(replicationSubsystem)) } } @@ -1518,7 +1518,7 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) { }) case ch <- ri: default: - logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic - we recommend increasing number of replicate object workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), replicationSubsystem) + logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic - we recommend increasing number of replicate object workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), string(replicationSubsystem)) } } @@ -1555,7 +1555,7 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf }) case ch <- doi: default: - logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes - we recommend increasing number of replicate workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), replicationSubsystem) + logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes - we recommend increasing number of replicate workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), string(replicationSubsystem)) } } diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index 24efe52d1..5811d426c 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -996,7 +996,7 @@ func getMinioProcMetrics() *MetricsGroup { metrics = make([]Metric, 0, 20) p, err := procfs.Self() if err != nil { - logger.LogOnceIf(ctx, err, nodeMetricNamespace) + logger.LogOnceIf(ctx, err, string(nodeMetricNamespace)) return } diff --git a/cmd/notification.go b/cmd/notification.go index aa930ff42..92e2a2cac 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -456,7 +456,7 @@ func (sys *NotificationSys) updateBloomFilter(ctx context.Context, current uint6 defer mu.Unlock() if err != nil || !serverBF.Complete || bf == nil { - logger.LogOnceIf(ctx, err, fmt.Sprintf("host:%s, cycle:%d", client.host, current), client.cycleServerBloomFilter) + logger.LogOnceIf(ctx, err, client.host.String(), client.cycleServerBloomFilter) bf = nil return nil } @@ -717,7 +717,7 @@ func (sys *NotificationSys) InitBucketTargets(ctx context.Context, objAPI Object if res.Err != nil { reqInfo := &logger.ReqInfo{} reqInfo.AppendTags("targetID", res.ID.Name) - logger.LogOnceIf(logger.SetReqInfo(GlobalContext, reqInfo), res.Err, res.ID) + logger.LogOnceIf(logger.SetReqInfo(GlobalContext, reqInfo), res.Err, res.ID.String()) } } }() diff --git a/docs/site-replication/run-multi-site-ldap.sh b/docs/site-replication/run-multi-site-ldap.sh index a1adcf7b3..b3134739f 100755 --- a/docs/site-replication/run-multi-site-ldap.sh +++ b/docs/site-replication/run-multi-site-ldap.sh @@ -3,6 +3,14 @@ # shellcheck disable=SC2120 exit_1() { cleanup + + echo "minio1 ============" + cat /tmp/minio1_1.log + echo "minio2 ============" + cat /tmp/minio2_1.log + echo "minio3 ============" + cat /tmp/minio3_1.log + exit 1 } @@ -38,7 +46,7 @@ export MINIO_IDENTITY_LDAP_GROUP_SEARCH_FILTER="(&(objectclass=groupOfNames)(mem if [ ! -f ./mc ]; then wget -O mc https://dl.minio.io/client/mc/release/linux-amd64/mc \ - && chmod +x mc + && chmod +x mc fi minio server --config-dir /tmp/minio-ldap --address ":9001" /tmp/minio-ldap-idp1/{1...4} >/tmp/minio1_1.log 2>&1 & @@ -268,7 +276,8 @@ kill -9 ${site1_pid} # Restart minio1 instance minio server --config-dir /tmp/minio-ldap --address ":9001" /tmp/minio-ldap-idp1/{1...4} >/tmp/minio1_1.log 2>&1 & -sleep 15 +sleep 30 + # Test whether most recent tag update on minio2 is replicated to minio1 val=$(./mc tag list minio1/newbucket --json | jq -r .tagset | jq -r .key ) if [ "${val}" != "val2" ]; then @@ -279,7 +288,7 @@ fi # Test if bucket created/deleted when minio1 is down healed diff -q <(./mc ls minio1) <(./mc ls minio2) 1>/dev/null if [ $? -ne 0 ]; then - echo "expected `bucket2` delete and `newbucket2` creation to have replicated, exiting..." + echo "expected 'bucket2' delete and 'newbucket2' creation to have replicated, exiting..." exit_1; fi diff --git a/docs/site-replication/run-multi-site-minio-idp.sh b/docs/site-replication/run-multi-site-minio-idp.sh index 71b8e26d1..9581a7b18 100755 --- a/docs/site-replication/run-multi-site-minio-idp.sh +++ b/docs/site-replication/run-multi-site-minio-idp.sh @@ -3,6 +3,14 @@ # shellcheck disable=SC2120 exit_1() { cleanup + + echo "minio1 ============" + cat /tmp/minio1_1.log + echo "minio2 ============" + cat /tmp/minio2_1.log + echo "minio3 ============" + cat /tmp/minio3_1.log + exit 1 } @@ -30,7 +38,7 @@ export MINIO_KMS_SECRET_KEY=my-minio-key:OSMM+vkKUTCvQs9YL/CVMIMt43HFhkUpqJxTmGl if [ ! -f ./mc ]; then wget -O mc https://dl.minio.io/client/mc/release/linux-amd64/mc \ - && chmod +x mc + && chmod +x mc fi minio server --config-dir /tmp/minio-internal --address ":9001" /tmp/minio-internal-idp1/{1...4} >/tmp/minio1_1.log 2>&1 & @@ -327,7 +335,7 @@ kill -9 ${site1_pid} ./mc rb minio2/bucket2 # Restart minio1 instance minio server --config-dir /tmp/minio-internal --address ":9001" /tmp/minio-internal-idp1/{1...4} >/tmp/minio1_1.log 2>&1 & -sleep 15 +sleep 30 # Test whether most recent tag update on minio2 is replicated to minio1 val=$(./mc tag list minio1/newbucket --json | jq -r .tagset | jq -r .key ) @@ -335,9 +343,10 @@ if [ "${val}" != "val2" ]; then echo "expected bucket tag to have replicated, exiting..." exit_1; fi + # Test if bucket created/deleted when minio1 is down healed diff -q <(./mc ls minio1) <(./mc ls minio2) 1>/dev/null if [ $? -ne 0 ]; then - echo "expected `bucket2` delete and `newbucket2` creation to have replicated, exiting..." + echo "expected 'bucket2' delete and 'newbucket2' creation to have replicated, exiting..." exit_1; -fi \ No newline at end of file +fi diff --git a/docs/site-replication/run-multi-site-oidc.sh b/docs/site-replication/run-multi-site-oidc.sh index e3d6f618a..743c7593f 100755 --- a/docs/site-replication/run-multi-site-oidc.sh +++ b/docs/site-replication/run-multi-site-oidc.sh @@ -3,6 +3,14 @@ # shellcheck disable=SC2120 exit_1() { cleanup + + echo "minio1 ============" + cat /tmp/minio1_1.log + echo "minio2 ============" + cat /tmp/minio2_1.log + echo "minio3 ============" + cat /tmp/minio3_1.log + exit 1 } @@ -46,7 +54,7 @@ site3_pid=$! if [ ! -f ./mc ]; then wget -O mc https://dl.minio.io/client/mc/release/linux-amd64/mc \ - && chmod +x mc + && chmod +x mc fi sleep 10 @@ -252,7 +260,8 @@ kill -9 ${site1_pid} # Restart minio1 instance minio server --address ":9001" --console-address ":10000" /tmp/minio1/{1...4} >/tmp/minio1_1.log 2>&1 & -sleep 15 +sleep 30 + # Test whether most recent tag update on minio2 is replicated to minio1 val=$(./mc tag list minio1/newbucket --json | jq -r .tagset | jq -r .key ) if [ "${val}" != "val2" ]; then @@ -263,6 +272,6 @@ fi # Test if bucket created/deleted when minio1 is down healed diff -q <(./mc ls minio1) <(./mc ls minio2) 1>/dev/null if [ $? -ne 0 ]; then - echo "expected `bucket2` delete and `newbucket2` creation to have replicated, exiting..." + echo "expected 'bucket2' delete and 'newbucket2' creation to have replicated, exiting..." exit_1; fi diff --git a/internal/event/target/amqp.go b/internal/event/target/amqp.go index ffcfc2ce2..ad7a11f65 100644 --- a/internal/event/target/amqp.go +++ b/internal/event/target/amqp.go @@ -29,6 +29,7 @@ import ( "sync" "github.com/minio/minio/internal/event" + "github.com/minio/minio/internal/logger" xnet "github.com/minio/pkg/net" "github.com/streadway/amqp" ) @@ -115,7 +116,7 @@ type AMQPTarget struct { conn *amqp.Connection connMutex sync.Mutex store Store - loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) + loggerOnce logger.LogOnce } // ID - returns TargetID. @@ -262,10 +263,7 @@ func (target *AMQPTarget) Save(eventData event.Event) error { if err != nil { return err } - defer func() { - cErr := ch.Close() - target.loggerOnce(context.Background(), cErr, target.ID()) - }() + defer ch.Close() return target.send(eventData, ch, confirms) } @@ -276,10 +274,7 @@ func (target *AMQPTarget) Send(eventKey string) error { if err != nil { return err } - defer func() { - cErr := ch.Close() - target.loggerOnce(context.Background(), cErr, target.ID()) - }() + defer ch.Close() eventData, eErr := target.store.Get(eventKey) if eErr != nil { @@ -308,7 +303,7 @@ func (target *AMQPTarget) Close() error { } // NewAMQPTarget - creates new AMQP target. -func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}), test bool) (*AMQPTarget, error) { +func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*AMQPTarget, error) { var conn *amqp.Connection var err error @@ -324,7 +319,7 @@ func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce queueDir := filepath.Join(args.QueueDir, storePrefix+"-amqp-"+id) store = NewQueueStore(queueDir, args.QueueLimit) if oErr := store.Open(); oErr != nil { - target.loggerOnce(context.Background(), oErr, target.ID()) + target.loggerOnce(context.Background(), oErr, target.ID().String()) return target, oErr } target.store = store @@ -333,7 +328,7 @@ func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce conn, err = amqp.Dial(args.URL.String()) if err != nil { if store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) { - target.loggerOnce(context.Background(), err, target.ID()) + target.loggerOnce(context.Background(), err, target.ID().String()) return target, err } } diff --git a/internal/event/target/elasticsearch.go b/internal/event/target/elasticsearch.go index 8db0787c4..d7c598d29 100644 --- a/internal/event/target/elasticsearch.go +++ b/internal/event/target/elasticsearch.go @@ -36,6 +36,7 @@ import ( elasticsearch7 "github.com/elastic/go-elasticsearch/v7" "github.com/minio/highwayhash" "github.com/minio/minio/internal/event" + "github.com/minio/minio/internal/logger" xnet "github.com/minio/pkg/net" "github.com/pkg/errors" ) @@ -156,7 +157,7 @@ type ElasticsearchTarget struct { args ElasticsearchArgs client esClient store Store - loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) + loggerOnce logger.LogOnce } // ID - returns target ID. @@ -320,7 +321,7 @@ func (target *ElasticsearchTarget) checkAndInitClient(ctx context.Context) error } // NewElasticsearchTarget - creates new Elasticsearch target. -func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*ElasticsearchTarget, error) { +func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*ElasticsearchTarget, error) { target := &ElasticsearchTarget{ id: event.TargetID{ID: id, Name: "elasticsearch"}, args: args, @@ -331,7 +332,7 @@ func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan str queueDir := filepath.Join(args.QueueDir, storePrefix+"-elasticsearch-"+id) target.store = NewQueueStore(queueDir, args.QueueLimit) if err := target.store.Open(); err != nil { - target.loggerOnce(context.Background(), err, target.ID()) + target.loggerOnce(context.Background(), err, target.ID().String()) return target, err } } @@ -342,7 +343,7 @@ func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan str err := target.checkAndInitClient(ctx) if err != nil { if target.store == nil || err != errNotConnected { - target.loggerOnce(context.Background(), err, target.ID()) + target.loggerOnce(context.Background(), err, target.ID().String()) return target, err } } @@ -492,11 +493,10 @@ func (c *esClientV7) removeEntry(ctx context.Context, index string, key string) return err } defer res.Body.Close() + defer io.Copy(ioutil.Discard, res.Body) if res.IsError() { - err := fmt.Errorf("Delete err: %s", res.String()) - return err + return fmt.Errorf("Delete err: %s", res.String()) } - io.Copy(ioutil.Discard, res.Body) return nil } return err @@ -522,11 +522,11 @@ func (c *esClientV7) updateEntry(ctx context.Context, index string, key string, return err } defer res.Body.Close() + defer io.Copy(ioutil.Discard, res.Body) if res.IsError() { - err := fmt.Errorf("Update err: %s", res.String()) - return err + return fmt.Errorf("Update err: %s", res.String()) } - io.Copy(ioutil.Discard, res.Body) + return nil } @@ -549,11 +549,10 @@ func (c *esClientV7) addEntry(ctx context.Context, index string, eventData event return err } defer res.Body.Close() + defer io.Copy(ioutil.Discard, res.Body) if res.IsError() { - err := fmt.Errorf("Add err: %s", res.String()) - return err + return fmt.Errorf("Add err: %s", res.String()) } - io.Copy(ioutil.Discard, res.Body) return nil } diff --git a/internal/event/target/kafka.go b/internal/event/target/kafka.go index e6b81715d..718dea35d 100644 --- a/internal/event/target/kafka.go +++ b/internal/event/target/kafka.go @@ -29,6 +29,7 @@ import ( "path/filepath" "github.com/minio/minio/internal/event" + "github.com/minio/minio/internal/logger" xnet "github.com/minio/pkg/net" sarama "github.com/Shopify/sarama" @@ -126,7 +127,7 @@ type KafkaTarget struct { producer sarama.SyncProducer config *sarama.Config store Store - loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) + loggerOnce logger.LogOnce } // ID - returns target ID. @@ -251,7 +252,7 @@ func (k KafkaArgs) pingBrokers() bool { } // NewKafkaTarget - creates new Kafka target with auth credentials. -func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*KafkaTarget, error) { +func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*KafkaTarget, error) { config := sarama.NewConfig() target := &KafkaTarget{ @@ -263,7 +264,7 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc if args.Version != "" { kafkaVersion, err := sarama.ParseKafkaVersion(args.Version) if err != nil { - target.loggerOnce(context.Background(), err, target.ID()) + target.loggerOnce(context.Background(), err, target.ID().String()) return target, err } config.Version = kafkaVersion @@ -276,7 +277,7 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc tlsConfig, err := saramatls.NewConfig(args.TLS.ClientTLSCert, args.TLS.ClientTLSKey) if err != nil { - target.loggerOnce(context.Background(), err, target.ID()) + target.loggerOnce(context.Background(), err, target.ID().String()) return target, err } @@ -303,7 +304,7 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc queueDir := filepath.Join(args.QueueDir, storePrefix+"-kafka-"+id) store = NewQueueStore(queueDir, args.QueueLimit) if oErr := store.Open(); oErr != nil { - target.loggerOnce(context.Background(), oErr, target.ID()) + target.loggerOnce(context.Background(), oErr, target.ID().String()) return target, oErr } target.store = store @@ -312,7 +313,7 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc producer, err := sarama.NewSyncProducer(brokers, config) if err != nil { if store == nil || err != sarama.ErrOutOfBrokers { - target.loggerOnce(context.Background(), err, target.ID()) + target.loggerOnce(context.Background(), err, target.ID().String()) return target, err } } diff --git a/internal/event/target/mqtt.go b/internal/event/target/mqtt.go index 82e5809a8..bc9ac528c 100644 --- a/internal/event/target/mqtt.go +++ b/internal/event/target/mqtt.go @@ -31,6 +31,7 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/minio/minio/internal/event" + "github.com/minio/minio/internal/logger" xnet "github.com/minio/pkg/net" ) @@ -111,7 +112,7 @@ type MQTTTarget struct { client mqtt.Client store Store quitCh chan struct{} - loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}) + loggerOnce logger.LogOnce } // ID - returns target ID. @@ -202,7 +203,7 @@ func (target *MQTTTarget) Close() error { } // NewMQTTTarget - creates new MQTT target. -func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*MQTTTarget, error) { +func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*MQTTTarget, error) { if args.MaxReconnectInterval == 0 { // Default interval // https://github.com/eclipse/paho.mqtt.golang/blob/master/options.go#L115 @@ -253,7 +254,7 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce target.loggerOnce(context.Background(), fmt.Errorf("Previous connect failed with %w attempting a reconnect", token.Error()), - target.ID()) + target.ID().String()) time.Sleep(reconnectInterval * time.Second) token = client.Connect() goto retry @@ -270,7 +271,7 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce queueDir := filepath.Join(args.QueueDir, storePrefix+"-mqtt-"+id) target.store = NewQueueStore(queueDir, args.QueueLimit) if err := target.store.Open(); err != nil { - target.loggerOnce(context.Background(), err, target.ID()) + target.loggerOnce(context.Background(), err, target.ID().String()) return target, err } diff --git a/internal/event/target/mysql.go b/internal/event/target/mysql.go index 3d2709db5..27e928ee5 100644 --- a/internal/event/target/mysql.go +++ b/internal/event/target/mysql.go @@ -32,6 +32,7 @@ import ( "github.com/go-sql-driver/mysql" "github.com/minio/minio/internal/event" + "github.com/minio/minio/internal/logger" xnet "github.com/minio/pkg/net" ) @@ -152,7 +153,7 @@ type MySQLTarget struct { db *sql.DB store Store firstPing bool - loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) + loggerOnce logger.LogOnce } // ID - returns target ID. @@ -333,7 +334,7 @@ func (target *MySQLTarget) executeStmts() error { } // NewMySQLTarget - creates new MySQL target. -func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*MySQLTarget, error) { +func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*MySQLTarget, error) { if args.DSN == "" { config := mysql.Config{ User: args.User, @@ -357,7 +358,7 @@ func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnc db, err := sql.Open("mysql", args.DSN) if err != nil { - target.loggerOnce(context.Background(), err, target.ID()) + target.loggerOnce(context.Background(), err, target.ID().String()) return target, err } target.db = db @@ -373,7 +374,7 @@ func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnc queueDir := filepath.Join(args.QueueDir, storePrefix+"-mysql-"+id) store = NewQueueStore(queueDir, args.QueueLimit) if oErr := store.Open(); oErr != nil { - target.loggerOnce(context.Background(), oErr, target.ID()) + target.loggerOnce(context.Background(), oErr, target.ID().String()) return target, oErr } target.store = store @@ -382,12 +383,12 @@ func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnc err = target.db.Ping() if err != nil { if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) { - target.loggerOnce(context.Background(), err, target.ID()) + target.loggerOnce(context.Background(), err, target.ID().String()) return target, err } } else { if err = target.executeStmts(); err != nil { - target.loggerOnce(context.Background(), err, target.ID()) + target.loggerOnce(context.Background(), err, target.ID().String()) return target, err } target.firstPing = true diff --git a/internal/event/target/nats.go b/internal/event/target/nats.go index 441b1d0b8..bc118da58 100644 --- a/internal/event/target/nats.go +++ b/internal/event/target/nats.go @@ -28,6 +28,7 @@ import ( "path/filepath" "github.com/minio/minio/internal/event" + "github.com/minio/minio/internal/logger" xnet "github.com/minio/pkg/net" "github.com/nats-io/nats.go" "github.com/nats-io/stan.go" @@ -217,7 +218,7 @@ type NATSTarget struct { stanConn stan.Conn jstream nats.JetStream store Store - loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) + loggerOnce logger.LogOnce } // ID - returns target ID. @@ -350,7 +351,7 @@ func (target *NATSTarget) Close() (err error) { } // NewNATSTarget - creates new NATS target. -func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*NATSTarget, error) { +func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*NATSTarget, error) { var natsConn *nats.Conn var stanConn stan.Conn var jstream nats.JetStream @@ -369,14 +370,14 @@ func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce queueDir := filepath.Join(args.QueueDir, storePrefix+"-nats-"+id) store = NewQueueStore(queueDir, args.QueueLimit) if oErr := store.Open(); oErr != nil { - target.loggerOnce(context.Background(), oErr, target.ID()) + target.loggerOnce(context.Background(), oErr, target.ID().String()) return target, oErr } target.store = store } if args.Streaming.Enable { - target.loggerOnce(context.Background(), errors.New("NATS Streaming is deprecated please migrate to JetStream"), target.ID()) + target.loggerOnce(context.Background(), errors.New("NATS Streaming is deprecated please migrate to JetStream"), target.ID().String()) stanConn, err = args.connectStan() target.stanConn = stanConn @@ -387,7 +388,7 @@ func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce if err != nil { if store == nil || err.Error() != nats.ErrNoServers.Error() { - target.loggerOnce(context.Background(), err, target.ID()) + target.loggerOnce(context.Background(), err, target.ID().String()) return target, err } } @@ -396,7 +397,7 @@ func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce jstream, err = target.natsConn.JetStream() if err != nil { if store == nil || err.Error() != nats.ErrNoServers.Error() { - target.loggerOnce(context.Background(), err, target.ID()) + target.loggerOnce(context.Background(), err, target.ID().String()) return target, err } } diff --git a/internal/event/target/nsq.go b/internal/event/target/nsq.go index 2af40a5a3..8eb7aac32 100644 --- a/internal/event/target/nsq.go +++ b/internal/event/target/nsq.go @@ -29,6 +29,7 @@ import ( "github.com/nsqio/go-nsq" "github.com/minio/minio/internal/event" + "github.com/minio/minio/internal/logger" xnet "github.com/minio/pkg/net" ) @@ -92,7 +93,7 @@ type NSQTarget struct { producer *nsq.Producer store Store config *nsq.Config - loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) + loggerOnce logger.LogOnce } // ID - returns target ID. @@ -188,7 +189,7 @@ func (target *NSQTarget) Close() (err error) { } // NewNSQTarget - creates new NSQ target. -func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*NSQTarget, error) { +func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*NSQTarget, error) { config := nsq.NewConfig() if args.TLS.Enable { config.TlsV1 = true @@ -210,7 +211,7 @@ func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce fu queueDir := filepath.Join(args.QueueDir, storePrefix+"-nsq-"+id) store = NewQueueStore(queueDir, args.QueueLimit) if oErr := store.Open(); oErr != nil { - target.loggerOnce(context.Background(), oErr, target.ID()) + target.loggerOnce(context.Background(), oErr, target.ID().String()) return target, oErr } target.store = store @@ -218,7 +219,7 @@ func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce fu producer, err := nsq.NewProducer(args.NSQDAddress.String(), config) if err != nil { - target.loggerOnce(context.Background(), err, target.ID()) + target.loggerOnce(context.Background(), err, target.ID().String()) return target, err } target.producer = producer @@ -226,7 +227,7 @@ func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce fu if err := target.producer.Ping(); err != nil { // To treat "connection refused" errors as errNotConnected. if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) { - target.loggerOnce(context.Background(), err, target.ID()) + target.loggerOnce(context.Background(), err, target.ID().String()) return target, err } } diff --git a/internal/event/target/postgresql.go b/internal/event/target/postgresql.go index a865d5fd9..22bb2b3c4 100644 --- a/internal/event/target/postgresql.go +++ b/internal/event/target/postgresql.go @@ -33,6 +33,7 @@ import ( _ "github.com/lib/pq" // Register postgres driver "github.com/minio/minio/internal/event" + "github.com/minio/minio/internal/logger" xnet "github.com/minio/pkg/net" ) @@ -145,7 +146,7 @@ type PostgreSQLTarget struct { store Store firstPing bool connString string - loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) + loggerOnce logger.LogOnce } // ID - returns target ID. @@ -329,7 +330,7 @@ func (target *PostgreSQLTarget) executeStmts() error { } // NewPostgreSQLTarget - creates new PostgreSQL target. -func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*PostgreSQLTarget, error) { +func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*PostgreSQLTarget, error) { params := []string{args.ConnectionString} if args.ConnectionString == "" { params = []string{} @@ -376,7 +377,7 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, queueDir := filepath.Join(args.QueueDir, storePrefix+"-postgresql-"+id) store = NewQueueStore(queueDir, args.QueueLimit) if oErr := store.Open(); oErr != nil { - target.loggerOnce(context.Background(), oErr, target.ID()) + target.loggerOnce(context.Background(), oErr, target.ID().String()) return target, oErr } target.store = store @@ -385,12 +386,12 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, err = target.db.Ping() if err != nil { if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) { - target.loggerOnce(context.Background(), err, target.ID()) + target.loggerOnce(context.Background(), err, target.ID().String()) return target, err } } else { if err = target.executeStmts(); err != nil { - target.loggerOnce(context.Background(), err, target.ID()) + target.loggerOnce(context.Background(), err, target.ID().String()) return target, err } target.firstPing = true diff --git a/internal/event/target/redis.go b/internal/event/target/redis.go index 03395317e..87706c48d 100644 --- a/internal/event/target/redis.go +++ b/internal/event/target/redis.go @@ -30,6 +30,7 @@ import ( "github.com/gomodule/redigo/redis" "github.com/minio/minio/internal/event" + "github.com/minio/minio/internal/logger" xnet "github.com/minio/pkg/net" ) @@ -121,7 +122,7 @@ type RedisTarget struct { pool *redis.Pool store Store firstPing bool - loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) + loggerOnce logger.LogOnce } // ID - returns target ID. @@ -137,10 +138,8 @@ func (target *RedisTarget) HasQueueStore() bool { // IsActive - Return true if target is up and active func (target *RedisTarget) IsActive() (bool, error) { conn := target.pool.Get() - defer func() { - cErr := conn.Close() - target.loggerOnce(context.Background(), cErr, target.ID()) - }() + defer conn.Close() + _, pingErr := conn.Do("PING") if pingErr != nil { if IsConnRefusedErr(pingErr) { @@ -166,10 +165,7 @@ func (target *RedisTarget) Save(eventData event.Event) error { // send - sends an event to the redis. func (target *RedisTarget) send(eventData event.Event) error { conn := target.pool.Get() - defer func() { - cErr := conn.Close() - target.loggerOnce(context.Background(), cErr, target.ID()) - }() + defer conn.Close() if target.args.Format == event.NamespaceFormat { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) @@ -209,10 +205,8 @@ func (target *RedisTarget) send(eventData event.Event) error { // Send - reads an event from store and sends it to redis. func (target *RedisTarget) Send(eventKey string) error { conn := target.pool.Get() - defer func() { - cErr := conn.Close() - target.loggerOnce(context.Background(), cErr, target.ID()) - }() + defer conn.Close() + _, pingErr := conn.Do("PING") if pingErr != nil { if IsConnRefusedErr(pingErr) { @@ -258,7 +252,7 @@ func (target *RedisTarget) Close() error { } // NewRedisTarget - creates new Redis target. -func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}), test bool) (*RedisTarget, error) { +func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*RedisTarget, error) { pool := &redis.Pool{ MaxIdle: 3, IdleTimeout: 2 * 60 * time.Second, @@ -270,18 +264,14 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnc if args.Password != "" { if _, err = conn.Do("AUTH", args.Password); err != nil { - cErr := conn.Close() - targetID := event.TargetID{ID: id, Name: "redis"} - loggerOnce(context.Background(), cErr, targetID) + conn.Close() return nil, err } } // Must be done after AUTH if _, err = conn.Do("CLIENT", "SETNAME", "MinIO"); err != nil { - cErr := conn.Close() - targetID := event.TargetID{ID: id, Name: "redis"} - loggerOnce(context.Background(), cErr, targetID) + conn.Close() return nil, err } @@ -306,27 +296,24 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnc queueDir := filepath.Join(args.QueueDir, storePrefix+"-redis-"+id) store = NewQueueStore(queueDir, args.QueueLimit) if oErr := store.Open(); oErr != nil { - target.loggerOnce(context.Background(), oErr, target.ID()) + target.loggerOnce(context.Background(), oErr, target.ID().String()) return target, oErr } target.store = store } conn := target.pool.Get() - defer func() { - cErr := conn.Close() - target.loggerOnce(context.Background(), cErr, target.ID()) - }() + defer conn.Close() _, pingErr := conn.Do("PING") if pingErr != nil { if target.store == nil || !(IsConnRefusedErr(pingErr) || IsConnResetErr(pingErr)) { - target.loggerOnce(context.Background(), pingErr, target.ID()) + target.loggerOnce(context.Background(), pingErr, target.ID().String()) return target, pingErr } } else { if err := target.args.validateFormat(conn); err != nil { - target.loggerOnce(context.Background(), err, target.ID()) + target.loggerOnce(context.Background(), err, target.ID().String()) return target, err } target.firstPing = true diff --git a/internal/event/target/store.go b/internal/event/target/store.go index fbf024e83..a22ff0d47 100644 --- a/internal/event/target/store.go +++ b/internal/event/target/store.go @@ -26,6 +26,7 @@ import ( "time" "github.com/minio/minio/internal/event" + "github.com/minio/minio/internal/logger" ) const retryInterval = 3 * time.Second @@ -46,36 +47,34 @@ type Store interface { } // replayEvents - Reads the events from the store and replays. -func replayEvents(store Store, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), id event.TargetID) <-chan string { +func replayEvents(store Store, doneCh <-chan struct{}, loggerOnce logger.LogOnce, id event.TargetID) <-chan string { eventKeyCh := make(chan string) go func() { + defer close(eventKeyCh) + retryTicker := time.NewTicker(retryInterval) defer retryTicker.Stop() - defer close(eventKeyCh) + for { names, err := store.List() - if err == nil { + if err != nil { + loggerOnce(context.Background(), fmt.Errorf("eventStore.List() failed with: %w", err), id.String()) + } else { for _, name := range names { select { case eventKeyCh <- strings.TrimSuffix(name, eventExt): - // Get next key. + // Get next key. case <-doneCh: return } } } - if len(names) < 2 { - select { - case <-retryTicker.C: - if err != nil { - loggerOnce(context.Background(), - fmt.Errorf("store.List() failed '%w'", err), id) - } - case <-doneCh: - return - } + select { + case <-retryTicker.C: + case <-doneCh: + return } } }() @@ -98,7 +97,7 @@ func IsConnResetErr(err error) bool { } // sendEvents - Reads events from the store and re-plays. -func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) { +func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan struct{}, loggerOnce logger.LogOnce) { retryTicker := time.NewTicker(retryInterval) defer retryTicker.Stop() @@ -112,7 +111,7 @@ func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan str if err != errNotConnected && !IsConnResetErr(err) { loggerOnce(context.Background(), fmt.Errorf("target.Send() failed with '%w'", err), - target.ID()) + target.ID().String()) } // Retrying after 3secs back-off diff --git a/internal/event/target/webhook.go b/internal/event/target/webhook.go index 6421fa519..1d78a8a79 100644 --- a/internal/event/target/webhook.go +++ b/internal/event/target/webhook.go @@ -35,6 +35,7 @@ import ( "time" "github.com/minio/minio/internal/event" + "github.com/minio/minio/internal/logger" "github.com/minio/pkg/certs" xnet "github.com/minio/pkg/net" ) @@ -94,7 +95,7 @@ type WebhookTarget struct { args WebhookArgs httpClient *http.Client store Store - loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) + loggerOnce logger.LogOnce } // ID - returns target ID. @@ -232,7 +233,7 @@ func (target *WebhookTarget) Close() error { } // NewWebhookTarget - creates new Webhook target. -func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), transport *http.Transport, test bool) (*WebhookTarget, error) { +func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOnce logger.LogOnce, transport *http.Transport, test bool) (*WebhookTarget, error) { var store Store target := &WebhookTarget{ id: event.TargetID{ID: id, Name: "webhook"}, @@ -254,7 +255,7 @@ func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOn queueDir := filepath.Join(args.QueueDir, storePrefix+"-webhook-"+id) store = NewQueueStore(queueDir, args.QueueLimit) if err := store.Open(); err != nil { - target.loggerOnce(context.Background(), err, target.ID()) + target.loggerOnce(context.Background(), err, target.ID().String()) return target, err } target.store = store @@ -263,7 +264,7 @@ func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOn _, err := target.IsActive() if err != nil { if target.store == nil || err != errNotConnected { - target.loggerOnce(ctx, err, target.ID()) + target.loggerOnce(ctx, err, target.ID().String()) return target, err } } diff --git a/internal/logger/logonce.go b/internal/logger/logonce.go index 13c98fa4e..26e7b9fed 100644 --- a/internal/logger/logonce.go +++ b/internal/logger/logonce.go @@ -19,29 +19,34 @@ package logger import ( "context" + "errors" "sync" "time" ) +// LogOnce provides the function type for logger.LogOnceIf() function +type LogOnce func(ctx context.Context, err error, id string, errKind ...interface{}) + // Holds a map of recently logged errors. type logOnceType struct { - IDMap map[interface{}]error + IDMap map[string]error sync.Mutex } -func (l *logOnceType) logOnceConsoleIf(ctx context.Context, err error, id interface{}, errKind ...interface{}) { +func (l *logOnceType) logOnceConsoleIf(ctx context.Context, err error, id string, errKind ...interface{}) { if err == nil { return } + + nerr := unwrapErrs(err) l.Lock() - shouldLog := false - prevErr := l.IDMap[id] - if prevErr == nil { - l.IDMap[id] = err - shouldLog = true - } else if prevErr.Error() != err.Error() { - l.IDMap[id] = err - shouldLog = true + shouldLog := true + prevErr, ok := l.IDMap[id] + if !ok { + l.IDMap[id] = nerr + } else { + // if errors are equal do not log. + shouldLog = prevErr.Error() != nerr.Error() } l.Unlock() @@ -50,20 +55,45 @@ func (l *logOnceType) logOnceConsoleIf(ctx context.Context, err error, id interf } } +const unwrapErrsDepth = 3 + +// unwrapErrs upto the point where errors.Unwrap(err) returns nil +func unwrapErrs(err error) (leafErr error) { + uerr := errors.Unwrap(err) + depth := 1 + for uerr != nil { + // Save the current `uerr` + leafErr = uerr + // continue to look for leaf errors underneath + uerr = errors.Unwrap(leafErr) + depth++ + if depth == unwrapErrsDepth { + // If we have reached enough depth we + // do not further recurse down, this + // is done to avoid any unnecessary + // latencies this might bring. + break + } + } + return leafErr +} + // One log message per error. -func (l *logOnceType) logOnceIf(ctx context.Context, err error, id interface{}, errKind ...interface{}) { +func (l *logOnceType) logOnceIf(ctx context.Context, err error, id string, errKind ...interface{}) { if err == nil { return } + + nerr := unwrapErrs(err) + l.Lock() - shouldLog := false - prevErr := l.IDMap[id] - if prevErr == nil { - l.IDMap[id] = err - shouldLog = true - } else if prevErr.Error() != err.Error() { - l.IDMap[id] = err - shouldLog = true + shouldLog := true + prevErr, ok := l.IDMap[id] + if !ok { + l.IDMap[id] = nerr + } else { + // if errors are equal do not log. + shouldLog = prevErr.Error() != nerr.Error() } l.Unlock() @@ -76,7 +106,7 @@ func (l *logOnceType) logOnceIf(ctx context.Context, err error, id interface{}, func (l *logOnceType) cleanupRoutine() { for { l.Lock() - l.IDMap = make(map[interface{}]error) + l.IDMap = make(map[string]error) l.Unlock() time.Sleep(30 * time.Minute) @@ -85,7 +115,7 @@ func (l *logOnceType) cleanupRoutine() { // Returns logOnceType func newLogOnceType() *logOnceType { - l := &logOnceType{IDMap: make(map[interface{}]error)} + l := &logOnceType{IDMap: make(map[string]error)} go l.cleanupRoutine() return l } @@ -95,7 +125,7 @@ var logOnce = newLogOnceType() // LogOnceIf - Logs notification errors - once per error. // id is a unique identifier for related log messages, refer to cmd/notification.go // on how it is used. -func LogOnceIf(ctx context.Context, err error, id interface{}, errKind ...interface{}) { +func LogOnceIf(ctx context.Context, err error, id string, errKind ...interface{}) { if logIgnoreError(err) { return } @@ -103,7 +133,7 @@ func LogOnceIf(ctx context.Context, err error, id interface{}, errKind ...interf } // LogOnceConsoleIf - similar to LogOnceIf but exclusively only logs to console target. -func LogOnceConsoleIf(ctx context.Context, err error, id interface{}, errKind ...interface{}) { +func LogOnceConsoleIf(ctx context.Context, err error, id string, errKind ...interface{}) { if logIgnoreError(err) { return } diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index 531b40c87..1048220a8 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -48,7 +48,7 @@ type Config struct { Transport http.RoundTripper `json:"-"` // Custom logger - LogOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) `json:"-"` + LogOnce func(ctx context.Context, err error, id string, errKind ...interface{}) `json:"-"` } // Target implements logger.Target and sends the json diff --git a/internal/logger/target/kafka/kafka.go b/internal/logger/target/kafka/kafka.go index b7c929425..123f9f6df 100644 --- a/internal/logger/target/kafka/kafka.go +++ b/internal/logger/target/kafka/kafka.go @@ -129,7 +129,7 @@ type Config struct { } `json:"sasl"` // Custom logger - LogOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) `json:"-"` + LogOnce func(ctx context.Context, err error, id string, errKind ...interface{}) `json:"-"` } // Check if atleast one broker in cluster is active diff --git a/internal/rest/client.go b/internal/rest/client.go index 1fa7e0811..994962107 100644 --- a/internal/rest/client.go +++ b/internal/rest/client.go @@ -18,6 +18,7 @@ package rest import ( + "bytes" "context" "errors" "fmt" @@ -26,6 +27,9 @@ import ( "math/rand" "net/http" "net/url" + "path" + "strings" + "sync" "sync/atomic" "time" @@ -105,12 +109,10 @@ type Client struct { httpClient *http.Client url *url.URL newAuthToken func(audience string) string -} -// URL query separator constants -const ( - querySep = "?" -) + sync.RWMutex // mutex for lastErr + lastErr error +} type restError string @@ -122,22 +124,112 @@ func (e restError) Timeout() bool { return true } -// Call - make a REST call with context. -func (c *Client) Call(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) { - if !c.IsOnline() { - return nil, &NetworkError{Err: &url.Error{Op: method, URL: c.url.String(), Err: restError("remote server offline")}} +// Given a string of the form "host", "host:port", or "[ipv6::address]:port", +// return true if the string includes a port. +func hasPort(s string) bool { return strings.LastIndex(s, ":") > strings.LastIndex(s, "]") } + +// removeEmptyPort strips the empty port in ":port" to "" +// as mandated by RFC 3986 Section 6.2.3. +func removeEmptyPort(host string) string { + if hasPort(host) { + return strings.TrimSuffix(host, ":") } - req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.url.String()+method+querySep+values.Encode(), body) - if err != nil { - return nil, &NetworkError{err} + return host +} + +// Copied from http.NewRequest but implemented to ensure we re-use `url.URL` instance. +func (c *Client) newRequest(ctx context.Context, u *url.URL, body io.Reader) (*http.Request, error) { + rc, ok := body.(io.ReadCloser) + if !ok && body != nil { + rc = io.NopCloser(body) } + u.Host = removeEmptyPort(u.Host) + // The host's colon:port should be normalized. See Issue 14836. + req := &http.Request{ + Method: http.MethodPost, + URL: u, + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + Header: make(http.Header), + Body: rc, + Host: u.Host, + } + req = req.WithContext(ctx) + if body != nil { + switch v := body.(type) { + case *bytes.Buffer: + req.ContentLength = int64(v.Len()) + buf := v.Bytes() + req.GetBody = func() (io.ReadCloser, error) { + r := bytes.NewReader(buf) + return io.NopCloser(r), nil + } + case *bytes.Reader: + req.ContentLength = int64(v.Len()) + snapshot := *v + req.GetBody = func() (io.ReadCloser, error) { + r := snapshot + return io.NopCloser(&r), nil + } + case *strings.Reader: + req.ContentLength = int64(v.Len()) + snapshot := *v + req.GetBody = func() (io.ReadCloser, error) { + r := snapshot + return io.NopCloser(&r), nil + } + default: + // This is where we'd set it to -1 (at least + // if body != NoBody) to mean unknown, but + // that broke people during the Go 1.8 testing + // period. People depend on it being 0 I + // guess. Maybe retry later. See Issue 18117. + } + // For client requests, Request.ContentLength of 0 + // means either actually 0, or unknown. The only way + // to explicitly say that the ContentLength is zero is + // to set the Body to nil. But turns out too much code + // depends on NewRequest returning a non-nil Body, + // so we use a well-known ReadCloser variable instead + // and have the http package also treat that sentinel + // variable to mean explicitly zero. + if req.GetBody != nil && req.ContentLength == 0 { + req.Body = http.NoBody + req.GetBody = func() (io.ReadCloser, error) { return http.NoBody, nil } + } + } + if c.newAuthToken != nil { - req.Header.Set("Authorization", "Bearer "+c.newAuthToken(req.URL.RawQuery)) + req.Header.Set("Authorization", "Bearer "+c.newAuthToken(u.RawQuery)) } req.Header.Set("X-Minio-Time", time.Now().UTC().Format(time.RFC3339)) if body != nil { req.Header.Set("Expect", "100-continue") } + + return req, nil +} + +// Call - make a REST call with context. +func (c *Client) Call(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) { + urlStr := c.url.String() + if !c.IsOnline() { + return nil, &NetworkError{c.LastError()} + } + + u, err := url.Parse(urlStr) + if err != nil { + return nil, &NetworkError{Err: &url.Error{Op: method, URL: urlStr, Err: err}} + } + + u.Path = path.Join(u.Path, method) + u.RawQuery = values.Encode() + + req, err := c.newRequest(ctx, u, body) + if err != nil { + return nil, &NetworkError{err} + } if length > 0 { req.ContentLength = length } @@ -147,8 +239,8 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod if !c.NoMetrics { atomic.AddUint64(&networkErrsCounter, 1) } - if c.MarkOffline() { - logger.LogIf(ctx, fmt.Errorf("Marking %s temporary offline; caused by %w", c.url.String(), err)) + if c.MarkOffline(err) { + logger.LogOnceIf(ctx, fmt.Errorf("Marking %s offline temporarily; caused by %w", c.url.Host, err), c.url.Host) } } return nil, &NetworkError{err} @@ -171,8 +263,9 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod // fully it should make sure to respond with '412' // instead, see cmd/storage-rest-server.go for ideas. if c.HealthCheckFn != nil && resp.StatusCode == http.StatusPreconditionFailed { - logger.LogIf(ctx, fmt.Errorf("Marking %s temporary offline; caused by PreconditionFailed with disk ID mismatch", c.url.String())) - c.MarkOffline() + err = fmt.Errorf("Marking %s offline temporarily; caused by PreconditionFailed with disk ID mismatch", c.url.Host) + logger.LogOnceIf(ctx, err, c.url.Host) + c.MarkOffline(err) } defer xhttp.DrainBody(resp.Body) // Limit the ReadAll(), just in case, because of a bug, the server responds with large data. @@ -182,8 +275,8 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod if !c.NoMetrics { atomic.AddUint64(&networkErrsCounter, 1) } - if c.MarkOffline() { - logger.LogIf(ctx, fmt.Errorf("Marking %s temporary offline; caused by %w", c.url.String(), err)) + if c.MarkOffline(err) { + logger.LogOnceIf(ctx, fmt.Errorf("Marking %s offline temporarily; caused by %w", c.url.Host, err), c.url.Host) } } return nil, err @@ -227,10 +320,20 @@ func (c *Client) LastConn() time.Time { return time.Unix(0, atomic.LoadInt64(&c.lastConn)) } +// LastError returns previous error +func (c *Client) LastError() error { + c.RLock() + defer c.RUnlock() + return c.lastErr +} + // MarkOffline - will mark a client as being offline and spawns // a goroutine that will attempt to reconnect if HealthCheckFn is set. // returns true if the node changed state from online to offline -func (c *Client) MarkOffline() bool { +func (c *Client) MarkOffline(err error) bool { + c.Lock() + c.lastErr = err + c.Unlock() // Start goroutine that will attempt to reconnect. // If server is already trying to reconnect this will have no effect. if c.HealthCheckFn != nil && atomic.CompareAndSwapInt32(&c.connected, online, offline) {