athena audit logs - add migration script (#27099)

Tobiasz Heller 2023-06-22 12:45:47 +02:00 committed by GitHub
parent a82f2d8587
commit c5486d15a7
11 changed files with 1637 additions and 29 deletions


@ -0,0 +1,90 @@
# Dynamomigration tool

The Dynamomigration tool exports Teleport audit event logs from a DynamoDB
table into the Athena audit log.
It uses DynamoDB export to S3 to export the data.
Requirements:
* Point-in-time recovery (PITR) enabled on the DynamoDB table (a verification sketch follows the policy below)
* A writable filesystem on the machine where the script will be executed
* IAM permissions:
```json
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowDynamoExportAndList",
"Effect": "Allow",
"Action": [
"dynamodb:ExportTableToPointInTime"
],
"Resource": "arn:aws:dynamodb:region:account:table/tablename"
},
{
"Sid": "AllowDynamoExportDescribe",
"Effect": "Allow",
"Action": [
"dynamodb:DescribeExport"
],
"Resource": "arn:aws:dynamodb:region:account:table/tablename/*"
},
{
"Sid": "AllowWriteReadDestinationBucket",
"Effect": "Allow",
"Action": [
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject"
],
"Resource": "arn:aws:s3:::export-bucket/*"
},
{
"Sid": "AllowWriteLargePayloadsBucket",
"Effect": "Allow",
"Action": [
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:PutObjectAcl"
],
"Resource": "arn:aws:s3:::large-payloads-bucket/*"
},
{
"Sid": "AllowPublishToAthenaTopic",
"Effect": "Allow",
"Action": [
"sns:Publish"
],
"Resource": "arn:aws:sns:region:account:topicname"
}
]
}
```
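
The migration relies on `ExportTableToPointInTime`, which fails unless PITR is enabled on the source table. Below is a minimal sketch (not part of this tool; the table name is a placeholder) that checks PITR status with the AWS SDK for Go v2 before starting a migration:
```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	out, err := dynamodb.NewFromConfig(cfg).DescribeContinuousBackups(ctx, &dynamodb.DescribeContinuousBackupsInput{
		TableName: aws.String("tablename"), // placeholder: the audit events table
	})
	if err != nil {
		log.Fatal(err)
	}
	pitr := out.ContinuousBackupsDescription.PointInTimeRecoveryDescription
	if pitr == nil || pitr.PointInTimeRecoveryStatus != types.PointInTimeRecoveryStatusEnabled {
		log.Fatal("PITR is not enabled; enable it before running the migration")
	}
	fmt.Println("PITR is enabled")
}
```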
## Example usage
Build: `cd examples/dynamoathenamigration/cmd && go build -o dynamoathenamigration`.
It is recommended to test the export first using the `-dryRun` flag. A dry run does not emit any events;
it verifies that the export is in a valid format and that the events can be parsed.
Dry run example:
```shell
./dynamoathenamigration -dynamoARN='arn:aws:dynamodb:region:account:table/tablename' \
-exportPath='s3://bucket/prefix' \
-dryRun
```
Full migration:
```shell
./dynamoathenamigration -dynamoARN='arn:aws:dynamodb:region:account:table/tablename' \
-exportPath='s3://bucket/prefix' \
-snsTopicARN=arn:aws:sns:region:account:topicname \
-largePayloadsPath=s3://bucket/prefix
```
To reuse an existing export without triggering a new one, use `-exportARN=xxx`.


@ -0,0 +1,104 @@
// Copyright 2023 Gravitational, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"flag"
"net/url"
"os"
"os/signal"
"strings"
"time"
log "github.com/sirupsen/logrus"
"github.com/gravitational/teleport/examples/dynamoathenamigration"
)
func main() {
timeStr := flag.String("exportTime", "", "exportTime is time (RFC3339 format) in the past from which to export table data, empty for the current time")
exportARN := flag.String("exportARN", "", "exportARN allows reusing an already finished export without triggering a new one")
dynamoARN := flag.String("dynamoARN", "", "ARN of DynamoDB table to export")
s3exportPath := flag.String("exportPath", "", "S3 address where export should be placed, in format s3://bucket/optional_prefix")
snsTopicARN := flag.String("snsTopicARN", "", "SNS topic ARN configured in athena logger")
s3largePayloadsPath := flag.String("largePayloadsPath", "", "S3 address configured in athena logger for placing large events payloads, in format s3://bucket/optional_prefix")
dryRun := flag.Bool("dryRun", false, "dryRun means export will be triggered and verified, however events won't be published on SNS topic")
noOfEmitWorker := flag.Int("noOfEmitWorker", 5, "noOfEmitWorker defines number of workers emitting events to athena logger")
checkpointPath := flag.String("checkpointPath", "", "checkpointPath defines where checkpoint file will be stored")
exportLocalDir := flag.String("exportLocalDir", "", "exportLocalDir defines directory where export will be downloaded")
debug := flag.Bool("d", false, "debug logs")
flag.Parse()
level := log.InfoLevel
if *debug {
level = log.DebugLevel
}
logger := log.New()
logger.SetLevel(level)
cfg := dynamoathenamigration.Config{
ExportARN: *exportARN,
DynamoTableARN: *dynamoARN,
DryRun: *dryRun,
NoOfEmitWorkers: *noOfEmitWorker,
TopicARN: *snsTopicARN,
ExportLocalDir: *exportLocalDir,
Logger: logger,
}
var err error
if *timeStr != "" {
cfg.ExportTime, err = time.Parse(time.RFC3339, *timeStr)
if err != nil {
logger.Fatal(err)
}
}
if *s3exportPath != "" {
u, err := url.Parse(*s3exportPath)
if err != nil {
logger.Fatal(err)
}
if u.Scheme != "s3" {
logger.Fatal("invalid scheme for s3 export path")
}
cfg.Bucket = u.Host
cfg.Prefix = strings.TrimSuffix(strings.TrimPrefix(u.Path, "/"), "/")
}
if *s3largePayloadsPath != "" {
u, err := url.Parse(*s3largePayloadsPath)
if err != nil {
logger.Fatal(err)
}
if u.Scheme != "s3" {
logger.Fatal("invalid scheme for s3 large payloads path")
}
cfg.LargePayloadBucket = u.Host
cfg.LargePayloadPrefix = strings.TrimSuffix(strings.TrimPrefix(u.Path, "/"), "/")
}
if *checkpointPath != "" {
cfg.CheckpointPath = *checkpointPath
}
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
if err := dynamoathenamigration.Migrate(ctx, cfg); err != nil {
logger.Fatal(err)
}
}


@ -0,0 +1,370 @@
// Copyright 2023 Gravitational, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dynamoathenamigration
import (
"encoding/base64"
"fmt"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
// awsAwsjson10_deserializeDocumentAttributeMap is copied from aws-sdk-go-v2/service/dynamodb/deserializers.go.
// The AWS SDK does not expose a function to convert from the DynamoDB JSON export format to types.AttributeValue.
func awsAwsjson10_deserializeDocumentAttributeMap(v *map[string]types.AttributeValue, value interface{}) error {
if v == nil {
return fmt.Errorf("unexpected nil of type %T", v)
}
if value == nil {
return nil
}
shape, ok := value.(map[string]interface{})
if !ok {
return fmt.Errorf("unexpected JSON type %v", value)
}
var mv map[string]types.AttributeValue
if *v == nil {
mv = map[string]types.AttributeValue{}
} else {
mv = *v
}
for key, value := range shape {
var parsedVal types.AttributeValue
mapVar := parsedVal
if err := awsAwsjson10_deserializeDocumentAttributeValue(&mapVar, value); err != nil {
return err
}
parsedVal = mapVar
mv[key] = parsedVal
}
*v = mv
return nil
}
func awsAwsjson10_deserializeDocumentAttributeValue(v *types.AttributeValue, value interface{}) error {
if v == nil {
return fmt.Errorf("unexpected nil of type %T", v)
}
if value == nil {
return nil
}
shape, ok := value.(map[string]interface{})
if !ok {
return fmt.Errorf("unexpected JSON type %v", value)
}
var uv types.AttributeValue
loop:
for key, value := range shape {
if value == nil {
continue
}
switch key {
case "B":
var mv []byte
if value != nil {
jtv, ok := value.(string)
if !ok {
return fmt.Errorf("expected BinaryAttributeValue to be []byte, got %T instead", value)
}
dv, err := base64.StdEncoding.DecodeString(jtv)
if err != nil {
return fmt.Errorf("failed to base64 decode BinaryAttributeValue, %w", err)
}
mv = dv
}
uv = &types.AttributeValueMemberB{Value: mv}
break loop
case "BOOL":
var mv bool
if value != nil {
jtv, ok := value.(bool)
if !ok {
return fmt.Errorf("expected BooleanAttributeValue to be of type *bool, got %T instead", value)
}
mv = jtv
}
uv = &types.AttributeValueMemberBOOL{Value: mv}
break loop
case "BS":
var mv [][]byte
if err := awsAwsjson10_deserializeDocumentBinarySetAttributeValue(&mv, value); err != nil {
return err
}
uv = &types.AttributeValueMemberBS{Value: mv}
break loop
case "L":
var mv []types.AttributeValue
if err := awsAwsjson10_deserializeDocumentListAttributeValue(&mv, value); err != nil {
return err
}
uv = &types.AttributeValueMemberL{Value: mv}
break loop
case "M":
var mv map[string]types.AttributeValue
if err := awsAwsjson10_deserializeDocumentMapAttributeValue(&mv, value); err != nil {
return err
}
uv = &types.AttributeValueMemberM{Value: mv}
break loop
case "N":
var mv string
if value != nil {
jtv, ok := value.(string)
if !ok {
return fmt.Errorf("expected NumberAttributeValue to be of type string, got %T instead", value)
}
mv = jtv
}
uv = &types.AttributeValueMemberN{Value: mv}
break loop
case "NS":
var mv []string
if err := awsAwsjson10_deserializeDocumentNumberSetAttributeValue(&mv, value); err != nil {
return err
}
uv = &types.AttributeValueMemberNS{Value: mv}
break loop
case "NULL":
var mv bool
if value != nil {
jtv, ok := value.(bool)
if !ok {
return fmt.Errorf("expected NullAttributeValue to be of type *bool, got %T instead", value)
}
mv = jtv
}
uv = &types.AttributeValueMemberNULL{Value: mv}
break loop
case "S":
var mv string
if value != nil {
jtv, ok := value.(string)
if !ok {
return fmt.Errorf("expected StringAttributeValue to be of type string, got %T instead", value)
}
mv = jtv
}
uv = &types.AttributeValueMemberS{Value: mv}
break loop
case "SS":
var mv []string
if err := awsAwsjson10_deserializeDocumentStringSetAttributeValue(&mv, value); err != nil {
return err
}
uv = &types.AttributeValueMemberSS{Value: mv}
break loop
default:
uv = &types.UnknownUnionMember{Tag: key}
break loop
}
}
*v = uv
return nil
}
func awsAwsjson10_deserializeDocumentStringSetAttributeValue(v *[]string, value interface{}) error {
if v == nil {
return fmt.Errorf("unexpected nil of type %T", v)
}
if value == nil {
return nil
}
shape, ok := value.([]interface{})
if !ok {
return fmt.Errorf("unexpected JSON type %v", value)
}
var cv []string
if *v == nil {
cv = []string{}
} else {
cv = *v
}
for _, value := range shape {
var col string
if value != nil {
jtv, ok := value.(string)
if !ok {
return fmt.Errorf("expected StringAttributeValue to be of type string, got %T instead", value)
}
col = jtv
}
cv = append(cv, col)
}
*v = cv
return nil
}
func awsAwsjson10_deserializeDocumentNumberSetAttributeValue(v *[]string, value interface{}) error {
if v == nil {
return fmt.Errorf("unexpected nil of type %T", v)
}
if value == nil {
return nil
}
shape, ok := value.([]interface{})
if !ok {
return fmt.Errorf("unexpected JSON type %v", value)
}
var cv []string
if *v == nil {
cv = []string{}
} else {
cv = *v
}
for _, value := range shape {
var col string
if value != nil {
jtv, ok := value.(string)
if !ok {
return fmt.Errorf("expected NumberAttributeValue to be of type string, got %T instead", value)
}
col = jtv
}
cv = append(cv, col)
}
*v = cv
return nil
}
func awsAwsjson10_deserializeDocumentBinarySetAttributeValue(v *[][]byte, value interface{}) error {
if v == nil {
return fmt.Errorf("unexpected nil of type %T", v)
}
if value == nil {
return nil
}
shape, ok := value.([]interface{})
if !ok {
return fmt.Errorf("unexpected JSON type %v", value)
}
var cv [][]byte
if *v == nil {
cv = [][]byte{}
} else {
cv = *v
}
for _, value := range shape {
var col []byte
if value != nil {
jtv, ok := value.(string)
if !ok {
return fmt.Errorf("expected BinaryAttributeValue to be []byte, got %T instead", value)
}
dv, err := base64.StdEncoding.DecodeString(jtv)
if err != nil {
return fmt.Errorf("failed to base64 decode BinaryAttributeValue, %w", err)
}
col = dv
}
cv = append(cv, col)
}
*v = cv
return nil
}
func awsAwsjson10_deserializeDocumentMapAttributeValue(v *map[string]types.AttributeValue, value interface{}) error {
if v == nil {
return fmt.Errorf("unexpected nil of type %T", v)
}
if value == nil {
return nil
}
shape, ok := value.(map[string]interface{})
if !ok {
return fmt.Errorf("unexpected JSON type %v", value)
}
var mv map[string]types.AttributeValue
if *v == nil {
mv = map[string]types.AttributeValue{}
} else {
mv = *v
}
for key, value := range shape {
var parsedVal types.AttributeValue
mapVar := parsedVal
if err := awsAwsjson10_deserializeDocumentAttributeValue(&mapVar, value); err != nil {
return err
}
parsedVal = mapVar
mv[key] = parsedVal
}
*v = mv
return nil
}
func awsAwsjson10_deserializeDocumentListAttributeValue(v *[]types.AttributeValue, value interface{}) error {
if v == nil {
return fmt.Errorf("unexpected nil of type %T", v)
}
if value == nil {
return nil
}
shape, ok := value.([]interface{})
if !ok {
return fmt.Errorf("unexpected JSON type %v", value)
}
var cv []types.AttributeValue
if *v == nil {
cv = []types.AttributeValue{}
} else {
cv = *v
}
for _, value := range shape {
var col types.AttributeValue
if err := awsAwsjson10_deserializeDocumentAttributeValue(&col, value); err != nil {
return err
}
cv = append(cv, col)
}
*v = cv
return nil
}
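// Illustration only (hypothetical helper, not part of this commit; it would also
// need the encoding/json import): how the copied deserializer turns one attribute
// map in DynamoDB JSON export notation into typed attribute values.
func exampleDeserializeAttributeMap() (map[string]types.AttributeValue, error) {
	line := []byte(`{"event": {"S": "session.upload"}, "ei": {"N": "42"}}`)
	var raw map[string]interface{}
	if err := json.Unmarshal(line, &raw); err != nil {
		return nil, err
	}
	var attrs map[string]types.AttributeValue
	if err := awsAwsjson10_deserializeDocumentAttributeMap(&attrs, raw); err != nil {
		return nil, err
	}
	// attrs["event"] is now *types.AttributeValueMemberS{Value: "session.upload"}
	// and attrs["ei"] is *types.AttributeValueMemberN{Value: "42"}.
	return attrs, nil
}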


@ -0,0 +1,602 @@
// Copyright 2023 Gravitational, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dynamoathenamigration
import (
"bufio"
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/ratelimit"
"github.com/aws/aws-sdk-go-v2/aws/retry"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
dynamoTypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/gravitational/trace"
log "github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/events/athena"
"github.com/gravitational/teleport/lib/utils/prompt"
)
type Config struct {
// ExportTime is time in the past from which to export table data.
ExportTime time.Time
// ExportARN allows reusing an already finished export without triggering a new one.
ExportARN string
// DynamoTableARN that will be exported.
DynamoTableARN string
// ExportLocalDir specifies where export files will be downloaded (it must exist).
// If empty, os.TempDir() will be used.
ExportLocalDir string
// Bucket used to store export.
Bucket string
// Prefix is the S3 prefix inside the bucket where the export is stored.
Prefix string
// DryRun generates the export and converts it to AuditEvents.
// Nothing is published to the Athena publisher.
// Can be used to test whether the export is valid.
DryRun bool
// NoOfEmitWorkers defines how many workers are used to emit audit events.
NoOfEmitWorkers int
bufferSize int
// CheckpointPath is the full path of the file where checkpoint data is stored.
// Defaults to a file in the current directory (athenadynamomigration.json).
// The checkpoint allows resuming an export that failed during emitting.
CheckpointPath string
// TopicARN is topic of athena logger.
TopicARN string
// LargePayloadBucket is s3 bucket configured for large payloads in athena logger.
LargePayloadBucket string
// LargePayloadPrefix is s3 prefix configured for large payloads in athena logger.
LargePayloadPrefix string
Logger log.FieldLogger
}
const defaultCheckpointPath = "athenadynamomigration.json"
func (cfg *Config) CheckAndSetDefaults() error {
if cfg.ExportTime.IsZero() {
cfg.ExportTime = time.Now()
}
if cfg.DynamoTableARN == "" && cfg.ExportARN == "" {
return trace.BadParameter("either DynamoTableARN or ExportARN is required")
}
if cfg.Bucket == "" {
return trace.BadParameter("missing export bucket")
}
if cfg.NoOfEmitWorkers == 0 {
cfg.NoOfEmitWorkers = 3
}
if cfg.bufferSize == 0 {
cfg.bufferSize = 10 * cfg.NoOfEmitWorkers
}
if !cfg.DryRun {
if cfg.TopicARN == "" {
return trace.BadParameter("missing Athena logger SNS Topic ARN")
}
if cfg.LargePayloadBucket == "" {
return trace.BadParameter("missing Athena logger large payload bucket")
}
}
if cfg.CheckpointPath == "" {
cfg.CheckpointPath = defaultCheckpointPath
}
if cfg.Logger == nil {
cfg.Logger = log.New()
}
return nil
}
type task struct {
Config
dynamoClient *dynamodb.Client
s3Downloader s3downloader
eventsEmitter eventsEmitter
}
type s3downloader interface {
Download(ctx context.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*manager.Downloader)) (n int64, err error)
}
type eventsEmitter interface {
EmitAuditEvent(ctx context.Context, in apievents.AuditEvent) error
}
func newMigrateTask(ctx context.Context, cfg Config) (*task, error) {
awsCfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
return nil, trace.Wrap(err)
}
s3Client := s3.NewFromConfig(awsCfg)
return &task{
Config: cfg,
dynamoClient: dynamodb.NewFromConfig(awsCfg),
s3Downloader: manager.NewDownloader(s3Client),
eventsEmitter: athena.NewPublisher(athena.PublisherConfig{
TopicARN: cfg.TopicARN,
SNSPublisher: sns.NewFromConfig(awsCfg, func(o *sns.Options) {
o.Retryer = retry.NewStandard(func(so *retry.StandardOptions) {
so.MaxAttempts = 30
so.MaxBackoff = 1 * time.Minute
// Use bigger rate limit to handle default sdk throttling: https://github.com/aws/aws-sdk-go-v2/issues/1665
so.RateLimiter = ratelimit.NewTokenRateLimit(1000000)
})
}),
Uploader: manager.NewUploader(s3Client),
PayloadBucket: cfg.LargePayloadBucket,
PayloadPrefix: cfg.LargePayloadPrefix,
}),
}, nil
}
// Migrate executes the DynamoDB -> Athena migration.
func Migrate(ctx context.Context, cfg Config) error {
if err := cfg.CheckAndSetDefaults(); err != nil {
return trace.Wrap(err)
}
t, err := newMigrateTask(ctx, cfg)
if err != nil {
return trace.Wrap(err)
}
exportInfo, err := t.GetOrStartExportAndWaitForResults(ctx)
if err != nil {
return trace.Wrap(err)
}
if err := t.ProcessDataObjects(ctx, exportInfo); err != nil {
return trace.Wrap(err)
}
t.Logger.Info("Migration finished")
return nil
}
// GetOrStartExportAndWaitForResults returns export results.
// It either reuses an existing export or starts a new one, depending on whether ExportARN is set.
func (t *task) GetOrStartExportAndWaitForResults(ctx context.Context) (*exportInfo, error) {
exportARN := t.Config.ExportARN
if exportARN == "" {
var err error
exportARN, err = t.startExportJob(ctx)
if err != nil {
return nil, trace.Wrap(err)
}
}
manifest, err := t.waitForCompletedExport(ctx, exportARN)
if err != nil {
return nil, trace.Wrap(err)
}
t.Logger.Debugf("Using export manifest %s", manifest)
dataObjectsInfo, err := t.getDataObjectsInfo(ctx, manifest)
if err != nil {
return nil, trace.Wrap(err)
}
return &exportInfo{
ExportARN: exportARN,
DataObjectsInfo: dataObjectsInfo,
}, nil
}
// ProcessDataObjects takes dataObjectInfo from the export summary, downloads the data files
// from S3, ungzips them, and emits the events on SNS using the Athena publisher.
func (t *task) ProcessDataObjects(ctx context.Context, exportInfo *exportInfo) error {
eventsC := make(chan apievents.AuditEvent, t.bufferSize)
eg, egCtx := errgroup.WithContext(ctx)
eg.Go(func() error {
err := t.getEventsFromDataFiles(egCtx, exportInfo, eventsC)
close(eventsC)
return trace.Wrap(err)
})
eg.Go(func() error {
err := t.emitEvents(egCtx, eventsC, exportInfo.ExportARN)
return trace.Wrap(err)
})
return trace.Wrap(eg.Wait())
}
func (t *task) waitForCompletedExport(ctx context.Context, exportARN string) (exportManifest string, err error) {
req := &dynamodb.DescribeExportInput{
ExportArn: aws.String(exportARN),
}
for {
exportStatusOutput, err := t.dynamoClient.DescribeExport(ctx, req)
if err != nil {
return "", trace.Wrap(err)
}
if exportStatusOutput == nil || exportStatusOutput.ExportDescription == nil {
return "", errors.New("dynamo DescribeExport returned unexpected nil on response")
}
exportStatus := exportStatusOutput.ExportDescription.ExportStatus
switch exportStatus {
case dynamoTypes.ExportStatusCompleted:
return aws.ToString(exportStatusOutput.ExportDescription.ExportManifest), nil
case dynamoTypes.ExportStatusFailed:
return "", trace.Errorf("export %s returned failed status", exportARN)
case dynamoTypes.ExportStatusInProgress:
select {
case <-ctx.Done():
return "", trace.Wrap(ctx.Err())
case <-time.After(10 * time.Second):
t.Logger.Debug("Export job still in progress...")
}
}
}
}
func (t *task) startExportJob(ctx context.Context) (arn string, err error) {
exportOutput, err := t.dynamoClient.ExportTableToPointInTime(ctx, &dynamodb.ExportTableToPointInTimeInput{
S3Bucket: aws.String(t.Bucket),
TableArn: aws.String(t.DynamoTableARN),
ExportFormat: dynamoTypes.ExportFormatDynamodbJson,
ExportTime: aws.Time(t.ExportTime),
S3Prefix: aws.String(t.Prefix),
})
if err != nil {
return "", trace.Wrap(err)
}
if exportOutput == nil || exportOutput.ExportDescription == nil {
return "", errors.New("dynamo ExportTableToPointInTime returned unexpected nil on response")
}
exportArn := aws.ToString(exportOutput.ExportDescription.ExportArn)
t.Logger.Infof("Started export %s", exportArn)
return exportArn, nil
}
type exportInfo struct {
ExportARN string
DataObjectsInfo []dataObjectInfo
}
type dataObjectInfo struct {
DataFileS3Key string `json:"dataFileS3Key"`
ItemCount int `json:"itemCount"`
}
// getDataObjectsInfo downloads manifest-files.json and gets data object info from it.
func (t *task) getDataObjectsInfo(ctx context.Context, manifestPath string) ([]dataObjectInfo, error) {
// The summary file is small, so we can use an in-memory buffer.
writeAtBuf := manager.NewWriteAtBuffer([]byte{})
if _, err := t.s3Downloader.Download(ctx, writeAtBuf, &s3.GetObjectInput{
Bucket: aws.String(t.Bucket),
// AWS SDK returns the manifest-summary.json path. We are interested in
// manifest-files.json because it contains references to the data export files.
Key: aws.String(path.Dir(manifestPath) + "/manifest-files.json"),
}); err != nil {
return nil, trace.Wrap(err)
}
var out []dataObjectInfo
scanner := bufio.NewScanner(bytes.NewBuffer(writeAtBuf.Bytes()))
// manifest-files.json is a JSON Lines file, which is why it's scanned line by line.
for scanner.Scan() {
var obj dataObjectInfo
err := json.Unmarshal(scanner.Bytes(), &obj)
if err != nil {
return nil, trace.Wrap(err)
}
out = append(out, obj)
}
if err := scanner.Err(); err != nil {
return nil, trace.Wrap(err)
}
return out, nil
}
func (t *task) getEventsFromDataFiles(ctx context.Context, exportInfo *exportInfo, eventsC chan<- apievents.AuditEvent) error {
checkpoint, err := t.loadEmitterCheckpoint(ctx, exportInfo.ExportARN)
if err != nil {
return trace.Wrap(err)
}
if checkpoint != nil {
if checkpoint.FinishedWithError {
reuse, err := prompt.Confirmation(ctx, os.Stdout, prompt.Stdin(), fmt.Sprintf("It seems that previous migration %s stopped with error, do you want to resume it?", exportInfo.ExportARN))
if err != nil {
return trace.Wrap(err)
}
if reuse {
t.Logger.Info("Resuming emitting from checkpoint")
} else {
// selected not reuse
checkpoint = nil
}
} else {
// migration completed without any error, no sense of reusing checkpoint.
t.Logger.Info("Skipping checkpoint because previous migration finished without error")
checkpoint = nil
}
}
// afterCheckpoint is used to pass information between fromS3ToChan calls
// about whether the checkpoint has been reached.
var afterCheckpoint bool
for _, dataObj := range exportInfo.DataObjectsInfo {
t.Logger.Debugf("Downloading %s", dataObj.DataFileS3Key)
afterCheckpoint, err = t.fromS3ToChan(ctx, dataObj, eventsC, checkpoint, afterCheckpoint)
if err != nil {
return trace.Wrap(err)
}
}
return nil
}
func (t *task) fromS3ToChan(ctx context.Context, dataObj dataObjectInfo, eventsC chan<- apievents.AuditEvent, checkpoint *checkpointData, afterCheckpointIn bool) (afterCheckpointOut bool, err error) {
f, err := t.downloadFromS3(ctx, dataObj.DataFileS3Key)
if err != nil {
return false, trace.Wrap(err)
}
defer f.Close()
gzipReader, err := gzip.NewReader(f)
if err != nil {
return false, trace.Wrap(err)
}
defer gzipReader.Close()
checkpointValues := checkpoint.checkpointValues()
afterCheckpoint := afterCheckpointIn
scanner := bufio.NewScanner(gzipReader)
t.Logger.Debugf("Scanning %d events", dataObj.ItemCount)
count := 0
for scanner.Scan() {
count++
ev, err := exportedDynamoItemToAuditEvent(ctx, scanner.Bytes())
if err != nil {
return false, trace.Wrap(err)
}
// If a checkpoint is present, it means that the previous run ended with an error
// and we want to continue from the last valid checkpoint.
// There is a list of checkpoints because processing is done asynchronously with
// multiple workers. We are looking for the first ID among the checkpoints.
if checkpoint != nil && !afterCheckpoint {
if !slices.Contains(checkpointValues, ev.GetID()) {
// Skip because it was processed in a previous run.
continue
} else {
t.Logger.Debugf("Event %s is last checkpoint, will start emitting from next event on the list", ev.GetID())
// id is on list of valid checkpoints
afterCheckpoint = true
// This was the last completed event; skip it and emit everything from the next iteration.
continue
}
}
select {
case eventsC <- ev:
case <-ctx.Done():
return false, ctx.Err()
}
if count%100 == 0 {
t.Logger.Debugf("Sent on buffer %d/%d events from %s", count, dataObj.ItemCount, dataObj.DataFileS3Key)
}
}
if err := scanner.Err(); err != nil {
return false, trace.Wrap(err)
}
return afterCheckpoint, nil
}
// exportedDynamoItemToAuditEvent converts a single line of the DynamoDB export into an AuditEvent.
func exportedDynamoItemToAuditEvent(ctx context.Context, in []byte) (apievents.AuditEvent, error) {
var itemMap map[string]map[string]any
if err := json.Unmarshal(in, &itemMap); err != nil {
return nil, trace.Wrap(err)
}
var attributeMap map[string]dynamoTypes.AttributeValue
if err := awsAwsjson10_deserializeDocumentAttributeMap(&attributeMap, itemMap["Item"]); err != nil {
return nil, trace.Wrap(err)
}
var eventFields events.EventFields
if err := attributevalue.Unmarshal(attributeMap["FieldsMap"], &eventFields); err != nil {
return nil, trace.Wrap(err)
}
event, err := events.FromEventFields(eventFields)
return event, trace.Wrap(err)
}
func (t *task) downloadFromS3(ctx context.Context, key string) (*os.File, error) {
originalName := path.Base(key)
var dir string
if t.Config.ExportLocalDir != "" {
dir = t.Config.ExportLocalDir
} else {
dir = os.TempDir()
}
path := path.Join(dir, originalName)
f, err := os.Create(path)
if err != nil {
return nil, trace.Wrap(err)
}
if _, err := t.s3Downloader.Download(ctx, f, &s3.GetObjectInput{
Bucket: aws.String(t.Bucket),
Key: aws.String(key),
}); err != nil {
f.Close()
return nil, trace.Wrap(err)
}
return f, nil
}
type checkpointData struct {
ExportARN string `json:"export_arn"`
FinishedWithError bool `json:"finished_with_error"`
// Checkpoints maps each worker index to the last valid event ID
// emitted by that worker.
Checkpoints map[int]string `json:"checkpoints"`
}
func (c *checkpointData) checkpointValues() []string {
if c == nil {
return nil
}
return maps.Values(c.Checkpoints)
}
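// For illustration (hypothetical values, not part of this commit): with three workers,
// the file written by storeEmitterCheckpoint below could look like
//
//	{
//	  "export_arn": "arn:aws:dynamodb:region:account:table/tablename/export/xyz",
//	  "finished_with_error": true,
//	  "checkpoints": {"0": "uid-a", "1": "uid-b", "2": "uid-c"}
//	}
//
// On resume, fromS3ToChan skips events until it reaches one of these IDs and then
// emits everything that follows.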
func (t *task) storeEmitterCheckpoint(in checkpointData) error {
bb, err := json.Marshal(in)
if err != nil {
return trace.Wrap(err)
}
return trace.Wrap(os.WriteFile(t.CheckpointPath, bb, 0o755))
}
func (t *task) loadEmitterCheckpoint(ctx context.Context, exportARN string) (*checkpointData, error) {
bb, err := os.ReadFile(t.CheckpointPath)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil, nil
}
return nil, trace.Wrap(err)
}
var out checkpointData
if err := json.Unmarshal(bb, &out); err != nil {
return nil, trace.Wrap(err)
}
// The checkpoint belongs to a different export; assume there is no checkpoint saved.
if exportARN != out.ExportARN {
return nil, nil
}
return &out, nil
}
func (t *task) emitEvents(ctx context.Context, eventsC <-chan apievents.AuditEvent, exportARN string) error {
if t.DryRun {
// In dry run we just want to count events; validation is done when reading from the file.
var count int
var oldest, newest apievents.AuditEvent
for event := range eventsC {
count++
if oldest == nil && newest == nil {
// first iteration, initialize values with first event.
oldest = event
newest = event
}
if oldest.GetTime().After(event.GetTime()) {
oldest = event
}
if newest.GetTime().Before(event.GetTime()) {
newest = event
}
}
if count == 0 {
return errors.New("there were not events from export")
}
t.Logger.Infof("Dry run: there are %d events from %v to %v", count, oldest.GetTime(), newest.GetTime())
return nil
}
// mu protects checkpointsPerWorker.
var mu sync.Mutex
checkpointsPerWorker := map[int]string{}
errG, workerCtx := errgroup.WithContext(ctx)
for i := 0; i < t.NoOfEmitWorkers; i++ {
i := i
errG.Go(func() error {
for {
select {
case <-workerCtx.Done():
return trace.Wrap(ctx.Err())
case e, ok := <-eventsC:
if !ok {
return nil
}
if err := t.eventsEmitter.EmitAuditEvent(workerCtx, e); err != nil {
return trace.Wrap(err)
} else {
mu.Lock()
checkpointsPerWorker[i] = e.GetID()
mu.Unlock()
}
}
}
})
}
workersErr := errG.Wait()
// workersErr is handled below because we want to store a checkpoint on error.
// If data is missing from at least one worker, that worker does not have any
// valid checkpoint to store. Without a valid checkpoint from every worker we
// cannot calculate the minimum checkpoint, so no checkpoint is stored at all.
if len(checkpointsPerWorker) < t.NoOfEmitWorkers {
t.Logger.Warnf("Not enough checkpoints from workers, got %d, expected %d", len(checkpointsPerWorker), t.NoOfEmitWorkers)
return trace.Wrap(workersErr)
}
checkpoint := checkpointData{
FinishedWithError: workersErr != nil || ctx.Err() != nil,
ExportARN: exportARN,
Checkpoints: checkpointsPerWorker,
}
if err := t.storeEmitterCheckpoint(checkpoint); err != nil {
t.Logger.Errorf("Failed to store checkpoint: %v", err)
}
return trace.Wrap(workersErr)
}


@ -0,0 +1,421 @@
// Copyright 2023 Gravitational, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dynamoathenamigration
import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"path"
"sort"
"strings"
"sync"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/teleport/lib/utils/prompt"
)
func TestMigrateProcessDataObjects(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// testDataObjects represents how DynamoDB exports data using the JSON Lines format.
testDataObjects := map[string]string{
"testdata/dataObj1.json.gz": `{ "Item": { "EventIndex": { "N": "2147483647" }, "SessionID": { "S": "4298bd54-a747-4d53-b850-83ba17caae5a" }, "CreatedAtDate": { "S": "2023-05-22" }, "FieldsMap": { "M": { "cluster_name": { "S": "test.example.local" }, "uid": { "S": "850d0dd5-7d6b-415e-a404-c4f79cdf27c9" }, "code": { "S": "T2005I" }, "ei": { "N": "2147483647" }, "time": { "S": "2023-05-22T12:12:21.966Z" }, "event": { "S": "session.upload" }, "sid": { "S": "4298bd54-a747-4d53-b850-83ba17caae5a" } } }, "EventType": { "S": "session.upload" }, "EventNamespace": { "S": "default" }, "CreatedAt": { "N": "1684757541" } } }
{ "Item": { "EventIndex": { "N": "2147483647" }, "SessionID": { "S": "f81a2fda-4ede-4abc-86f9-7a58e189038e" }, "CreatedAtDate": { "S": "2023-05-22" }, "FieldsMap": { "M": { "cluster_name": { "S": "test.example.local" }, "uid": { "S": "19ab6e90-602c-4dcc-88b3-de5f28753f88" }, "code": { "S": "T2005I" }, "ei": { "N": "2147483647" }, "time": { "S": "2023-05-22T12:12:21.966Z" }, "event": { "S": "session.upload" }, "sid": { "S": "f81a2fda-4ede-4abc-86f9-7a58e189038e" } } }, "EventType": { "S": "session.upload" }, "EventNamespace": { "S": "default" }, "CreatedAt": { "N": "1684757541" } } }`,
"testdata/dataObj2.json.gz": `{ "Item": { "EventIndex": { "N": "2147483647" }, "SessionID": { "S": "35f35254-92f9-46a2-9b05-8c13f712389b" }, "CreatedAtDate": { "S": "2023-05-22" }, "FieldsMap": { "M": { "cluster_name": { "S": "test.example.local" }, "uid": { "S": "46c03b4f-4ef5-4d86-80aa-4b53c7efc28f" }, "code": { "S": "T2005I" }, "ei": { "N": "2147483647" }, "time": { "S": "2023-05-22T12:12:21.966Z" }, "event": { "S": "session.upload" }, "sid": { "S": "35f35254-92f9-46a2-9b05-8c13f712389b" } } }, "EventType": { "S": "session.upload" }, "EventNamespace": { "S": "default" }, "CreatedAt": { "N": "1684757541" } } }`,
}
emitter := &mockEmitter{}
mt := &task{
s3Downloader: &fakeDownloader{
dataObjects: testDataObjects,
},
eventsEmitter: emitter,
Config: Config{
Logger: utils.NewLoggerForTests(),
NoOfEmitWorkers: 5,
bufferSize: 10,
CheckpointPath: path.Join(t.TempDir(), "migration-tests.json"),
},
}
err := mt.ProcessDataObjects(ctx, &exportInfo{
ExportARN: "export-arn",
DataObjectsInfo: []dataObjectInfo{
{DataFileS3Key: "testdata/dataObj1.json.gz", ItemCount: 2},
{DataFileS3Key: "testdata/dataObj2.json.gz", ItemCount: 1},
},
})
require.NoError(t, err)
wantEventTimestamp := time.Date(2023, 5, 22, 12, 12, 21, 966000000, time.UTC)
requireEventsEqualInAnyOrder(t, []apievents.AuditEvent{
&apievents.SessionUpload{
Metadata: apievents.Metadata{
Index: 2147483647,
Type: "session.upload",
ID: "850d0dd5-7d6b-415e-a404-c4f79cdf27c9",
Code: "T2005I",
Time: wantEventTimestamp,
ClusterName: "test.example.local",
},
SessionMetadata: apievents.SessionMetadata{
SessionID: "4298bd54-a747-4d53-b850-83ba17caae5a",
},
},
&apievents.SessionUpload{
Metadata: apievents.Metadata{
Index: 2147483647,
Type: "session.upload",
ID: "19ab6e90-602c-4dcc-88b3-de5f28753f88",
Code: "T2005I",
Time: wantEventTimestamp,
ClusterName: "test.example.local",
},
SessionMetadata: apievents.SessionMetadata{
SessionID: "f81a2fda-4ede-4abc-86f9-7a58e189038e",
},
},
&apievents.SessionUpload{
Metadata: apievents.Metadata{
Index: 2147483647,
Type: "session.upload",
ID: "46c03b4f-4ef5-4d86-80aa-4b53c7efc28f",
Code: "T2005I",
Time: wantEventTimestamp,
ClusterName: "test.example.local",
},
SessionMetadata: apievents.SessionMetadata{
SessionID: "35f35254-92f9-46a2-9b05-8c13f712389b",
},
},
}, emitter.events)
}
type fakeDownloader struct {
dataObjects map[string]string
}
func (f *fakeDownloader) Download(ctx context.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*manager.Downloader)) (int64, error) {
data, ok := f.dataObjects[*input.Key]
if !ok {
return 0, errors.New("object does not exists")
}
var buf bytes.Buffer
zw := gzip.NewWriter(&buf)
_, err := zw.Write([]byte(data))
if err != nil {
return 0, err
}
if err := zw.Close(); err != nil {
return 0, err
}
n, err := w.WriteAt(buf.Bytes(), 0)
return int64(n), err
}
type mockEmitter struct {
mu sync.Mutex
events []apievents.AuditEvent
// failAfterNCalls, if greater than 0, causes the emitter to fail after n calls.
failAfterNCalls int
}
func (m *mockEmitter) EmitAuditEvent(ctx context.Context, in apievents.AuditEvent) error {
m.mu.Lock()
defer m.mu.Unlock()
if m.failAfterNCalls > 0 && len(m.events) >= m.failAfterNCalls {
return errors.New("emitter failure")
}
m.events = append(m.events, in)
return nil
}
// requireEventsEqualInAnyOrder compares slices of audit events ignoring order.
// It's useful in tests because the consumer does not guarantee ordering.
func requireEventsEqualInAnyOrder(t *testing.T, want, got []apievents.AuditEvent) {
sort.Slice(want, func(i, j int) bool {
return want[i].GetID() < want[j].GetID()
})
sort.Slice(got, func(i, j int) bool {
return got[i].GetID() < got[j].GetID()
})
require.Empty(t, cmp.Diff(want, got))
}
func TestMigrationCheckpoint(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// There is a confirmation prompt in the migration when reusing a checkpoint, which is
// why Stdin is overwritten in tests.
oldStdin := prompt.Stdin()
t.Cleanup(func() { prompt.SetStdin(oldStdin) })
noOfWorkers := 10
defaultConfig := Config{
Logger: utils.NewLoggerForTests(),
NoOfEmitWorkers: noOfWorkers,
bufferSize: noOfWorkers * 5,
CheckpointPath: path.Join(t.TempDir(), "migration-tests.json"),
}
t.Run("no migration checkpoint, emit every event", func(t *testing.T) {
prompt.SetStdin(prompt.NewFakeReader())
testDataObjects := map[string]string{
"testdata/dataObj1.json.gz": generateDynamoExportData(100),
"testdata/dataObj2.json.gz": generateDynamoExportData(100),
}
emitter := &mockEmitter{}
mt := &task{
s3Downloader: &fakeDownloader{
dataObjects: testDataObjects,
},
eventsEmitter: emitter,
Config: defaultConfig,
}
err := mt.ProcessDataObjects(ctx, &exportInfo{
ExportARN: uuid.NewString(),
DataObjectsInfo: []dataObjectInfo{
{DataFileS3Key: "testdata/dataObj1.json.gz"},
{DataFileS3Key: "testdata/dataObj2.json.gz"},
},
})
require.NoError(t, err)
require.Len(t, emitter.events, 200, "unexpected number of emitted events")
})
t.Run("failure after 50 calls, reuse checkpoint", func(t *testing.T) {
// Answer "y" to the prompt asking whether to reuse the checkpoint.
prompt.SetStdin(prompt.NewFakeReader().AddString("y"))
exportARN := uuid.NewString()
testDataObjects := map[string]string{
"testdata/dataObj1.json.gz": generateDynamoExportData(100),
"testdata/dataObj2.json.gz": generateDynamoExportData(100),
}
emitter := &mockEmitter{
failAfterNCalls: 50,
}
mt := &task{
s3Downloader: &fakeDownloader{
dataObjects: testDataObjects,
},
eventsEmitter: emitter,
Config: defaultConfig,
}
err := mt.ProcessDataObjects(ctx, &exportInfo{
ExportARN: exportARN,
DataObjectsInfo: []dataObjectInfo{
{DataFileS3Key: "testdata/dataObj1.json.gz"},
{DataFileS3Key: "testdata/dataObj2.json.gz"},
},
})
require.Error(t, err)
require.Len(t, emitter.events, 50, "unexpected number of emitted events")
newEmitter := &mockEmitter{}
newMigration := task{
s3Downloader: &fakeDownloader{
dataObjects: testDataObjects,
},
eventsEmitter: newEmitter,
Config: defaultConfig,
}
err = newMigration.ProcessDataObjects(ctx, &exportInfo{
ExportARN: exportARN,
DataObjectsInfo: []dataObjectInfo{
{DataFileS3Key: "testdata/dataObj1.json.gz"},
{DataFileS3Key: "testdata/dataObj2.json.gz"},
},
})
require.NoError(t, err)
// There were 200 events and the first migration stopped after 50, so this one should emit at least 150.
// We check the range (150, 199) because the checkpoint is stored per worker and we resume from the
// first ID on the list, so we expect up to noOfWorkers-1 extra events, and in some conditions even more (if a worker processed faster).
require.GreaterOrEqual(t, len(newEmitter.events), 150, float64(noOfWorkers), "unexpected number of emitted events")
require.Less(t, len(newEmitter.events), 199, float64(noOfWorkers), "unexpected number of emitted events")
})
t.Run("failure after 150 calls (from 2nd export file), reuse checkpoint", func(t *testing.T) {
// Answer "y" to the prompt asking whether to reuse the checkpoint.
prompt.SetStdin(prompt.NewFakeReader().AddString("y"))
exportARN := uuid.NewString()
testDataObjects := map[string]string{
"testdata/dataObj1.json.gz": generateDynamoExportData(100),
"testdata/dataObj2.json.gz": generateDynamoExportData(100),
}
emitter := &mockEmitter{
failAfterNCalls: 150,
}
mt := &task{
s3Downloader: &fakeDownloader{
dataObjects: testDataObjects,
},
eventsEmitter: emitter,
Config: defaultConfig,
}
err := mt.ProcessDataObjects(ctx, &exportInfo{
ExportARN: exportARN,
DataObjectsInfo: []dataObjectInfo{
{DataFileS3Key: "testdata/dataObj1.json.gz"},
{DataFileS3Key: "testdata/dataObj2.json.gz"},
},
})
require.Error(t, err)
require.Len(t, emitter.events, 150, "unexpected number of emitted events")
newEmitter := &mockEmitter{}
newMigration := task{
s3Downloader: &fakeDownloader{
dataObjects: testDataObjects,
},
eventsEmitter: newEmitter,
Config: defaultConfig,
}
err = newMigration.ProcessDataObjects(ctx, &exportInfo{
ExportARN: exportARN,
DataObjectsInfo: []dataObjectInfo{
{DataFileS3Key: "testdata/dataObj1.json.gz"},
{DataFileS3Key: "testdata/dataObj2.json.gz"},
},
})
require.NoError(t, err)
// There were 200 events and the first migration stopped after 150, so this one should emit at least 50.
// We check the range (50, 99) because the checkpoint is stored per worker and we resume from the
// first ID on the list, so we expect up to noOfWorkers-1 extra events, and in some conditions even more (if a worker processed faster).
require.GreaterOrEqual(t, len(newEmitter.events), 50, float64(noOfWorkers), "unexpected number of emitted events")
require.Less(t, len(newEmitter.events), 99, float64(noOfWorkers), "unexpected number of emitted events")
})
t.Run("checkpoint from export1 is not reused on export2", func(t *testing.T) {
prompt.SetStdin(prompt.NewFakeReader())
exportARN1 := uuid.NewString()
testDataObjects1 := map[string]string{
"testdata/dataObj11.json.gz": generateDynamoExportData(100),
"testdata/dataObj12.json.gz": generateDynamoExportData(100),
}
emitter := &mockEmitter{
// Fail so that a checkpoint is stored.
failAfterNCalls: 50,
}
mt := &task{
s3Downloader: &fakeDownloader{
dataObjects: testDataObjects1,
},
eventsEmitter: emitter,
Config: defaultConfig,
}
err := mt.ProcessDataObjects(ctx, &exportInfo{
ExportARN: exportARN1,
DataObjectsInfo: []dataObjectInfo{
{DataFileS3Key: "testdata/dataObj11.json.gz"},
{DataFileS3Key: "testdata/dataObj12.json.gz"},
},
})
require.Error(t, err)
require.Len(t, emitter.events, 50, "unexpected number of emitted events")
exportARN2 := uuid.NewString()
testDataObjects2 := map[string]string{
"testdata/dataObj21.json.gz": generateDynamoExportData(100),
"testdata/dataObj22.json.gz": generateDynamoExportData(100),
}
newEmitter := &mockEmitter{}
newMigration := task{
s3Downloader: &fakeDownloader{
dataObjects: testDataObjects2,
},
eventsEmitter: newEmitter,
Config: defaultConfig,
}
err = newMigration.ProcessDataObjects(ctx, &exportInfo{
ExportARN: exportARN2,
DataObjectsInfo: []dataObjectInfo{
{DataFileS3Key: "testdata/dataObj21.json.gz"},
{DataFileS3Key: "testdata/dataObj22.json.gz"},
},
})
require.NoError(t, err)
require.Len(t, newEmitter.events, 200, "unexpected number of emitted events")
})
t.Run("failure after 50 calls, refuse to reuse checkpoint", func(t *testing.T) {
// Answer "n" to the prompt to refuse reusing the checkpoint.
prompt.SetStdin(prompt.NewFakeReader().AddString("n"))
exportARN := uuid.NewString()
testDataObjects := map[string]string{
"testdata/dataObj1.json.gz": generateDynamoExportData(100),
"testdata/dataObj2.json.gz": generateDynamoExportData(100),
}
emitter := &mockEmitter{
failAfterNCalls: 50,
}
mt := &task{
s3Downloader: &fakeDownloader{
dataObjects: testDataObjects,
},
eventsEmitter: emitter,
Config: defaultConfig,
}
err := mt.ProcessDataObjects(ctx, &exportInfo{
ExportARN: exportARN,
DataObjectsInfo: []dataObjectInfo{
{DataFileS3Key: "testdata/dataObj1.json.gz"},
{DataFileS3Key: "testdata/dataObj2.json.gz"},
},
})
require.Error(t, err)
require.Len(t, emitter.events, 50, "unexpected number of emitted events")
newEmitter := &mockEmitter{}
newMigration := task{
s3Downloader: &fakeDownloader{
dataObjects: testDataObjects,
},
eventsEmitter: newEmitter,
Config: defaultConfig,
}
err = newMigration.ProcessDataObjects(ctx, &exportInfo{
ExportARN: exportARN,
DataObjectsInfo: []dataObjectInfo{
{DataFileS3Key: "testdata/dataObj1.json.gz"},
{DataFileS3Key: "testdata/dataObj2.json.gz"},
},
})
require.NoError(t, err)
require.Len(t, newEmitter.events, 200, "unexpected number of emitted events")
})
}
func generateDynamoExportData(n int) string {
if n < 1 {
panic("number of events to generate must be > 0")
}
lineFmt := `{ "Item": { "EventIndex": { "N": "2147483647" }, "SessionID": { "S": "4298bd54-a747-4d53-b850-83ba17caae5a" }, "CreatedAtDate": { "S": "2023-05-22" }, "FieldsMap": { "M": { "cluster_name": { "S": "test.example.local" }, "uid": { "S": "%s" }, "code": { "S": "T2005I" }, "ei": { "N": "2147483647" }, "time": { "S": "2023-05-22T12:12:21.966Z" }, "event": { "S": "session.upload" }, "sid": { "S": "4298bd54-a747-4d53-b850-83ba17caae5a" } } }, "EventType": { "S": "session.upload" }, "EventNamespace": { "S": "default" }, "CreatedAt": { "N": "1684757541" } } }`
sb := strings.Builder{}
for i := 0; i < n; i++ {
sb.WriteString(fmt.Sprintf(lineFmt+"\n", uuid.NewString()))
}
return sb.String()
}

go.mod

@ -33,6 +33,7 @@ require (
github.com/aws/aws-sdk-go-v2 v1.18.1
github.com/aws/aws-sdk-go-v2/config v1.18.27
github.com/aws/aws-sdk-go-v2/credentials v1.13.26
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.10.25
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.4
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.67
github.com/aws/aws-sdk-go-v2/service/athena v1.30.2
@ -220,7 +221,8 @@ require (
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.35 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.25 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.19.7 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.19.7
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.14.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.28 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.27 // indirect

go.sum

@ -314,6 +314,8 @@ github.com/aws/aws-sdk-go-v2/credentials v1.11.2/go.mod h1:j8YsY9TXTm31k4eFhspiQ
github.com/aws/aws-sdk-go-v2/credentials v1.13.24/go.mod h1:jYPYi99wUOPIFi0rhiOvXeSEReVOzBqFNOX5bXYoG2o=
github.com/aws/aws-sdk-go-v2/credentials v1.13.26 h1:qmU+yhKmOCyujmuPY7tf5MxR/RKyZrOPO3V4DobiTUk=
github.com/aws/aws-sdk-go-v2/credentials v1.13.26/go.mod h1:GoXt2YC8jHUBbA4jr+W3JiemnIbkXOfxSXcisUsZ3os=
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.10.25 h1:/+Z/dCO+1QHOlCm7m9G61snvIaDRUTv/HXp+8HdESiY=
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.10.25/go.mod h1:JQ0HJ+3LaAKHx3uwRUAfR/tb/gOlgAGPT6mZfIq55Ec=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.3/go.mod h1:uk1vhHHERfSVCUnqSqz8O48LBYDSC+k6brng09jcMOk=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3/go.mod h1:4Q0UFP0YJf0NrsEuEYHpM9fTSEVnD16Z3uyEF7J9JGM=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.4 h1:LxK/bitrAr4lnh9LnIS6i7zWbCOdMsfzKFBI6LUCS0I=
@ -339,6 +341,8 @@ github.com/aws/aws-sdk-go-v2/service/athena v1.30.2 h1:DQU1rX6yPczsF9Xow7124U51a
github.com/aws/aws-sdk-go-v2/service/athena v1.30.2/go.mod h1:1wNfeJcbxrsrC/Oa3CjL1j8+0P1a6z//XZgcbYnCs/U=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.19.7 h1:yb2o8oh3Y+Gg2g+wlzrWS3pB89+dHrXayT/d9cs8McU=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.19.7/go.mod h1:1MNss6sqoIsFGisX92do/5doiUCBrN7EjhZCS/8DUjI=
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.14.11 h1:WHi9VKMYGtWt2DzqeYHXzt55aflymO2EZ6axuKla8oU=
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.14.11/go.mod h1:pP+91QTpJMvcFTqGky6puHrkBs8oqoB3XOCiGRDaXwI=
github.com/aws/aws-sdk-go-v2/service/ec2 v1.100.1 h1:AsLoN1zlf+PJ5DRzoegd8k/Zk9f/fBCMKxrZ4sXSE5k=
github.com/aws/aws-sdk-go-v2/service/ec2 v1.100.1/go.mod h1:tIctCeX9IbzsUTKHt53SVEcgyfxV2ElxJeEB+QUbc4M=
github.com/aws/aws-sdk-go-v2/service/ecs v1.27.1 h1:54QSuWR3Pot7HqBRXd+c1yF97h2bqzDBID8qFSAkTlE=


@ -405,7 +405,7 @@ func New(ctx context.Context, cfg Config) (*Log, error) {
}
l := &Log{
publisher: newPublisher(cfg),
publisher: newPublisherFromAthenaConfig(cfg),
querier: querier,
consumerCloser: consumer,
}


@ -293,8 +293,10 @@ func TestPublisherConsumer(t *testing.T) {
fS3 := newFakeS3manager()
fq := newFakeQueue()
p := &publisher{
snsPublisher: fq,
uploader: fS3,
PublisherConfig: PublisherConfig{
SNSPublisher: fq,
Uploader: fS3,
},
}
smallEvent := &apievents.AppCreate{


@ -48,11 +48,7 @@ const (
// It publishes proto events directly to the SNS topic, or uses an S3 bucket
// if payload is too large for SNS.
type publisher struct {
topicARN string
snsPublisher snsPublisher
uploader s3uploader
payloadBucket string
payloadPrefix string
PublisherConfig
}
type snsPublisher interface {
@ -63,23 +59,38 @@ type s3uploader interface {
Upload(ctx context.Context, input *s3.PutObjectInput, opts ...func(*manager.Uploader)) (*manager.UploadOutput, error)
}
// newPublisher returns new instance of publisher.
func newPublisher(cfg Config) *publisher {
type PublisherConfig struct {
TopicARN string
SNSPublisher snsPublisher
Uploader s3uploader
PayloadBucket string
PayloadPrefix string
}
// NewPublisher returns new instance of publisher.
func NewPublisher(cfg PublisherConfig) *publisher {
return &publisher{
PublisherConfig: cfg,
}
}
// newPublisherFromAthenaConfig returns new instance of publisher from athena
// config.
func newPublisherFromAthenaConfig(cfg Config) *publisher {
r := retry.NewStandard(func(so *retry.StandardOptions) {
so.MaxAttempts = 20
so.MaxBackoff = 1 * time.Minute
})
// TODO(tobiaszheller): consider reworking lib/observability to work also on s3 sdk-v2.
return &publisher{
topicARN: cfg.TopicARN,
snsPublisher: sns.NewFromConfig(*cfg.AWSConfig, func(o *sns.Options) {
return NewPublisher(PublisherConfig{
TopicARN: cfg.TopicARN,
SNSPublisher: sns.NewFromConfig(*cfg.AWSConfig, func(o *sns.Options) {
o.Retryer = r
}),
uploader: manager.NewUploader(s3.NewFromConfig(*cfg.AWSConfig)),
payloadBucket: cfg.largeEventsBucket,
payloadPrefix: cfg.largeEventsPrefix,
}
// TODO(tobiaszheller): consider reworking lib/observability to work also on s3 sdk-v2.
Uploader: manager.NewUploader(s3.NewFromConfig(*cfg.AWSConfig)),
PayloadBucket: cfg.largeEventsBucket,
PayloadPrefix: cfg.largeEventsPrefix,
})
}
// EmitAuditEvent emits audit event to SNS topic. Topic should be connected with
@ -117,9 +128,9 @@ func (p *publisher) EmitAuditEvent(ctx context.Context, in apievents.AuditEvent)
}
func (p *publisher) emitViaS3(ctx context.Context, uid string, marshaledEvent []byte) error {
path := filepath.Join(p.payloadPrefix, uid)
out, err := p.uploader.Upload(ctx, &s3.PutObjectInput{
Bucket: aws.String(p.payloadBucket),
path := filepath.Join(p.PayloadPrefix, uid)
out, err := p.Uploader.Upload(ctx, &s3.PutObjectInput{
Bucket: aws.String(p.PayloadBucket),
Key: aws.String(path),
Body: bytes.NewBuffer(marshaledEvent),
})
@ -140,8 +151,8 @@ func (p *publisher) emitViaS3(ctx context.Context, uid string, marshaledEvent []
return trace.Wrap(err)
}
_, err = p.snsPublisher.Publish(ctx, &sns.PublishInput{
TopicArn: aws.String(p.topicARN),
_, err = p.SNSPublisher.Publish(ctx, &sns.PublishInput{
TopicArn: aws.String(p.TopicARN),
Message: aws.String(base64.StdEncoding.EncodeToString(buf)),
MessageAttributes: map[string]snsTypes.MessageAttributeValue{
payloadTypeAttr: {DataType: aws.String("String"), StringValue: aws.String(payloadTypeS3Based)},
@ -151,8 +162,8 @@ func (p *publisher) emitViaS3(ctx context.Context, uid string, marshaledEvent []
}
func (p *publisher) emitViaSNS(ctx context.Context, uid string, b64Encoded string) error {
_, err := p.snsPublisher.Publish(ctx, &sns.PublishInput{
TopicArn: aws.String(p.topicARN),
_, err := p.SNSPublisher.Publish(ctx, &sns.PublishInput{
TopicArn: aws.String(p.TopicARN),
Message: aws.String(b64Encoded),
MessageAttributes: map[string]snsTypes.MessageAttributeValue{
payloadTypeAttr: {DataType: aws.String("String"), StringValue: aws.String(payloadTypeRawProtoEvent)},


@ -88,8 +88,10 @@ func Test_EmitAuditEvent(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
fq := newFakeQueue()
p := &publisher{
snsPublisher: fq,
uploader: tt.uploader,
PublisherConfig: PublisherConfig{
SNSPublisher: fq,
Uploader: tt.uploader,
},
}
err := p.EmitAuditEvent(context.Background(), tt.in)
require.NoError(t, err)