mirror of
https://github.com/gravitational/teleport
synced 2024-10-21 17:53:28 +00:00
Perform event name filtering inside the database in the DynamoDB driver (#7231)
This commit is contained in:
parent
3ae1e49761
commit
a98e34e76a
|
@ -22,9 +22,11 @@ import (
|
|||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/url"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -738,6 +740,23 @@ func (notReadyYetError) Error() string {
|
|||
return "The DynamoDB event backend is not ready to accept queries yet. Please retry in a couple of seconds."
|
||||
}
|
||||
|
||||
// eventFilterList constructs a string of the form
|
||||
// "(:eventTypeN, :eventTypeN, ...)" where N is a succession of integers
|
||||
// starting from 0. The substrings :eventTypeN are automatically generated
|
||||
// variable names that are valid with in the DynamoDB query language.
|
||||
// The function generates a list of amount of these :eventTypeN variables that is a valid
|
||||
// list literal in the DynamoDB query language. In order for this list to work the request
|
||||
// needs to be supplied with the variable values for the event types you wish to fill the list with.
|
||||
//
|
||||
// The reason that this doesn't fill in the values as literals within the list is to prevent injection attacks.
|
||||
func eventFilterList(amount int) string {
|
||||
var eventTypes []string
|
||||
for i := 0; i < amount; i++ {
|
||||
eventTypes = append(eventTypes, fmt.Sprintf(":eventType%d", i))
|
||||
}
|
||||
return "(" + strings.Join(eventTypes, ", ") + ")"
|
||||
}
|
||||
|
||||
// searchEventsRaw is a low level function for searching for events. This is kept
|
||||
// separate from the SearchEvents function in order to allow tests to grab more metadata.
|
||||
func (l *Log) searchEventsRaw(fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, startKey string) ([]event, string, error) {
|
||||
|
@ -765,7 +784,12 @@ func (l *Log) searchEventsRaw(fromUTC, toUTC time.Time, namespace string, eventT
|
|||
} else {
|
||||
left = math.MaxInt64
|
||||
}
|
||||
doFilter := len(eventTypes) > 0
|
||||
|
||||
var typeFilter *string
|
||||
if len(eventTypes) != 0 {
|
||||
typeList := eventFilterList(len(eventTypes))
|
||||
typeFilter = aws.String(fmt.Sprintf("EventType IN %s", typeList))
|
||||
}
|
||||
|
||||
// Resume scanning at the correct date. We need to do this because we send individual queries per date
|
||||
// and you can't resume a query with the wrong iterator checkpoint.
|
||||
|
@ -793,6 +817,10 @@ dateLoop:
|
|||
":end": toUTC.Unix(),
|
||||
}
|
||||
|
||||
for i := range eventTypes {
|
||||
attributes[fmt.Sprintf(":eventType%d", i)] = eventTypes[i]
|
||||
}
|
||||
|
||||
attributeValues, err := dynamodbattribute.MarshalMap(attributes)
|
||||
if err != nil {
|
||||
return nil, "", trace.Wrap(err)
|
||||
|
@ -806,6 +834,7 @@ dateLoop:
|
|||
IndexName: aws.String(indexTimeSearchV2),
|
||||
ExclusiveStartKey: checkpoint.Iterator,
|
||||
Limit: aws.Int64(left),
|
||||
FilterExpression: typeFilter,
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
@ -840,34 +869,25 @@ dateLoop:
|
|||
foundStart = true
|
||||
}
|
||||
|
||||
accepted := false
|
||||
for i := range eventTypes {
|
||||
if e.EventType == eventTypes[i] {
|
||||
accepted = true
|
||||
break
|
||||
// Because this may break on non page boundaries an additional
|
||||
// checkpoint is needed for sub-page breaks.
|
||||
if totalSize+len(data) >= events.MaxEventBytesInResponse {
|
||||
hasLeft = i+1 != len(dates) || len(checkpoint.Iterator) != 0
|
||||
key, err := getSubPageCheckpoint(&e)
|
||||
if err != nil {
|
||||
return nil, "", trace.Wrap(err)
|
||||
}
|
||||
checkpoint.EventKey = key
|
||||
break dateLoop
|
||||
}
|
||||
if accepted || !doFilter {
|
||||
// Because this may break on non page boundaries an additional
|
||||
// checkpoint is needed for sub-page breaks.
|
||||
if totalSize+len(data) >= events.MaxEventBytesInResponse {
|
||||
hasLeft = i+1 != len(dates) || len(checkpoint.Iterator) != 0
|
||||
key, err := getSubPageCheckpoint(&e)
|
||||
if err != nil {
|
||||
return nil, "", trace.Wrap(err)
|
||||
}
|
||||
checkpoint.EventKey = key
|
||||
break dateLoop
|
||||
}
|
||||
|
||||
totalSize += len(data)
|
||||
values = append(values, e)
|
||||
left--
|
||||
totalSize += len(data)
|
||||
values = append(values, e)
|
||||
left--
|
||||
|
||||
if left == 0 {
|
||||
hasLeft = i+1 != len(dates) || len(checkpoint.Iterator) != 0
|
||||
break dateLoop
|
||||
}
|
||||
if left == 0 {
|
||||
hasLeft = i+1 != len(dates) || len(checkpoint.Iterator) != 0
|
||||
break dateLoop
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue