Update lib/events/dynamoevents to use aws-sdk-go-v2 (#44363)

This is a continuation of converting the DynamoDB components to the
latest version of the AWS SDK, started in
https://github.com/gravitational/teleport/pull/44356.

This should have feature parity with the existing backend except
for Prometheus metrics. To keep the changes here isolated, the
metrics are omitted for the time being and will be added in a
follow-up.

In addition, a few of the events test suite cases were updated to
be more reliable when testing against a real backend.
Author: rosstimothy
Date: 2024-07-23 12:14:09 -04:00 (committed by GitHub)
Parent: 58a91d5107
Commit: 80e6b4d888
GPG key ID: B5690EEEBB952194
5 changed files with 281 additions and 203 deletions
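
For context, the core mechanical change in this conversion is replacing the
v1 session-based client construction with v2's config.LoadDefaultConfig plus
NewFromConfig. A minimal sketch of the two patterns side by side (the region
value is illustrative, not taken from this change):

    package main

    import (
        "context"
        "log"

        "github.com/aws/aws-sdk-go-v2/config"
        "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    )

    func main() {
        // v1 (removed in the diff below, shown here for comparison):
        //   sess, err := awssession.NewSessionWithOptions(awssession.Options{
        //       SharedConfigState: awssession.SharedConfigEnable,
        //   })
        //   svc := dynamodb.New(sess)

        // v2: load shared configuration (env vars, shared config files,
        // IMDS) once, then construct service clients from it.
        cfg, err := config.LoadDefaultConfig(context.Background(),
            config.WithRegion("us-east-1"), // illustrative region
        )
        if err != nil {
            log.Fatal(err)
        }
        // Per-client behavior (endpoints, FIPS, etc.) is configured with
        // functional options on NewFromConfig.
        svc := dynamodb.NewFromConfig(cfg)
        _ = svc
    }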


@ -551,7 +551,7 @@ func TestSearchEvents(t *testing.T) {
Limit: 100,
Order: types.EventOrderAscending,
// startKey generated by dynamo which points to Apr 27 2023 08:22:58 UTC
StartKey: `{"date":"2023-04-27","iterator":{"CreatedAt":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":"1682583778","NS":null,"NULL":null,"S":null,"SS":null},"CreatedAtDate":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":null,"NS":null,"NULL":null,"S":"2023-04-27","SS":null},"EventIndex":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":"0","NS":null,"NULL":null,"S":null,"SS":null},"SessionID":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":null,"NS":null,"NULL":null,"S":"4bc51fd7-4f0c-47ee-b9a5-da621fbdbabb","SS":null}}}`,
StartKey: `{"date":"2023-04-27","iterator":"{\"CreatedAt\":1682583778,\"CreatedAtDate\":\"2023-04-27\",\"EventIndex\":0,\"SessionID\":\"4bc51fd7-4f0c-47ee-b9a5-da621fbdbabb\"}"}`,
},
queryResultsResps: singleCallResults(100),
check: func(t *testing.T, mock *mockAthenaExecutor, paginationKey string) {
@ -572,12 +572,11 @@ func TestSearchEvents(t *testing.T) {
searchParams: &events.SearchEventsRequest{
// To is set to the keyset value -5h to test the case
// where the cost-optimized search should not be used.
From: dynamoKeysetTimestamp.Add(-5 * time.Hour),
To: toUTC,
Limit: 100,
Order: types.EventOrderDescending,
// startKey generated by dynamo which points to Apr 27 2023 08:22:58 UTC
StartKey: `{"date":"2023-04-27","iterator":{"CreatedAt":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":"1682583778","NS":null,"NULL":null,"S":null,"SS":null},"CreatedAtDate":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":null,"NS":null,"NULL":null,"S":"2023-04-27","SS":null},"EventIndex":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":"0","NS":null,"NULL":null,"S":null,"SS":null},"SessionID":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":null,"NS":null,"NULL":null,"S":"4bc51fd7-4f0c-47ee-b9a5-da621fbdbabb","SS":null}}}`,
From: dynamoKeysetTimestamp.Add(-5 * time.Hour),
To: toUTC,
Limit: 100,
Order: types.EventOrderDescending,
StartKey: `{"date":"2023-04-27","iterator":"{\"CreatedAt\":1682583778,\"CreatedAtDate\":\"2023-04-27\",\"EventIndex\":0,\"SessionID\":\"4bc51fd7-4f0c-47ee-b9a5-da621fbdbabb\"}"}`,
},
queryResultsResps: singleCallResults(100),
check: func(t *testing.T, mock *mockAthenaExecutor, paginationKey string) {
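
The StartKey fixture change above reflects the new checkpoint format: the
iterator is now a compact JSON document rather than a serialized
dynamodb.AttributeValue map. A rough sketch of how such an iterator string is
derived from a query's LastEvaluatedKey using the v2 attributevalue package
(the key values mirror the fixture above):

    package main

    import (
        "encoding/json"
        "fmt"

        "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
        dynamodbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
    )

    func main() {
        lastEvaluatedKey := map[string]dynamodbtypes.AttributeValue{
            "CreatedAt":     &dynamodbtypes.AttributeValueMemberN{Value: "1682583778"},
            "CreatedAtDate": &dynamodbtypes.AttributeValueMemberS{Value: "2023-04-27"},
            "EventIndex":    &dynamodbtypes.AttributeValueMemberN{Value: "0"},
            "SessionID":     &dynamodbtypes.AttributeValueMemberS{Value: "4bc51fd7-4f0c-47ee-b9a5-da621fbdbabb"},
        }

        // Decode the attribute values into plain Go types, then re-encode
        // the result as JSON to get the checkpoint iterator string.
        m := make(map[string]any)
        if err := attributevalue.UnmarshalMap(lastEvaluatedKey, &m); err != nil {
            panic(err)
        }
        iter, err := json.Marshal(m)
        if err != nil {
            panic(err)
        }
        fmt.Println(string(iter))
        // {"CreatedAt":1682583778,"CreatedAtDate":"2023-04-27","EventIndex":0,"SessionID":"4bc51fd7-4f0c-47ee-b9a5-da621fbdbabb"}
    }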


@ -27,20 +27,22 @@ import (
"fmt"
"maps"
"math"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
awssession "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/applicationautoscaling"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/applicationautoscaling"
autoscalingtypes "github.com/aws/aws-sdk-go-v2/service/applicationautoscaling/types"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
dynamodbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/aws/smithy-go"
smithyendpoints "github.com/aws/smithy-go/endpoints"
"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
@ -50,8 +52,9 @@ import (
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/events"
dynamometrics "github.com/gravitational/teleport/lib/observability/metrics/dynamo"
"github.com/gravitational/teleport/lib/modules"
"github.com/gravitational/teleport/lib/utils"
)
@ -71,24 +74,24 @@ const (
)
// Defines the attribute schema for the DynamoDB event table and index.
var tableSchema = []*dynamodb.AttributeDefinition{
var tableSchema = []dynamodbtypes.AttributeDefinition{
// Existing attributes pre RFD 24.
{
AttributeName: aws.String(keySessionID),
AttributeType: aws.String("S"),
AttributeType: dynamodbtypes.ScalarAttributeTypeS,
},
{
AttributeName: aws.String(keyEventIndex),
AttributeType: aws.String("N"),
AttributeType: dynamodbtypes.ScalarAttributeTypeN,
},
{
AttributeName: aws.String(keyCreatedAt),
AttributeType: aws.String("N"),
AttributeType: dynamodbtypes.ScalarAttributeTypeN,
},
// New attribute in RFD 24.
{
AttributeName: aws.String(keyDate),
AttributeType: aws.String("S"),
AttributeType: dynamodbtypes.ScalarAttributeTypeS,
},
}
@ -116,15 +119,15 @@ type Config struct {
DisableConflictCheck bool
// ReadMaxCapacity is the maximum provisioned read capacity.
ReadMaxCapacity int64
ReadMaxCapacity int32
// ReadMinCapacity is the minimum provisioned read capacity.
ReadMinCapacity int64
ReadMinCapacity int32
// ReadTargetValue is the ratio of consumed read to provisioned capacity.
ReadTargetValue float64
// WriteMaxCapacity is the maximum provisioned write capacity.
WriteMaxCapacity int64
WriteMaxCapacity int32
// WriteMinCapacity is the minimum provisioned write capacity.
WriteMinCapacity int64
WriteMinCapacity int32
// WriteTargetValue is the ratio of consumed write to provisioned capacity.
WriteTargetValue float64
@ -203,10 +206,7 @@ type Log struct {
*log.Entry
// Config is a backend configuration
Config
svc dynamodbiface.DynamoDBAPI
// session holds the AWS client.
session *awssession.Session
svc *dynamodb.Client
}
type event struct {
@ -214,7 +214,7 @@ type event struct {
EventIndex int64
EventType string
CreatedAt int64
Expires *int64 `json:"Expires,omitempty"`
Expires *int64 `json:"Expires,omitempty" dynamodbav:",omitempty"`
FieldsMap events.EventFields
EventNamespace string
CreatedAtDate string
@ -265,51 +265,66 @@ func New(ctx context.Context, cfg Config) (*Log, error) {
if err != nil {
return nil, trace.Wrap(err)
}
b := &Log{
Entry: l,
Config: cfg,
opts := []func(*config.LoadOptions) error{
config.WithRegion(cfg.Region),
config.WithHTTPClient(&http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
MaxIdleConns: defaults.HTTPMaxIdleConns,
MaxIdleConnsPerHost: defaults.HTTPMaxIdleConnsPerHost,
},
}),
}
awsConfig := aws.Config{}
// Override the default environment's region if value set in YAML file:
if cfg.Region != "" {
awsConfig.Region = aws.String(cfg.Region)
awsConfig, err := config.LoadDefaultConfig(ctx, opts...)
if err != nil {
return nil, trace.Wrap(err)
}
var dynamoOpts []func(*dynamodb.Options)
// Override the service endpoint using the "endpoint" query parameter from
// "audit_events_uri". This is for non-AWS DynamoDB-compatible backends.
if cfg.Endpoint != "" {
awsConfig.Endpoint = aws.String(cfg.Endpoint)
u, err := url.Parse(cfg.Endpoint)
if err != nil {
return nil, trace.BadParameter("configured DynamoDB events endpoint is invalid: %s", err.Error())
}
dynamoOpts = append(dynamoOpts, dynamodb.WithEndpointResolverV2(&staticResolver{endpoint: u}))
}
b.session, err = awssession.NewSessionWithOptions(awssession.Options{
SharedConfigState: awssession.SharedConfigEnable,
Config: awsConfig,
})
if err != nil {
return nil, trace.Wrap(err)
// FIPS settings are applied on the individual service instead of the aws config,
// as DynamoDB Streams and Application Auto Scaling do not yet have FIPS endpoints in non-GovCloud.
// See also: https://aws.amazon.com/compliance/fips/#FIPS_Endpoints_by_Service
if modules.GetModules().IsBoringBinary() && cfg.UseFIPSEndpoint == types.ClusterAuditConfigSpecV2_FIPS_ENABLED {
dynamoOpts = append(dynamoOpts, func(o *dynamodb.Options) {
o.EndpointOptions.UseFIPSEndpoint = aws.FIPSEndpointStateEnabled
})
}
// Create DynamoDB service.
svc, err := dynamometrics.NewAPIMetrics(dynamometrics.Events, dynamodb.New(b.session, &aws.Config{
// Setting this on the individual service instead of the session, as DynamoDB Streams
// and Application Auto Scaling do not yet have FIPS endpoints in non-GovCloud.
// See also: https://aws.amazon.com/compliance/fips/#FIPS_Endpoints_by_Service
UseFIPSEndpoint: events.FIPSProtoStateToAWSState(cfg.UseFIPSEndpoint),
}))
if err != nil {
return nil, trace.Wrap(err)
b := &Log{
Entry: l,
Config: cfg,
svc: dynamodb.NewFromConfig(awsConfig, dynamoOpts...),
}
b.svc = svc
if err := b.configureTable(ctx); err != nil {
if err := b.configureTable(ctx, applicationautoscaling.NewFromConfig(awsConfig)); err != nil {
return nil, trace.Wrap(err)
}
return b, nil
}
type staticResolver struct {
endpoint *url.URL
}
func (s *staticResolver) ResolveEndpoint(ctx context.Context, params dynamodb.EndpointParameters) (smithyendpoints.Endpoint, error) {
return smithyendpoints.Endpoint{URI: *s.endpoint}, nil
}
type tableStatus int
const (
@ -319,7 +334,7 @@ const (
tableStatusOK
)
func (l *Log) configureTable(ctx context.Context) error {
func (l *Log) configureTable(ctx context.Context, svc *applicationautoscaling.Client) error {
// Check if the table exists.
ts, err := l.getTableStatus(ctx, l.Tablename)
if err != nil {
@ -337,18 +352,18 @@ func (l *Log) configureTable(ctx context.Context) error {
return trace.Wrap(err)
}
tableName := aws.String(l.Tablename)
ttlStatus, err := l.svc.DescribeTimeToLiveWithContext(ctx, &dynamodb.DescribeTimeToLiveInput{
ttlStatus, err := l.svc.DescribeTimeToLive(ctx, &dynamodb.DescribeTimeToLiveInput{
TableName: tableName,
})
if err != nil {
return trace.Wrap(convertError(err))
}
switch aws.StringValue(ttlStatus.TimeToLiveDescription.TimeToLiveStatus) {
case dynamodb.TimeToLiveStatusEnabled, dynamodb.TimeToLiveStatusEnabling:
switch ttlStatus.TimeToLiveDescription.TimeToLiveStatus {
case dynamodbtypes.TimeToLiveStatusEnabled, dynamodbtypes.TimeToLiveStatusEnabling:
default:
_, err = l.svc.UpdateTimeToLiveWithContext(ctx, &dynamodb.UpdateTimeToLiveInput{
_, err = l.svc.UpdateTimeToLive(ctx, &dynamodb.UpdateTimeToLiveInput{
TableName: tableName,
TimeToLiveSpecification: &dynamodb.TimeToLiveSpecification{
TimeToLiveSpecification: &dynamodbtypes.TimeToLiveSpecification{
AttributeName: aws.String(keyExpires),
Enabled: aws.Bool(true),
},
@ -361,8 +376,8 @@ func (l *Log) configureTable(ctx context.Context) error {
// Enable continuous backups if requested.
if l.Config.EnableContinuousBackups {
// Make request to AWS to update continuous backups settings.
_, err := l.svc.UpdateContinuousBackupsWithContext(ctx, &dynamodb.UpdateContinuousBackupsInput{
PointInTimeRecoverySpecification: &dynamodb.PointInTimeRecoverySpecification{
_, err := l.svc.UpdateContinuousBackups(ctx, &dynamodb.UpdateContinuousBackupsInput{
PointInTimeRecoverySpecification: &dynamodbtypes.PointInTimeRecoverySpecification{
PointInTimeRecoveryEnabled: aws.Bool(true),
},
TableName: tableName,
@ -374,11 +389,9 @@ func (l *Log) configureTable(ctx context.Context) error {
// Enable auto scaling if requested.
if l.Config.EnableAutoScaling {
svc := applicationautoscaling.New(l.session)
type autoscalingParams struct {
readDimension string
writeDimension string
readDimension autoscalingtypes.ScalableDimension
writeDimension autoscalingtypes.ScalableDimension
resourceID string
readPolicy string
writePolicy string
@ -386,15 +399,15 @@ func (l *Log) configureTable(ctx context.Context) error {
params := []autoscalingParams{
{
readDimension: applicationautoscaling.ScalableDimensionDynamodbTableReadCapacityUnits,
writeDimension: applicationautoscaling.ScalableDimensionDynamodbTableWriteCapacityUnits,
readDimension: autoscalingtypes.ScalableDimensionDynamoDBTableReadCapacityUnits,
writeDimension: autoscalingtypes.ScalableDimensionDynamoDBTableWriteCapacityUnits,
resourceID: fmt.Sprintf("table/%s", l.Tablename),
readPolicy: fmt.Sprintf("%s-write-target-tracking-scaling-policy", l.Tablename),
writePolicy: fmt.Sprintf("%s-write-target-tracking-scaling-policy", l.Tablename),
},
{
readDimension: applicationautoscaling.ScalableDimensionDynamodbIndexReadCapacityUnits,
writeDimension: applicationautoscaling.ScalableDimensionDynamodbIndexWriteCapacityUnits,
readDimension: autoscalingtypes.ScalableDimensionDynamoDBIndexReadCapacityUnits,
writeDimension: autoscalingtypes.ScalableDimensionDynamoDBIndexWriteCapacityUnits,
resourceID: fmt.Sprintf("table/%s/index/%s", l.Tablename, indexTimeSearchV2),
readPolicy: fmt.Sprintf("%s/index/%s-write-target-tracking-scaling-policy", l.Tablename, indexTimeSearchV2),
writePolicy: fmt.Sprintf("%s/index/%s-write-target-tracking-scaling-policy", l.Tablename, indexTimeSearchV2),
@ -403,36 +416,36 @@ func (l *Log) configureTable(ctx context.Context) error {
for _, p := range params {
// Define scaling targets. Defines minimum and maximum {read,write} capacity.
if _, err := svc.RegisterScalableTargetWithContext(ctx, &applicationautoscaling.RegisterScalableTargetInput{
MinCapacity: aws.Int64(l.ReadMinCapacity),
MaxCapacity: aws.Int64(l.ReadMaxCapacity),
if _, err := svc.RegisterScalableTarget(ctx, &applicationautoscaling.RegisterScalableTargetInput{
MinCapacity: aws.Int32(l.ReadMinCapacity),
MaxCapacity: aws.Int32(l.ReadMaxCapacity),
ResourceId: aws.String(p.resourceID),
ScalableDimension: aws.String(p.readDimension),
ServiceNamespace: aws.String(applicationautoscaling.ServiceNamespaceDynamodb),
ScalableDimension: p.readDimension,
ServiceNamespace: autoscalingtypes.ServiceNamespaceDynamodb,
}); err != nil {
return trace.Wrap(convertError(err))
}
if _, err := svc.RegisterScalableTargetWithContext(ctx, &applicationautoscaling.RegisterScalableTargetInput{
MinCapacity: aws.Int64(l.WriteMinCapacity),
MaxCapacity: aws.Int64(l.WriteMaxCapacity),
if _, err := svc.RegisterScalableTarget(ctx, &applicationautoscaling.RegisterScalableTargetInput{
MinCapacity: aws.Int32(l.WriteMinCapacity),
MaxCapacity: aws.Int32(l.WriteMaxCapacity),
ResourceId: aws.String(p.resourceID),
ScalableDimension: aws.String(p.writeDimension),
ServiceNamespace: aws.String(applicationautoscaling.ServiceNamespaceDynamodb),
ScalableDimension: p.writeDimension,
ServiceNamespace: autoscalingtypes.ServiceNamespaceDynamodb,
}); err != nil {
return trace.Wrap(convertError(err))
}
// Define scaling policy. Defines the ratio of {read,write} consumed capacity to
// provisioned capacity DynamoDB will try and maintain.
if _, err := svc.PutScalingPolicyWithContext(ctx, &applicationautoscaling.PutScalingPolicyInput{
if _, err := svc.PutScalingPolicy(ctx, &applicationautoscaling.PutScalingPolicyInput{
PolicyName: aws.String(p.readPolicy),
PolicyType: aws.String(applicationautoscaling.PolicyTypeTargetTrackingScaling),
PolicyType: autoscalingtypes.PolicyTypeTargetTrackingScaling,
ResourceId: aws.String(p.resourceID),
ScalableDimension: aws.String(p.readDimension),
ServiceNamespace: aws.String(applicationautoscaling.ServiceNamespaceDynamodb),
TargetTrackingScalingPolicyConfiguration: &applicationautoscaling.TargetTrackingScalingPolicyConfiguration{
PredefinedMetricSpecification: &applicationautoscaling.PredefinedMetricSpecification{
PredefinedMetricType: aws.String(applicationautoscaling.MetricTypeDynamoDbreadCapacityUtilization),
ScalableDimension: p.readDimension,
ServiceNamespace: autoscalingtypes.ServiceNamespaceDynamodb,
TargetTrackingScalingPolicyConfiguration: &autoscalingtypes.TargetTrackingScalingPolicyConfiguration{
PredefinedMetricSpecification: &autoscalingtypes.PredefinedMetricSpecification{
PredefinedMetricType: autoscalingtypes.MetricTypeDynamoDBReadCapacityUtilization,
},
TargetValue: aws.Float64(l.ReadTargetValue),
},
@ -440,15 +453,15 @@ func (l *Log) configureTable(ctx context.Context) error {
return trace.Wrap(convertError(err))
}
if _, err := svc.PutScalingPolicyWithContext(ctx, &applicationautoscaling.PutScalingPolicyInput{
if _, err := svc.PutScalingPolicy(ctx, &applicationautoscaling.PutScalingPolicyInput{
PolicyName: aws.String(p.writePolicy),
PolicyType: aws.String(applicationautoscaling.PolicyTypeTargetTrackingScaling),
PolicyType: autoscalingtypes.PolicyTypeTargetTrackingScaling,
ResourceId: aws.String(p.resourceID),
ScalableDimension: aws.String(p.writeDimension),
ServiceNamespace: aws.String(applicationautoscaling.ServiceNamespaceDynamodb),
TargetTrackingScalingPolicyConfiguration: &applicationautoscaling.TargetTrackingScalingPolicyConfiguration{
PredefinedMetricSpecification: &applicationautoscaling.PredefinedMetricSpecification{
PredefinedMetricType: aws.String(applicationautoscaling.MetricTypeDynamoDbwriteCapacityUtilization),
ScalableDimension: p.writeDimension,
ServiceNamespace: autoscalingtypes.ServiceNamespaceDynamodb,
TargetTrackingScalingPolicyConfiguration: &autoscalingtypes.TargetTrackingScalingPolicyConfiguration{
PredefinedMetricSpecification: &autoscalingtypes.PredefinedMetricSpecification{
PredefinedMetricType: autoscalingtypes.MetricTypeDynamoDBWriteCapacityUtilization,
},
TargetValue: aws.Float64(l.WriteTargetValue),
},
@ -543,7 +556,7 @@ func (l *Log) putAuditEvent(ctx context.Context, sessionID string, in apievents.
return trace.Wrap(err)
}
if _, err = l.svc.PutItemWithContext(ctx, input); err != nil {
if _, err = l.svc.PutItem(ctx, input); err != nil {
err = convertError(err)
switch {
@ -584,7 +597,7 @@ func (l *Log) createPutItem(sessionID string, in apievents.AuditEvent) (*dynamod
CreatedAtDate: in.GetTime().Format(iso8601DateFormat),
}
l.setExpiry(&e)
av, err := dynamodbattribute.MarshalMap(e)
av, err := attributevalue.MarshalMap(e)
if err != nil {
return nil, trace.Wrap(err)
}
@ -638,7 +651,7 @@ type checkpointKey struct {
Date string `json:"date,omitempty"`
// A DynamoDB query iterator. Allows us to resume a partial query.
Iterator map[string]*dynamodb.AttributeValue `json:"iterator,omitempty"`
Iterator string `json:"iterator,omitempty"`
// EventKey is a derived identifier for an event used for resuming
// sub-page breaks due to size constraints.
@ -768,11 +781,11 @@ func (l *Log) searchEventsRaw(ctx context.Context, fromUTC, toUTC time.Time, nam
}
indexName := aws.String(indexTimeSearchV2)
var left int64
var left int32
if limit != 0 {
left = int64(limit)
left = int32(limit)
} else {
left = math.MaxInt64
left = math.MaxInt32
}
// Resume scanning at the correct date. We need to do this because we send individual queries per date
@ -861,11 +874,11 @@ func GetCreatedAtFromStartKey(startKey string) (time.Time, error) {
if err != nil {
return time.Time{}, trace.Wrap(err)
}
if checkpoint.Iterator == nil {
if checkpoint.Iterator == "" {
return time.Time{}, errors.New("missing iterator")
}
var e event
if err := dynamodbattribute.UnmarshalMap(checkpoint.Iterator, &e); err != nil {
if err := json.Unmarshal([]byte(checkpoint.Iterator), &e); err != nil {
return time.Time{}, trace.Wrap(err)
}
if e.CreatedAt <= 0 {
@ -884,7 +897,7 @@ func getCheckpointFromStartKey(startKey string) (checkpointKey, error) {
}
// If a checkpoint key is provided, unmarshal it so we can work with its parts.
if err := json.Unmarshal([]byte(startKey), &checkpoint); err != nil {
return checkpoint, trace.Wrap(err)
return checkpointKey{}, trace.Wrap(err)
}
return checkpoint, nil
}
@ -1015,7 +1028,7 @@ func fromWhereExpr(cond *types.WhereExpr, params *condFilterParams) (string, err
// getTableStatus checks if a given table exists
func (l *Log) getTableStatus(ctx context.Context, tableName string) (tableStatus, error) {
_, err := l.svc.DescribeTableWithContext(ctx, &dynamodb.DescribeTableInput{
_, err := l.svc.DescribeTable(ctx, &dynamodb.DescribeTableInput{
TableName: aws.String(tableName),
})
err = convertError(err)
@ -1030,7 +1043,7 @@ func (l *Log) getTableStatus(ctx context.Context, tableName string) (tableStatus
// indexExists checks if a given index exists on a given table and that it is active or updating.
func (l *Log) indexExists(ctx context.Context, tableName, indexName string) (bool, error) {
tableDescription, err := l.svc.DescribeTableWithContext(ctx, &dynamodb.DescribeTableInput{
tableDescription, err := l.svc.DescribeTable(ctx, &dynamodb.DescribeTableInput{
TableName: aws.String(tableName),
})
if err != nil {
@ -1038,7 +1051,7 @@ func (l *Log) indexExists(ctx context.Context, tableName, indexName string) (boo
}
for _, gsi := range tableDescription.Table.GlobalSecondaryIndexes {
if *gsi.IndexName == indexName && (*gsi.IndexStatus == dynamodb.IndexStatusActive || *gsi.IndexStatus == dynamodb.IndexStatusUpdating) {
if *gsi.IndexName == indexName && (gsi.IndexStatus == dynamodbtypes.IndexStatusActive || gsi.IndexStatus == dynamodbtypes.IndexStatusUpdating) {
return true, nil
}
}
@ -1052,18 +1065,18 @@ func (l *Log) indexExists(ctx context.Context, tableName, indexName string) (boo
// currently is always set to "FullPath" (used to be something else, that's
// why it's a parameter for migration purposes)
func (l *Log) createTable(ctx context.Context, tableName string) error {
provisionedThroughput := dynamodb.ProvisionedThroughput{
provisionedThroughput := dynamodbtypes.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(l.ReadCapacityUnits),
WriteCapacityUnits: aws.Int64(l.WriteCapacityUnits),
}
elems := []*dynamodb.KeySchemaElement{
elems := []dynamodbtypes.KeySchemaElement{
{
AttributeName: aws.String(keySessionID),
KeyType: aws.String("HASH"),
KeyType: dynamodbtypes.KeyTypeHash,
},
{
AttributeName: aws.String(keyEventIndex),
KeyType: aws.String("RANGE"),
KeyType: dynamodbtypes.KeyTypeRange,
},
}
c := dynamodb.CreateTableInput{
@ -1071,38 +1084,41 @@ func (l *Log) createTable(ctx context.Context, tableName string) error {
AttributeDefinitions: tableSchema,
KeySchema: elems,
ProvisionedThroughput: &provisionedThroughput,
GlobalSecondaryIndexes: []*dynamodb.GlobalSecondaryIndex{
GlobalSecondaryIndexes: []dynamodbtypes.GlobalSecondaryIndex{
{
IndexName: aws.String(indexTimeSearchV2),
KeySchema: []*dynamodb.KeySchemaElement{
KeySchema: []dynamodbtypes.KeySchemaElement{
{
// Partition by date instead of namespace.
AttributeName: aws.String(keyDate),
KeyType: aws.String("HASH"),
KeyType: dynamodbtypes.KeyTypeHash,
},
{
AttributeName: aws.String(keyCreatedAt),
KeyType: aws.String("RANGE"),
KeyType: dynamodbtypes.KeyTypeRange,
},
},
Projection: &dynamodb.Projection{
ProjectionType: aws.String("ALL"),
Projection: &dynamodbtypes.Projection{
ProjectionType: dynamodbtypes.ProjectionTypeAll,
},
ProvisionedThroughput: &provisionedThroughput,
},
},
}
_, err := l.svc.CreateTableWithContext(ctx, &c)
_, err := l.svc.CreateTable(ctx, &c)
if err != nil {
return trace.Wrap(err)
}
log.Infof("Waiting until table %q is created", tableName)
err = l.svc.WaitUntilTableExistsWithContext(ctx, &dynamodb.DescribeTableInput{
TableName: aws.String(tableName),
})
waiter := dynamodb.NewTableExistsWaiter(l.svc)
err = waiter.Wait(ctx,
&dynamodb.DescribeTableInput{TableName: aws.String(tableName)},
10*time.Minute,
)
if err == nil {
log.Infof("Table %q has been created", tableName)
}
return trace.Wrap(err)
}
@ -1113,15 +1129,15 @@ func (l *Log) Close() error {
// deleteAllItems deletes all items from the database, used in tests
func (l *Log) deleteAllItems(ctx context.Context) error {
out, err := l.svc.ScanWithContext(ctx, &dynamodb.ScanInput{TableName: aws.String(l.Tablename)})
out, err := l.svc.Scan(ctx, &dynamodb.ScanInput{TableName: aws.String(l.Tablename)})
if err != nil {
return trace.Wrap(err)
}
var requests []*dynamodb.WriteRequest
var requests []dynamodbtypes.WriteRequest
for _, item := range out.Items {
requests = append(requests, &dynamodb.WriteRequest{
DeleteRequest: &dynamodb.DeleteRequest{
Key: map[string]*dynamodb.AttributeValue{
requests = append(requests, dynamodbtypes.WriteRequest{
DeleteRequest: &dynamodbtypes.DeleteRequest{
Key: map[string]dynamodbtypes.AttributeValue{
keySessionID: item[keySessionID],
keyEventIndex: item[keyEventIndex],
},
@ -1137,8 +1153,8 @@ func (l *Log) deleteAllItems(ctx context.Context) error {
chunk := requests[:top]
requests = requests[top:]
_, err := l.svc.BatchWriteItemWithContext(ctx, &dynamodb.BatchWriteItemInput{
RequestItems: map[string][]*dynamodb.WriteRequest{
_, err := l.svc.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
RequestItems: map[string][]dynamodbtypes.WriteRequest{
l.Tablename: chunk,
},
})
@ -1154,15 +1170,20 @@ func (l *Log) deleteAllItems(ctx context.Context) error {
// deleteTable deletes DynamoDB table with a given name
func (l *Log) deleteTable(ctx context.Context, tableName string, wait bool) error {
tn := aws.String(tableName)
_, err := l.svc.DeleteTableWithContext(ctx, &dynamodb.DeleteTableInput{TableName: tn})
_, err := l.svc.DeleteTable(ctx, &dynamodb.DeleteTableInput{TableName: tn})
if err != nil {
return trace.Wrap(err)
}
if wait {
return trace.Wrap(
l.svc.WaitUntilTableNotExistsWithContext(ctx, &dynamodb.DescribeTableInput{TableName: tn}))
if !wait {
return nil
}
return nil
waiter := dynamodb.NewTableNotExistsWaiter(l.svc)
return trace.Wrap(waiter.Wait(ctx,
&dynamodb.DescribeTableInput{TableName: tn},
10*time.Minute,
))
}
var errAWSValidation = errors.New("aws validation error")
@ -1171,34 +1192,47 @@ func convertError(err error) error {
if err == nil {
return nil
}
var aerr awserr.Error
if !errors.As(err, &aerr) {
return err
var conditionalCheckFailedError *dynamodbtypes.ConditionalCheckFailedException
if errors.As(err, &conditionalCheckFailedError) {
return trace.AlreadyExists(conditionalCheckFailedError.ErrorMessage())
}
switch aerr.Code() {
case dynamodb.ErrCodeConditionalCheckFailedException:
return trace.AlreadyExists(aerr.Error())
case dynamodb.ErrCodeProvisionedThroughputExceededException:
return trace.ConnectionProblem(aerr, aerr.Error())
case dynamodb.ErrCodeResourceNotFoundException:
return trace.NotFound(aerr.Error())
case dynamodb.ErrCodeItemCollectionSizeLimitExceededException:
return trace.BadParameter(aerr.Error())
case dynamodb.ErrCodeInternalServerError:
return trace.BadParameter(aerr.Error())
case ErrValidationException:
// A ValidationException type is missing from AWS SDK.
// Use errAWSValidation that for most cases will contain:
// "Item size has exceeded the maximum allowed size" AWS validation error.
return trace.Wrap(errAWSValidation, aerr.Error())
default:
return err
var throughputExceededError *dynamodbtypes.ProvisionedThroughputExceededException
if errors.As(err, &throughputExceededError) {
return trace.ConnectionProblem(throughputExceededError, throughputExceededError.ErrorMessage())
}
var notFoundError *dynamodbtypes.ResourceNotFoundException
if errors.As(err, &notFoundError) {
return trace.NotFound(notFoundError.ErrorMessage())
}
var collectionLimitExceededError *dynamodbtypes.ItemCollectionSizeLimitExceededException
if errors.As(err, &notFoundError) {
return trace.BadParameter(collectionLimitExceededError.ErrorMessage())
}
var internalError *dynamodbtypes.InternalServerError
if errors.As(err, &internalError) {
return trace.BadParameter(internalError.ErrorMessage())
}
var ae smithy.APIError
if errors.As(err, &ae) {
if ae.ErrorCode() == ErrValidationException {
// A ValidationException type is missing from the AWS SDK.
// Use errAWSValidation, which for most cases will contain the
// "Item size has exceeded the maximum allowed size" AWS validation error.
return trace.Wrap(errAWSValidation, ae.Error())
}
}
return err
}
type query interface {
QueryWithContext(ctx context.Context, input *dynamodb.QueryInput, opts ...request.Option) (*dynamodb.QueryOutput, error)
Query(ctx context.Context, params *dynamodb.QueryInput, optFns ...func(*dynamodb.Options)) (*dynamodb.QueryOutput, error)
}
type eventsFetcher struct {
@ -1210,7 +1244,7 @@ type eventsFetcher struct {
checkpoint *checkpointKey
foundStart bool
dates []string
left int64
left int32
fromUTC time.Time
toUTC time.Time
@ -1221,13 +1255,26 @@ type eventsFetcher struct {
}
func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeftFun func() bool) ([]event, bool, error) {
var out []event
oldIterator := l.checkpoint.Iterator
l.checkpoint.Iterator = output.LastEvaluatedKey
l.checkpoint.Iterator = ""
if output.LastEvaluatedKey != nil {
m := make(map[string]any)
if err := attributevalue.UnmarshalMap(output.LastEvaluatedKey, &m); err != nil {
return nil, false, trace.Wrap(err)
}
iter, err := json.Marshal(&m)
if err != nil {
return nil, false, trace.Wrap(err)
}
l.checkpoint.Iterator = string(iter)
}
var out []event
for _, item := range output.Items {
var e event
if err := dynamodbattribute.UnmarshalMap(item, &e); err != nil {
if err := attributevalue.UnmarshalMap(item, &e); err != nil {
return nil, false, trace.WrapWithMessage(err, "failed to unmarshal event")
}
data, err := json.Marshal(e.FieldsMap)
@ -1272,7 +1319,7 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeft
if hasLeftFun != nil {
hf = hasLeftFun()
}
l.hasLeft = hf || len(l.checkpoint.Iterator) != 0
l.hasLeft = hf || l.checkpoint.Iterator != ""
l.checkpoint.EventKey = ""
return out, true, nil
}
@ -1282,10 +1329,6 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeft
func (l *eventsFetcher) QueryByDateIndex(ctx context.Context, filterExpr *string) (values []event, err error) {
query := "CreatedAtDate = :date AND CreatedAt BETWEEN :start and :end"
var attributeNames map[string]*string
if len(l.filter.condParams.attrNames) > 0 {
attributeNames = aws.StringMap(l.filter.condParams.attrNames)
}
dateLoop:
for i, date := range l.dates {
@ -1300,7 +1343,7 @@ dateLoop:
attributes[fmt.Sprintf(":eventType%d", i)] = eventType
}
maps.Copy(attributes, l.filter.condParams.attrValues)
attributeValues, err := dynamodbattribute.MarshalMap(attributes)
attributeValues, err := attributevalue.MarshalMap(attributes)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1308,16 +1351,29 @@ dateLoop:
input := dynamodb.QueryInput{
KeyConditionExpression: aws.String(query),
TableName: aws.String(l.tableName),
ExpressionAttributeNames: attributeNames,
ExpressionAttributeNames: l.filter.condParams.attrNames,
ExpressionAttributeValues: attributeValues,
IndexName: aws.String(indexTimeSearchV2),
ExclusiveStartKey: l.checkpoint.Iterator,
Limit: aws.Int64(l.left),
Limit: aws.Int32(l.left),
FilterExpression: filterExpr,
ScanIndexForward: aws.Bool(l.forward),
}
if l.checkpoint.Iterator != "" {
m := make(map[string]any)
err = json.Unmarshal([]byte(l.checkpoint.Iterator), &m)
if err != nil {
return nil, trace.Wrap(err)
}
input.ExclusiveStartKey, err = attributevalue.MarshalMap(&m)
if err != nil {
return nil, trace.Wrap(err)
}
}
start := time.Now()
out, err := l.api.QueryWithContext(ctx, &input)
out, err := l.api.Query(ctx, &input)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1346,12 +1402,12 @@ dateLoop:
// from the same date and the request's iterator to fetch the remainder of the page.
// If the input iterator is empty but the EventKey is not, we need to resume the query from the same date
// and we shouldn't move to the next date.
if i < len(l.dates)-1 && len(l.checkpoint.Iterator) == 0 && l.checkpoint.EventKey == "" {
if i < len(l.dates)-1 && l.checkpoint.Iterator == "" && l.checkpoint.EventKey == "" {
l.checkpoint.Date = l.dates[i+1]
}
return values, nil
}
if len(l.checkpoint.Iterator) == 0 {
if l.checkpoint.Iterator == "" {
continue dateLoop
}
}
@ -1361,10 +1417,6 @@ dateLoop:
func (l *eventsFetcher) QueryBySessionIDIndex(ctx context.Context, sessionID string, filterExpr *string) (values []event, err error) {
query := "SessionID = :id"
var attributeNames map[string]*string
if len(l.filter.condParams.attrNames) > 0 {
attributeNames = aws.StringMap(l.filter.condParams.attrNames)
}
attributes := map[string]interface{}{
":id": sessionID,
@ -1374,23 +1426,35 @@ func (l *eventsFetcher) QueryBySessionIDIndex(ctx context.Context, sessionID str
}
maps.Copy(attributes, l.filter.condParams.attrValues)
attributeValues, err := dynamodbattribute.MarshalMap(attributes)
attributeValues, err := attributevalue.MarshalMap(attributes)
if err != nil {
return nil, trace.Wrap(err)
}
input := dynamodb.QueryInput{
KeyConditionExpression: aws.String(query),
TableName: aws.String(l.tableName),
ExpressionAttributeNames: attributeNames,
ExpressionAttributeNames: l.filter.condParams.attrNames,
ExpressionAttributeValues: attributeValues,
IndexName: nil, // Use primary SessionID index.
ExclusiveStartKey: l.checkpoint.Iterator,
Limit: aws.Int64(l.left),
Limit: aws.Int32(l.left),
FilterExpression: filterExpr,
ScanIndexForward: aws.Bool(l.forward),
}
if l.checkpoint.Iterator != "" {
m := make(map[string]any)
if err = json.Unmarshal([]byte(l.checkpoint.Iterator), &m); err != nil {
return nil, trace.Wrap(err)
}
input.ExclusiveStartKey, err = attributevalue.MarshalMap(&m)
if err != nil {
return nil, trace.Wrap(err)
}
}
start := time.Now()
out, err := l.api.QueryWithContext(ctx, &input)
out, err := l.api.Query(ctx, &input)
if err != nil {
return nil, trace.Wrap(err)
}
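
Beyond the one-for-one method renames (PutItemWithContext → PutItem and so
on), the waiter change above is a representative v2 idiom: the v1 client's
WaitUntilTableExistsWithContext helpers become standalone typed waiters that
take an explicit maximum wait duration. A minimal sketch (the table name and
the 10-minute bound are illustrative, mirroring the diff):

    package main

    import (
        "context"
        "log"
        "time"

        "github.com/aws/aws-sdk-go-v2/aws"
        "github.com/aws/aws-sdk-go-v2/config"
        "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    )

    func main() {
        ctx := context.Background()
        cfg, err := config.LoadDefaultConfig(ctx)
        if err != nil {
            log.Fatal(err)
        }
        svc := dynamodb.NewFromConfig(cfg)

        // The waiter polls DescribeTable until the table reaches ACTIVE or
        // the maximum wait duration elapses.
        waiter := dynamodb.NewTableExistsWaiter(svc)
        if err := waiter.Wait(ctx,
            &dynamodb.DescribeTableInput{TableName: aws.String("teleport-test")},
            10*time.Minute,
        ); err != nil {
            log.Fatal(err)
        }
    }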


@ -23,6 +23,8 @@ import (
"encoding/json"
"fmt"
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strconv"
@ -65,7 +67,6 @@ func setupDynamoContext(t *testing.T) *dynamoContext {
fakeClock := clockwork.NewFakeClockAt(time.Now().UTC())
log, err := New(context.Background(), Config{
Region: "us-east-1",
Tablename: fmt.Sprintf("teleport-test-%v", uuid.New().String()),
Clock: fakeClock,
UIDGenerator: utils.NewFakeUID(),
@ -589,3 +590,25 @@ func randStringAlpha(n int) string {
}
return string(b)
}
func TestCustomEndpoint(t *testing.T) {
ctx := context.Background()
t.Setenv("AWS_ACCESS_KEY", "llama")
t.Setenv("AWS_SECRET_KEY", "alpaca")
mux := http.NewServeMux()
mux.HandleFunc("/*", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusTeapot)
})
srv := httptest.NewServer(mux)
defer srv.Close()
b, err := New(ctx, Config{
Tablename: "teleport-test",
UIDGenerator: utils.NewFakeUID(),
Endpoint: srv.URL,
})
assert.Error(t, err)
assert.Nil(t, b)
require.Contains(t, err.Error(), fmt.Sprintf("StatusCode: %d", http.StatusTeapot))
}
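
This test exercises the new staticResolver path: in SDK v2 the service
endpoint is overridden with a per-client EndpointResolverV2 rather than
aws.Config.Endpoint. A sketch of how a resolver like the one in this change
is wired up (the local endpoint URL is illustrative):

    package main

    import (
        "context"
        "log"
        "net/url"

        "github.com/aws/aws-sdk-go-v2/config"
        "github.com/aws/aws-sdk-go-v2/service/dynamodb"
        smithyendpoints "github.com/aws/smithy-go/endpoints"
    )

    // staticResolver pins every request to one fixed endpoint, e.g. a local
    // DynamoDB-compatible server.
    type staticResolver struct {
        endpoint *url.URL
    }

    func (s *staticResolver) ResolveEndpoint(ctx context.Context, params dynamodb.EndpointParameters) (smithyendpoints.Endpoint, error) {
        return smithyendpoints.Endpoint{URI: *s.endpoint}, nil
    }

    func main() {
        u, err := url.Parse("http://localhost:8000") // illustrative endpoint
        if err != nil {
            log.Fatal(err)
        }
        cfg, err := config.LoadDefaultConfig(context.Background())
        if err != nil {
            log.Fatal(err)
        }
        svc := dynamodb.NewFromConfig(cfg,
            dynamodb.WithEndpointResolverV2(&staticResolver{endpoint: u}))
        _ = svc
    }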


@ -34,7 +34,6 @@ import (
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/fixtures"
"github.com/gravitational/teleport/lib/session"
@ -283,20 +282,16 @@ func (s *EventsSuite) SessionEventsCRUD(t *testing.T) {
var history []apievents.AuditEvent
ctx := context.Background()
err = retryutils.RetryStaticFor(time.Minute*5, time.Second*5, func() error {
require.EventuallyWithT(t, func(t *assert.CollectT) {
history, _, err = s.Log.SearchEvents(ctx, events.SearchEventsRequest{
From: loginTime.Add(-1 * time.Hour),
To: loginTime.Add(time.Hour),
Limit: 100,
Order: types.EventOrderAscending,
})
if err != nil {
t.Logf("Retrying searching of events because of: %v", err)
}
return err
})
require.NoError(t, err)
require.Len(t, history, 1)
assert.NoError(t, err)
assert.Len(t, history, 1)
}, 10*time.Second, 500*time.Millisecond)
// start the session and emit data stream to it and wrap it up
sessionID := session.NewID()
@ -339,20 +334,17 @@ func (s *EventsSuite) SessionEventsCRUD(t *testing.T) {
require.NoError(t, err)
// search for the session event.
err = retryutils.RetryStaticFor(time.Minute*5, time.Second*5, func() error {
require.EventuallyWithT(t, func(t *assert.CollectT) {
history, _, err = s.Log.SearchEvents(ctx, events.SearchEventsRequest{
From: s.Clock.Now().UTC().Add(-1 * time.Hour),
To: s.Clock.Now().UTC().Add(time.Hour),
Limit: 100,
Order: types.EventOrderAscending,
})
if err != nil {
t.Logf("Retrying searching of events because of: %v", err)
}
return err
})
require.NoError(t, err)
require.Len(t, history, 3)
assert.NoError(t, err)
assert.Len(t, history, 3)
}, 10*time.Second, 500*time.Millisecond)
require.Equal(t, events.SessionStartEvent, history[1].GetType())
require.Equal(t, events.SessionEndEvent, history[2].GetType())
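
The retry loops above are replaced with testify's require.EventuallyWithT,
which reruns the assertion body until every assertion against the
*assert.CollectT passes or the timeout expires, making the search retries
both simpler and bounded. A standalone sketch of the idiom:

    package main

    import (
        "testing"
        "time"

        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
    )

    func TestEventuallyWithT(t *testing.T) {
        start := time.Now()
        // The body is retried every 500ms for up to 10s; the test fails
        // only if the assertions never all pass within the window.
        require.EventuallyWithT(t, func(c *assert.CollectT) {
            assert.Greater(c, time.Since(start), time.Second)
        }, 10*time.Second, 500*time.Millisecond)
    }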


@ -1755,11 +1755,11 @@ func (process *TeleportProcess) initAuthExternalAuditLog(auditConfig types.Clust
Region: auditConfig.Region(),
EnableContinuousBackups: auditConfig.EnableContinuousBackups(),
EnableAutoScaling: auditConfig.EnableAutoScaling(),
ReadMinCapacity: auditConfig.ReadMinCapacity(),
ReadMaxCapacity: auditConfig.ReadMaxCapacity(),
ReadMinCapacity: int32(auditConfig.ReadMinCapacity()),
ReadMaxCapacity: int32(auditConfig.ReadMaxCapacity()),
ReadTargetValue: auditConfig.ReadTargetValue(),
WriteMinCapacity: auditConfig.WriteMinCapacity(),
WriteMaxCapacity: auditConfig.WriteMaxCapacity(),
WriteMinCapacity: int32(auditConfig.WriteMinCapacity()),
WriteMaxCapacity: int32(auditConfig.WriteMaxCapacity()),
WriteTargetValue: auditConfig.WriteTargetValue(),
RetentionPeriod: auditConfig.RetentionPeriod(),
UseFIPSEndpoint: auditConfig.GetUseFIPSEndpoint(),
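
One caveat on the int32 conversions above: the v2 Application Auto Scaling
API models capacities as int32, so the int64 values from the audit config are
narrowed here. An int64 above math.MaxInt32 would silently wrap; a defensive
clamp such as the following sketch (not part of this change) avoids that:

    package main

    import (
        "fmt"
        "math"
    )

    // clampToInt32 narrows an int64 to int32 without wrapping, saturating
    // at the int32 bounds instead.
    func clampToInt32(v int64) int32 {
        switch {
        case v > math.MaxInt32:
            return math.MaxInt32
        case v < math.MinInt32:
            return math.MinInt32
        default:
            return int32(v)
        }
    }

    func main() {
        fmt.Println(clampToInt32(5_000_000_000)) // 2147483647
        fmt.Println(clampToInt32(100))           // 100
    }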