From bfec5fe200fda100ef811873cc3762384c8457ce Mon Sep 17 00:00:00 2001 From: Praveen raj Mani Date: Tue, 14 Apr 2020 23:49:25 +0530 Subject: [PATCH] fix: fetchLambdaInfo should return consistent results (#9332) - Introduced a function `FetchRegisteredTargets` which will return a complete set of registered targets irrespective to their states, if the `returnOnTargetError` flag is set to `False` - Refactor NewTarget functions to return non-nil targets - Refactor GetARNList() to return a complete list of configured targets --- cmd/admin-handlers.go | 8 +- cmd/config/notify/parse.go | 155 +++++++++++++++++++++++------- pkg/event/target/amqp.go | 26 ++--- pkg/event/target/elasticsearch.go | 37 ++++--- pkg/event/target/kafka.go | 45 +++++---- pkg/event/target/mqtt.go | 9 +- pkg/event/target/mysql.go | 42 +++++--- pkg/event/target/nats.go | 91 +++++++++--------- pkg/event/target/nsq.go | 43 ++++++--- pkg/event/target/postgresql.go | 44 ++++++--- pkg/event/target/redis.go | 29 +++--- pkg/event/target/webhook.go | 32 +++--- 12 files changed, 357 insertions(+), 204 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 92cebd38c..a77a2958c 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -1453,11 +1453,13 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque func fetchLambdaInfo(cfg config.Config) []map[string][]madmin.TargetIDStatus { - // Fetch the targets - targetList, err := notify.RegisterNotificationTargets(cfg, GlobalServiceDoneCh, NewGatewayHTTPTransport(), nil, true) - if err != nil { + // Fetch the configured targets + targetList, err := notify.FetchRegisteredTargets(cfg, GlobalServiceDoneCh, NewGatewayHTTPTransport(), true, false) + if err != nil && err != notify.ErrTargetsOffline { + logger.LogIf(GlobalContext, err) return nil } + lambdaMap := make(map[string][]madmin.TargetIDStatus) for targetID, target := range targetList.TargetMap() { diff --git a/cmd/config/notify/parse.go b/cmd/config/notify/parse.go index eceaf8300..71e5f8fb8 100644 --- a/cmd/config/notify/parse.go +++ b/cmd/config/notify/parse.go @@ -17,8 +17,10 @@ package notify import ( + "context" "crypto/tls" "crypto/x509" + "errors" "net/http" "strconv" "strings" @@ -37,12 +39,16 @@ const ( formatAccess = "access" ) +// ErrTargetsOffline - Indicates single/multiple target failures. +var ErrTargetsOffline = errors.New("one or more targets are offline. Please use `mc admin info --json` to check the offline targets") + // TestNotificationTargets is similar to GetNotificationTargets() // avoids explicit registration. func TestNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport, targetIDs []event.TargetID) error { test := true - targets, err := RegisterNotificationTargets(cfg, doneCh, transport, targetIDs, test) + returnOnTargetError := true + targets, err := RegisterNotificationTargets(cfg, doneCh, transport, targetIDs, test, returnOnTargetError) if err == nil { // Close all targets since we are only testing connections. for _, t := range targets.TargetMap() { @@ -57,7 +63,8 @@ func TestNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transpor // targets, returns error if any. 
func GetNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport) (*event.TargetList, error) { test := false - return RegisterNotificationTargets(cfg, doneCh, transport, nil, test) + returnOnTargetError := false + return RegisterNotificationTargets(cfg, doneCh, transport, nil, test, returnOnTargetError) } // RegisterNotificationTargets - returns TargetList which contains enabled targets in serverConfig. @@ -65,8 +72,34 @@ func GetNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transport // * Add a new target in pkg/event/target package. // * Add newly added target configuration to serverConfig.Notify.. // * Handle the configuration in this function to create/add into TargetList. -func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport, targetIDs []event.TargetID, test bool) (_ *event.TargetList, err error) { +func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport, targetIDs []event.TargetID, test bool, returnOnTargetError bool) (*event.TargetList, error) { + + targetList, err := FetchRegisteredTargets(cfg, doneCh, transport, test, returnOnTargetError) + if err != nil { + return targetList, err + } + + if test { + // Verify if user is trying to disable already configured + // notification targets, based on their target IDs + for _, targetID := range targetIDs { + if !targetList.Exists(targetID) { + return nil, config.Errorf( + "Unable to disable configured targets '%v'", + targetID) + } + } + } + + return targetList, nil +} + +// FetchRegisteredTargets - Returns a set of configured TargetList +// If `returnOnTargetError` is set to true, The function returns when a target initialization fails +// Else, the function will return a complete TargetList irrespective of errors +func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport, test bool, returnOnTargetError bool) (_ *event.TargetList, err error) { targetList := event.NewTargetList() + var targetsOffline bool defer func() { // Automatically close all connections to targets when an error occur @@ -137,10 +170,17 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran } newTarget, err := target.NewAMQPTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { - return nil, err + targetsOffline = true + if returnOnTargetError { + return nil, err + } } + if err = targetList.Add(newTarget); err != nil { - return nil, err + logger.LogIf(context.Background(), err) + if returnOnTargetError { + return nil, err + } } } @@ -150,11 +190,16 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran } newTarget, err := target.NewElasticsearchTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { - return nil, err - + targetsOffline = true + if returnOnTargetError { + return nil, err + } } if err = targetList.Add(newTarget); err != nil { - return nil, err + logger.LogIf(context.Background(), err) + if returnOnTargetError { + return nil, err + } } } @@ -165,10 +210,16 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran args.TLS.RootCAs = transport.TLSClientConfig.RootCAs newTarget, err := target.NewKafkaTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { - return nil, err + targetsOffline = true + if returnOnTargetError { + return nil, err + } } if err = targetList.Add(newTarget); err != nil { - return nil, err + logger.LogIf(context.Background(), err) + if 
returnOnTargetError { + return nil, err + } } } @@ -179,10 +230,16 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran args.RootCAs = transport.TLSClientConfig.RootCAs newTarget, err := target.NewMQTTTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { - return nil, err + targetsOffline = true + if returnOnTargetError { + return nil, err + } } if err = targetList.Add(newTarget); err != nil { - return nil, err + logger.LogIf(context.Background(), err) + if returnOnTargetError { + return nil, err + } } } @@ -192,10 +249,16 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran } newTarget, err := target.NewMySQLTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { - return nil, err + targetsOffline = true + if returnOnTargetError { + return nil, err + } } if err = targetList.Add(newTarget); err != nil { - return nil, err + logger.LogIf(context.Background(), err) + if returnOnTargetError { + return nil, err + } } } @@ -205,10 +268,16 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran } newTarget, err := target.NewNATSTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { - return nil, err + targetsOffline = true + if returnOnTargetError { + return nil, err + } } if err = targetList.Add(newTarget); err != nil { - return nil, err + logger.LogIf(context.Background(), err) + if returnOnTargetError { + return nil, err + } } } @@ -218,10 +287,16 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran } newTarget, err := target.NewNSQTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { - return nil, err + targetsOffline = true + if returnOnTargetError { + return nil, err + } } if err = targetList.Add(newTarget); err != nil { - return nil, err + logger.LogIf(context.Background(), err) + if returnOnTargetError { + return nil, err + } } } @@ -231,10 +306,16 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran } newTarget, err := target.NewPostgreSQLTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { - return nil, err + targetsOffline = true + if returnOnTargetError { + return nil, err + } } if err = targetList.Add(newTarget); err != nil { - return nil, err + logger.LogIf(context.Background(), err) + if returnOnTargetError { + return nil, err + } } } @@ -244,10 +325,16 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran } newTarget, err := target.NewRedisTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { - return nil, err + targetsOffline = true + if returnOnTargetError { + return nil, err + } } if err = targetList.Add(newTarget); err != nil { - return nil, err + logger.LogIf(context.Background(), err) + if returnOnTargetError { + return nil, err + } } } @@ -257,23 +344,21 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran } newTarget, err := target.NewWebhookTarget(id, args, doneCh, logger.LogOnceIf, transport, test) if err != nil { - return nil, err + targetsOffline = true + if returnOnTargetError { + return nil, err + } } - if err := targetList.Add(newTarget); err != nil { - return nil, err + if err = targetList.Add(newTarget); err != nil { + logger.LogIf(context.Background(), err) + if returnOnTargetError { + return nil, err + } } } - if test { - // Verify if user is trying to disable already configured - // notification targets, based on their target IDs - for _, targetID := range targetIDs { - if 
!targetList.Exists(targetID) { - return nil, config.Errorf( - "Unable to disable configured targets '%v'", - targetID) - } - } + if targetsOffline { + return targetList, ErrTargetsOffline } return targetList, nil diff --git a/pkg/event/target/amqp.go b/pkg/event/target/amqp.go index ab0259a95..fdfae9f18 100644 --- a/pkg/event/target/amqp.go +++ b/pkg/event/target/amqp.go @@ -275,35 +275,37 @@ func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce var store Store + target := &AMQPTarget{ + id: event.TargetID{ID: id, Name: "amqp"}, + args: args, + loggerOnce: loggerOnce, + } + if args.QueueDir != "" { queueDir := filepath.Join(args.QueueDir, storePrefix+"-amqp-"+id) store = NewQueueStore(queueDir, args.QueueLimit) if oErr := store.Open(); oErr != nil { - return nil, oErr + target.loggerOnce(context.Background(), oErr, target.ID()) + return target, oErr } + target.store = store } conn, err = amqp.Dial(args.URL.String()) if err != nil { if store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) { - return nil, err + target.loggerOnce(context.Background(), err, target.ID()) + return target, err } } - - target := &AMQPTarget{ - id: event.TargetID{ID: id, Name: "amqp"}, - args: args, - conn: conn, - store: store, - loggerOnce: loggerOnce, - } + target.conn = conn if target.store != nil && !test { // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) + eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh, loggerOnce) + go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) } return target, nil diff --git a/pkg/event/target/elasticsearch.go b/pkg/event/target/elasticsearch.go index 83e20ff6a..476fbdaa7 100644 --- a/pkg/event/target/elasticsearch.go +++ b/pkg/event/target/elasticsearch.go @@ -82,10 +82,11 @@ func (a ElasticsearchArgs) Validate() error { // ElasticsearchTarget - Elasticsearch target. type ElasticsearchTarget struct { - id event.TargetID - args ElasticsearchArgs - client *elastic.Client - store Store + id event.TargetID + args ElasticsearchArgs + client *elastic.Client + store Store + loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) } // ID - returns target ID. @@ -252,38 +253,42 @@ func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan str var store Store + target := &ElasticsearchTarget{ + id: event.TargetID{ID: id, Name: "elasticsearch"}, + args: args, + loggerOnce: loggerOnce, + } + if args.QueueDir != "" { queueDir := filepath.Join(args.QueueDir, storePrefix+"-elasticsearch-"+id) store = NewQueueStore(queueDir, args.QueueLimit) if oErr := store.Open(); oErr != nil { - return nil, oErr + target.loggerOnce(context.Background(), oErr, target.ID()) + return target, oErr } + target.store = store } dErr := args.URL.DialHTTP(nil) if dErr != nil { if store == nil { - return nil, dErr + target.loggerOnce(context.Background(), dErr, target.ID()) + return target, dErr } } else { client, err = newClient(args) if err != nil { - return nil, err + target.loggerOnce(context.Background(), err, target.ID()) + return target, err } - } - - target := &ElasticsearchTarget{ - id: event.TargetID{ID: id, Name: "elasticsearch"}, - args: args, - client: client, - store: store, + target.client = client } if target.store != nil && !test { // Replays the events from the store. 
- eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) + eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh, loggerOnce) + go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) } return target, nil diff --git a/pkg/event/target/kafka.go b/pkg/event/target/kafka.go index bf09a1a90..302f62795 100644 --- a/pkg/event/target/kafka.go +++ b/pkg/event/target/kafka.go @@ -123,11 +123,12 @@ func (k KafkaArgs) Validate() error { // KafkaTarget - Kafka target. type KafkaTarget struct { - id event.TargetID - args KafkaArgs - producer sarama.SyncProducer - config *sarama.Config - store Store + id event.TargetID + args KafkaArgs + producer sarama.SyncProducer + config *sarama.Config + store Store + loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) } // ID - returns target ID. @@ -248,10 +249,17 @@ func (k KafkaArgs) pingBrokers() bool { func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*KafkaTarget, error) { config := sarama.NewConfig() + target := &KafkaTarget{ + id: event.TargetID{ID: id, Name: "kafka"}, + args: args, + loggerOnce: loggerOnce, + } + if args.Version != "" { kafkaVersion, err := sarama.ParseKafkaVersion(args.Version) if err != nil { - return nil, err + target.loggerOnce(context.Background(), err, target.ID()) + return target, err } config.Version = kafkaVersion } @@ -273,7 +281,8 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc tlsConfig, err := saramatls.NewConfig(args.TLS.ClientTLSCert, args.TLS.ClientTLSKey) if err != nil { - return nil, err + target.loggerOnce(context.Background(), err, target.ID()) + return target, err } config.Net.TLS.Enable = args.TLS.Enable @@ -286,6 +295,8 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc config.Producer.Retry.Max = 10 config.Producer.Return.Successes = true + target.config = config + brokers := []string{} for _, broker := range args.Brokers { brokers = append(brokers, broker.String()) @@ -297,30 +308,26 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc queueDir := filepath.Join(args.QueueDir, storePrefix+"-kafka-"+id) store = NewQueueStore(queueDir, args.QueueLimit) if oErr := store.Open(); oErr != nil { - return nil, oErr + target.loggerOnce(context.Background(), oErr, target.ID()) + return target, oErr } + target.store = store } producer, err := sarama.NewSyncProducer(brokers, config) if err != nil { if store == nil || err != sarama.ErrOutOfBrokers { - return nil, err + target.loggerOnce(context.Background(), err, target.ID()) + return target, err } } - - target := &KafkaTarget{ - id: event.TargetID{ID: id, Name: "kafka"}, - args: args, - producer: producer, - config: config, - store: store, - } + target.producer = producer if target.store != nil && !test { // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) + eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) // Start replaying events from the store. 
- go sendEvents(target, eventKeyCh, doneCh, loggerOnce) + go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) } return target, nil diff --git a/pkg/event/target/mqtt.go b/pkg/event/target/mqtt.go index d02ff3a9e..ddcf6022d 100644 --- a/pkg/event/target/mqtt.go +++ b/pkg/event/target/mqtt.go @@ -258,19 +258,20 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce queueDir := filepath.Join(args.QueueDir, storePrefix+"-mqtt-"+id) target.store = NewQueueStore(queueDir, args.QueueLimit) if err := target.store.Open(); err != nil { - return nil, err + target.loggerOnce(context.Background(), err, target.ID()) + return target, err } if !test { go retryRegister() // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) + eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh, loggerOnce) + go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) } } else { if token.Wait() && token.Error() != nil { - return nil, token.Error() + return target, token.Error() } } return target, nil diff --git a/pkg/event/target/mysql.go b/pkg/event/target/mysql.go index 61e19899f..096659bdd 100644 --- a/pkg/event/target/mysql.go +++ b/pkg/event/target/mysql.go @@ -178,6 +178,7 @@ type MySQLTarget struct { db *sql.DB store Store firstPing bool + loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) } // ID - returns target ID. @@ -187,6 +188,13 @@ func (target *MySQLTarget) ID() event.TargetID { // IsActive - Return true if target is up and active func (target *MySQLTarget) IsActive() (bool, error) { + if target.db == nil { + db, sErr := sql.Open("mysql", target.args.DSN) + if sErr != nil { + return false, sErr + } + target.db = db + } if err := target.db.Ping(); err != nil { if IsConnErr(err) { return false, errNotConnected @@ -346,7 +354,6 @@ func (target *MySQLTarget) executeStmts() error { // NewMySQLTarget - creates new MySQL target. 
func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*MySQLTarget, error) { - var firstPing bool if args.DSN == "" { config := mysql.Config{ User: args.User, @@ -360,10 +367,19 @@ func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnc args.DSN = config.FormatDSN() } + target := &MySQLTarget{ + id: event.TargetID{ID: id, Name: "mysql"}, + args: args, + firstPing: false, + loggerOnce: loggerOnce, + } + db, err := sql.Open("mysql", args.DSN) if err != nil { - return nil, err + target.loggerOnce(context.Background(), err, target.ID()) + return target, err } + target.db = db var store Store @@ -371,35 +387,31 @@ func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnc queueDir := filepath.Join(args.QueueDir, storePrefix+"-mysql-"+id) store = NewQueueStore(queueDir, args.QueueLimit) if oErr := store.Open(); oErr != nil { - return nil, oErr + target.loggerOnce(context.Background(), oErr, target.ID()) + return target, oErr } - } - - target := &MySQLTarget{ - id: event.TargetID{ID: id, Name: "mysql"}, - args: args, - db: db, - store: store, - firstPing: firstPing, + target.store = store } err = target.db.Ping() if err != nil { if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) { - return nil, err + target.loggerOnce(context.Background(), err, target.ID()) + return target, err } } else { if err = target.executeStmts(); err != nil { - return nil, err + target.loggerOnce(context.Background(), err, target.ID()) + return target, err } target.firstPing = true } if target.store != nil && !test { // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) + eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh, loggerOnce) + go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) } return target, nil diff --git a/pkg/event/target/nats.go b/pkg/event/target/nats.go index 7bc1dec82..8b7178195 100644 --- a/pkg/event/target/nats.go +++ b/pkg/event/target/nats.go @@ -197,11 +197,12 @@ func (n NATSArgs) connectStan() (stan.Conn, error) { // NATSTarget - NATS target. type NATSTarget struct { - id event.TargetID - args NATSArgs - natsConn *nats.Conn - stanConn stan.Conn - store Store + id event.TargetID + args NATSArgs + natsConn *nats.Conn + stanConn stan.Conn + store Store + loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) } // ID - returns target ID. 
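Note on the IsActive changes above: the SQL-backed targets now open their handle lazily inside IsActive, so a target whose constructor could not connect can still be probed later by fetchLambdaInfo. A minimal, self-contained sketch of that pattern follows; sqlTarget is a hypothetical stand-in (not the actual MinIO type), errNotConnected is redeclared locally for self-containment, and the go-sql-driver import only registers the "mysql" driver.

package sketch

import (
	"database/sql"
	"errors"

	_ "github.com/go-sql-driver/mysql" // registers the "mysql" driver for sql.Open
)

// errNotConnected is redeclared here so the sketch stands alone; the real
// sentinel lives in pkg/event/target.
var errNotConnected = errors.New("not connected to target server/service")

// sqlTarget is a hypothetical stand-in for MySQLTarget/PostgreSQLTarget.
type sqlTarget struct {
	dsn string
	db  *sql.DB
}

// IsActive opens the handle lazily when the constructor could not, then pings
// it. Connection-level failures map to errNotConnected so callers can report
// the target as offline instead of treating the probe as a hard error.
func (t *sqlTarget) IsActive() (bool, error) {
	if t.db == nil {
		db, err := sql.Open("mysql", t.dsn)
		if err != nil {
			return false, err
		}
		t.db = db
	}
	if err := t.db.Ping(); err != nil {
		// The real targets additionally distinguish IsConnErr/IsConnRefusedErr
		// from other failures before deciding on errNotConnected.
		return false, errNotConnected
	}
	return true, nil
}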
@@ -211,15 +212,32 @@ func (target *NATSTarget) ID() event.TargetID { // IsActive - Return true if target is up and active func (target *NATSTarget) IsActive() (bool, error) { + var connErr error if target.args.Streaming.Enable { - if !target.stanConn.NatsConn().IsConnected() { - return false, errNotConnected + if target.stanConn == nil || target.stanConn.NatsConn() == nil { + target.stanConn, connErr = target.args.connectStan() + } else { + if !target.stanConn.NatsConn().IsConnected() { + return false, errNotConnected + } } } else { - if !target.natsConn.IsConnected() { - return false, errNotConnected + if target.natsConn == nil { + target.natsConn, connErr = target.args.connectNats() + } else { + if !target.natsConn.IsConnected() { + return false, errNotConnected + } } } + + if connErr != nil { + if connErr.Error() == nats.ErrNoServers.Error() { + return false, errNotConnected + } + return false, connErr + } + return true, nil } @@ -262,31 +280,9 @@ func (target *NATSTarget) send(eventData event.Event) error { // Send - sends event to Nats. func (target *NATSTarget) Send(eventKey string) error { - var connErr error - - if target.args.Streaming.Enable { - if target.stanConn == nil || target.stanConn.NatsConn() == nil { - target.stanConn, connErr = target.args.connectStan() - } else { - if !target.stanConn.NatsConn().IsConnected() { - return errNotConnected - } - } - } else { - if target.natsConn == nil { - target.natsConn, connErr = target.args.connectNats() - } else { - if !target.natsConn.IsConnected() { - return errNotConnected - } - } - } - - if connErr != nil { - if connErr.Error() == nats.ErrNoServers.Error() { - return errNotConnected - } - return connErr + _, err := target.IsActive() + if err != nil { + return err } eventData, eErr := target.store.Get(eventKey) @@ -332,39 +328,42 @@ func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce var store Store + target := &NATSTarget{ + id: event.TargetID{ID: id, Name: "nats"}, + args: args, + loggerOnce: loggerOnce, + } + if args.QueueDir != "" { queueDir := filepath.Join(args.QueueDir, storePrefix+"-nats-"+id) store = NewQueueStore(queueDir, args.QueueLimit) if oErr := store.Open(); oErr != nil { - return nil, oErr + target.loggerOnce(context.Background(), oErr, target.ID()) + return target, oErr } + target.store = store } if args.Streaming.Enable { stanConn, err = args.connectStan() + target.stanConn = stanConn } else { natsConn, err = args.connectNats() + target.natsConn = natsConn } if err != nil { if store == nil || err.Error() != nats.ErrNoServers.Error() { - return nil, err + target.loggerOnce(context.Background(), err, target.ID()) + return target, err } } - target := &NATSTarget{ - id: event.TargetID{ID: id, Name: "nats"}, - args: args, - stanConn: stanConn, - natsConn: natsConn, - store: store, - } - if target.store != nil && !test { // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) + eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh, loggerOnce) + go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) } return target, nil diff --git a/pkg/event/target/nsq.go b/pkg/event/target/nsq.go index e143d7238..8f5185d28 100644 --- a/pkg/event/target/nsq.go +++ b/pkg/event/target/nsq.go @@ -89,10 +89,12 @@ func (n NSQArgs) Validate() error { // NSQTarget - NSQ target. 
type NSQTarget struct { - id event.TargetID - args NSQArgs - producer *nsq.Producer - store Store + id event.TargetID + args NSQArgs + producer *nsq.Producer + store Store + config *nsq.Config + loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) } // ID - returns target ID. @@ -102,6 +104,14 @@ func (target *NSQTarget) ID() event.TargetID { // IsActive - Return true if target is up and active func (target *NSQTarget) IsActive() (bool, error) { + if target.producer != nil { + producer, err := nsq.NewProducer(target.args.NSQDAddress.String(), target.config) + if err != nil { + return false, err + } + target.producer = producer + } + if err := target.producer.Ping(); err != nil { // To treat "connection refused" errors as errNotConnected. if IsConnRefusedErr(err) { @@ -186,38 +196,43 @@ func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce fu var store Store + target := &NSQTarget{ + id: event.TargetID{ID: id, Name: "nsq"}, + args: args, + config: config, + loggerOnce: loggerOnce, + } + if args.QueueDir != "" { queueDir := filepath.Join(args.QueueDir, storePrefix+"-nsq-"+id) store = NewQueueStore(queueDir, args.QueueLimit) if oErr := store.Open(); oErr != nil { + target.loggerOnce(context.Background(), oErr, target.ID()) return nil, oErr } + target.store = store } producer, err := nsq.NewProducer(args.NSQDAddress.String(), config) if err != nil { - return nil, err - } - - target := &NSQTarget{ - id: event.TargetID{ID: id, Name: "nsq"}, - args: args, - producer: producer, - store: store, + target.loggerOnce(context.Background(), err, target.ID()) + return target, err } + target.producer = producer if err := target.producer.Ping(); err != nil { // To treat "connection refused" errors as errNotConnected. if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) { + target.loggerOnce(context.Background(), err, target.ID()) return nil, err } } if target.store != nil && !test { // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) + eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh, loggerOnce) + go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) } return target, nil diff --git a/pkg/event/target/postgresql.go b/pkg/event/target/postgresql.go index f1dade7a7..0a0ecec76 100644 --- a/pkg/event/target/postgresql.go +++ b/pkg/event/target/postgresql.go @@ -176,6 +176,8 @@ type PostgreSQLTarget struct { db *sql.DB store Store firstPing bool + connString string + loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) } // ID - returns target ID. @@ -185,6 +187,13 @@ func (target *PostgreSQLTarget) ID() event.TargetID { // IsActive - Return true if target is up and active func (target *PostgreSQLTarget) IsActive() (bool, error) { + if target.db == nil { + db, err := sql.Open("postgres", target.connString) + if err != nil { + return false, err + } + target.db = db + } if err := target.db.Ping(); err != nil { if IsConnErr(err) { return false, errNotConnected @@ -345,8 +354,6 @@ func (target *PostgreSQLTarget) executeStmts() error { // NewPostgreSQLTarget - creates new PostgreSQL target. 
func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*PostgreSQLTarget, error) { - var firstPing bool - params := []string{args.ConnectionString} if !args.Host.IsEmpty() { params = append(params, "host="+args.Host.String()) @@ -365,10 +372,19 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, } connStr := strings.Join(params, " ") + target := &PostgreSQLTarget{ + id: event.TargetID{ID: id, Name: "postgresql"}, + args: args, + firstPing: false, + connString: connStr, + loggerOnce: loggerOnce, + } + db, err := sql.Open("postgres", connStr) if err != nil { - return nil, err + return target, err } + target.db = db var store Store @@ -376,35 +392,31 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, queueDir := filepath.Join(args.QueueDir, storePrefix+"-postgresql-"+id) store = NewQueueStore(queueDir, args.QueueLimit) if oErr := store.Open(); oErr != nil { - return nil, oErr + target.loggerOnce(context.Background(), oErr, target.ID()) + return target, oErr } - } - - target := &PostgreSQLTarget{ - id: event.TargetID{ID: id, Name: "postgresql"}, - args: args, - db: db, - store: store, - firstPing: firstPing, + target.store = store } err = target.db.Ping() if err != nil { if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) { - return nil, err + target.loggerOnce(context.Background(), err, target.ID()) + return target, err } } else { if err = target.executeStmts(); err != nil { - return nil, err + target.loggerOnce(context.Background(), err, target.ID()) + return target, err } target.firstPing = true } if target.store != nil && !test { // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) + eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) // Start replaying events from the store. 
- go sendEvents(target, eventKeyCh, doneCh, loggerOnce) + go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) } return target, nil diff --git a/pkg/event/target/redis.go b/pkg/event/target/redis.go index 979415d30..14124f7aa 100644 --- a/pkg/event/target/redis.go +++ b/pkg/event/target/redis.go @@ -286,22 +286,23 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnc var store Store - if args.QueueDir != "" { - queueDir := filepath.Join(args.QueueDir, storePrefix+"-redis-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if oErr := store.Open(); oErr != nil { - return nil, oErr - } - } - target := &RedisTarget{ id: event.TargetID{ID: id, Name: "redis"}, args: args, pool: pool, - store: store, loggerOnce: loggerOnce, } + if args.QueueDir != "" { + queueDir := filepath.Join(args.QueueDir, storePrefix+"-redis-"+id) + store = NewQueueStore(queueDir, args.QueueLimit) + if oErr := store.Open(); oErr != nil { + target.loggerOnce(context.Background(), oErr, target.ID()) + return target, oErr + } + target.store = store + } + conn := target.pool.Get() defer func() { cErr := conn.Close() @@ -311,20 +312,22 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnc _, pingErr := conn.Do("PING") if pingErr != nil { if target.store == nil || !(IsConnRefusedErr(pingErr) || IsConnResetErr(pingErr)) { - return nil, pingErr + target.loggerOnce(context.Background(), pingErr, target.ID()) + return target, pingErr } } else { if err := target.args.validateFormat(conn); err != nil { - return nil, err + target.loggerOnce(context.Background(), err, target.ID()) + return target, err } target.firstPing = true } if target.store != nil && !test { // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) + eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh, loggerOnce) + go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) } return target, nil diff --git a/pkg/event/target/webhook.go b/pkg/event/target/webhook.go index 52e888624..679a42d7f 100644 --- a/pkg/event/target/webhook.go +++ b/pkg/event/target/webhook.go @@ -82,6 +82,7 @@ type WebhookTarget struct { args WebhookArgs httpClient *http.Client store Store + loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) } // ID - returns target ID. 
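All of the New*Target constructors rewritten above (and the webhook constructor that follows) land on the same shape, which is what lets FetchRegisteredTargets keep a handle on targets that fail to connect. A condensed, self-contained sketch of that shape under assumed names: fooTarget, fooArgs, fooStore and dialFoo are hypothetical, and log.Printf stands in for the loggerOnce callback used in the real code.

package sketch

import (
	"errors"
	"log"
)

// fooArgs, fooConn, fooStore and dialFoo are hypothetical stand-ins for any of
// the notification backends touched by this patch (AMQP, Kafka, NATS, webhook, ...).
type fooArgs struct {
	URL      string
	QueueDir string
}

type fooConn struct{}

type fooStore struct{ dir string }

func (s *fooStore) Open() error { return nil }

var errConnRefused = errors.New("connection refused")

func dialFoo(url string) (*fooConn, error) { return &fooConn{}, nil }

type fooTarget struct {
	id    string
	args  fooArgs
	conn  *fooConn
	store *fooStore
}

func (t *fooTarget) ID() string { return t.id }

// newFooTarget mirrors the constructor shape the patch converges on:
// allocate the target first, attach the optional queue store, dial while
// tolerating connection errors only when a store can buffer events, and on
// any failure log and return the partially initialized, non-nil target
// together with the error so FetchRegisteredTargets can still account for it.
func newFooTarget(id string, args fooArgs) (*fooTarget, error) {
	target := &fooTarget{id: id, args: args}

	if args.QueueDir != "" {
		store := &fooStore{dir: args.QueueDir}
		if err := store.Open(); err != nil {
			log.Printf("%s: %v", target.ID(), err) // the real code logs via loggerOnce
			return target, err
		}
		target.store = store
	}

	conn, err := dialFoo(args.URL)
	if err != nil {
		if target.store == nil || !errors.Is(err, errConnRefused) {
			log.Printf("%s: %v", target.ID(), err)
			return target, err
		}
	}
	target.conn = conn

	// The real constructors additionally start replayEvents/sendEvents here
	// when a store is attached and test is false, so queued events are
	// delivered once the backend comes back online.
	return target, nil
}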
@@ -195,28 +196,37 @@ func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, logge var store Store - if args.QueueDir != "" { - queueDir := filepath.Join(args.QueueDir, storePrefix+"-webhook-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if err := store.Open(); err != nil { - return nil, err - } - } - target := &WebhookTarget{ id: event.TargetID{ID: id, Name: "webhook"}, args: args, httpClient: &http.Client{ Transport: transport, }, - store: store, + loggerOnce: loggerOnce, + } + + if args.QueueDir != "" { + queueDir := filepath.Join(args.QueueDir, storePrefix+"-webhook-"+id) + store = NewQueueStore(queueDir, args.QueueLimit) + if err := store.Open(); err != nil { + target.loggerOnce(context.Background(), err, target.ID()) + return target, err + } + target.store = store + } + + if _, err := target.IsActive(); err != nil { + if target.store == nil || err != errNotConnected { + target.loggerOnce(context.Background(), err, target.ID()) + return target, err + } } if target.store != nil && !test { // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) + eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh, loggerOnce) + go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) } return target, nil
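For completeness, the consuming side: config testing (TestNotificationTargets) passes returnOnTargetError=true to fail fast, while GetNotificationTargets and the admin status path pass false and treat ErrTargetsOffline as informational. A hedged sketch of the reporting call, assuming the usual github.com/minio/minio import paths and that event.Target exposes the IsActive probe implemented above; targetStatus itself is hypothetical and merely mirrors what fetchLambdaInfo does in cmd/admin-handlers.go.

package sketch

import (
	"net/http"

	"github.com/minio/minio/cmd/config"
	"github.com/minio/minio/cmd/config/notify"
	"github.com/minio/minio/pkg/event"
)

// targetStatus is a hypothetical reporting helper that mirrors how
// fetchLambdaInfo consumes FetchRegisteredTargets after this patch.
func targetStatus(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport) (map[event.TargetID]bool, error) {
	// test=true: probe connectivity only, do not start event delivery.
	// returnOnTargetError=false: collect every configured target, even if
	// some of them are offline.
	targetList, err := notify.FetchRegisteredTargets(cfg, doneCh, transport, true, false)
	if err != nil && err != notify.ErrTargetsOffline {
		// Only unexpected errors abort; offline targets are reported below.
		return nil, err
	}

	status := make(map[event.TargetID]bool)
	for id, t := range targetList.TargetMap() {
		online, _ := t.IsActive()
		status[id] = online
		// (the real handler also closes each target once it has its status)
	}
	return status, nil
}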