initialize streaming events without lazy initialization (#16016)

This commit is contained in:
Harshavardhana 2022-11-07 08:01:24 -08:00 committed by GitHub
parent 1f3db03bf0
commit 21251d8c22
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 81 additions and 51 deletions

View file

@ -332,9 +332,6 @@ func (target *AMQPTarget) initAMQP() error {
}
target.conn = conn
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return nil
}
@ -349,11 +346,17 @@ func NewAMQPTarget(id string, args AMQPArgs, loggerOnce logger.LogOnce) (*AMQPTa
}
}
return &AMQPTarget{
target := &AMQPTarget{
id: event.TargetID{ID: id, Name: "amqp"},
args: args,
loggerOnce: loggerOnce,
store: store,
quitCh: make(chan struct{}),
}, nil
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return target, nil
}

View file

@ -349,9 +349,6 @@ func (target *ElasticsearchTarget) initElasticsearch() error {
return err
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return nil
}
@ -366,13 +363,19 @@ func NewElasticsearchTarget(id string, args ElasticsearchArgs, loggerOnce logger
}
}
return &ElasticsearchTarget{
target := &ElasticsearchTarget{
id: event.TargetID{ID: id, Name: "elasticsearch"},
args: args,
store: store,
loggerOnce: loggerOnce,
quitCh: make(chan struct{}),
}, nil
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return target, nil
}
// ES Client definitions and methods

View file

@ -329,9 +329,6 @@ func (target *KafkaTarget) initKafka() error {
return errNotConnected
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return nil
}
@ -346,11 +343,17 @@ func NewKafkaTarget(id string, args KafkaArgs, loggerOnce logger.LogOnce) (*Kafk
}
}
return &KafkaTarget{
target := &KafkaTarget{
id: event.TargetID{ID: id, Name: "kafka"},
args: args,
store: store,
loggerOnce: loggerOnce,
quitCh: make(chan struct{}),
}, nil
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return target, nil
}

View file

@ -254,9 +254,6 @@ func (target *MQTTTarget) initMQTT() error {
return errNotConnected
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return nil
}
@ -281,11 +278,17 @@ func NewMQTTTarget(id string, args MQTTArgs, loggerOnce logger.LogOnce) (*MQTTTa
}
}
return &MQTTTarget{
target := &MQTTTarget{
id: event.TargetID{ID: id, Name: "mqtt"},
args: args,
store: store,
quitCh: make(chan struct{}),
loggerOnce: loggerOnce,
}, nil
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return target, nil
}

View file

@ -382,9 +382,6 @@ func (target *MySQLTarget) initMySQL() error {
return errNotConnected
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return nil
}
@ -413,12 +410,18 @@ func NewMySQLTarget(id string, args MySQLArgs, loggerOnce logger.LogOnce) (*MySQ
args.DSN = config.FormatDSN()
}
return &MySQLTarget{
target := &MySQLTarget{
id: event.TargetID{ID: id, Name: "mysql"},
args: args,
firstPing: false,
store: store,
loggerOnce: loggerOnce,
quitCh: make(chan struct{}),
}, nil
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return target, nil
}

View file

@ -410,9 +410,6 @@ func (target *NATSTarget) initNATS() error {
return errNotConnected
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return nil
}
@ -427,11 +424,17 @@ func NewNATSTarget(id string, args NATSArgs, loggerOnce logger.LogOnce) (*NATSTa
}
}
return &NATSTarget{
target := &NATSTarget{
id: event.TargetID{ID: id, Name: "nats"},
args: args,
loggerOnce: loggerOnce,
store: store,
quitCh: make(chan struct{}),
}, nil
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return target, nil
}

View file

@ -244,9 +244,6 @@ func (target *NSQTarget) initNSQ() error {
return errNotConnected
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return nil
}
@ -261,11 +258,17 @@ func NewNSQTarget(id string, args NSQArgs, loggerOnce logger.LogOnce) (*NSQTarge
}
}
return &NSQTarget{
target := &NSQTarget{
id: event.TargetID{ID: id, Name: "nsq"},
args: args,
loggerOnce: loggerOnce,
store: store,
quitCh: make(chan struct{}),
}, nil
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return target, nil
}

View file

@ -376,9 +376,6 @@ func (target *PostgreSQLTarget) initPostgreSQL() error {
return errNotConnected
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return nil
}
@ -414,7 +411,7 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, loggerOnce logger.LogOn
}
}
return &PostgreSQLTarget{
target := &PostgreSQLTarget{
id: event.TargetID{ID: id, Name: "postgresql"},
args: args,
firstPing: false,
@ -422,5 +419,11 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, loggerOnce logger.LogOn
connString: connStr,
loggerOnce: loggerOnce,
quitCh: make(chan struct{}),
}, nil
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return target, nil
}

View file

@ -296,9 +296,6 @@ func (target *RedisTarget) initRedis() error {
return errNotConnected
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return nil
}
@ -343,12 +340,18 @@ func NewRedisTarget(id string, args RedisArgs, loggerOnce logger.LogOnce) (*Redi
},
}
return &RedisTarget{
target := &RedisTarget{
id: event.TargetID{ID: id, Name: "redis"},
args: args,
pool: pool,
store: store,
loggerOnce: loggerOnce,
quitCh: make(chan struct{}),
}, nil
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return target, nil
}

View file

@ -179,7 +179,7 @@ func (target *WebhookTarget) send(eventData event.Event) error {
return err
}
req, err := http.NewRequest("POST", target.args.Endpoint.String(), bytes.NewReader(data))
req, err := http.NewRequest(http.MethodPost, target.args.Endpoint.String(), bytes.NewReader(data))
if err != nil {
return err
}
@ -272,9 +272,6 @@ func (target *WebhookTarget) initWebhook() error {
return errNotConnected
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.cancelCh, target.loggerOnce)
}
return nil
}
@ -292,7 +289,7 @@ func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOn
}
}
return &WebhookTarget{
target := &WebhookTarget{
id: event.TargetID{ID: id, Name: "webhook"},
args: args,
loggerOnce: loggerOnce,
@ -300,5 +297,11 @@ func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOn
store: store,
cancel: cancel,
cancelCh: ctx.Done(),
}, nil
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.cancelCh, target.loggerOnce)
}
return target, nil
}