Update DynamoDB backend

* Add support for TTL
* Add support for Batch reads
* Update default values
* Use batch reads to retrieve nodes
This commit is contained in:
Sasha Klizhentas 2017-11-10 12:20:18 -08:00
parent d15a4d0e4b
commit f2549155fd
13 changed files with 570 additions and 294 deletions

View file

@ -0,0 +1,19 @@
# REGION and TABLE_NAME are supplied by the caller, e.g.
#   make autoscale-plan REGION=us-west-2 TABLE_NAME=teleport.state
REGION ?=
TABLE_NAME ?=
# export all make variables to sub-processes so terraform sees them
export
.PHONY: autoscale-plan
# autoscale-plan shows the terraform plan for DynamoDB table autoscaling
autoscale-plan:
cd autoscale && terraform init
cd autoscale && TF_VAR_table_name=$(TABLE_NAME) TF_VAR_region=$(REGION) terraform plan
.PHONY: autoscale-apply
# autoscale-apply applies the autoscaling configuration to the table
autoscale-apply:
cd autoscale && terraform init
cd autoscale && TF_VAR_table_name=$(TABLE_NAME) TF_VAR_region=$(REGION) terraform apply
.PHONY: autoscale-destroy
# autoscale-destroy removes the autoscaling configuration
autoscale-destroy:
cd autoscale && terraform init
cd autoscale && TF_VAR_table_name=$(TABLE_NAME) TF_VAR_region=$(REGION) terraform destroy

View file

@ -0,0 +1,3 @@
## DynamoDB examples
* [Autoscaling]

View file

@ -0,0 +1,16 @@
# Example: enable autoscaling for a Teleport DynamoDB table via the
# shared dynamoautoscale module.
provider "aws" {
  region = "${var.region}"
}

# region is the AWS region hosting the DynamoDB table
variable "region" {
  type = "string"
}

# table_name is the name of the DynamoDB table to autoscale
variable "table_name" {
  type = "string"
}

module dynamoautoscale {
  source     = "../../../modules/dynamoautoscale"
  table_name = "${var.table_name}"
}

View file

@ -61,6 +61,21 @@ type Backend interface {
Clock() clockwork.Clock
}
// Item is a pair of key and value
type Item struct {
	// Key is an item key
	Key string
	// Value is an item value
	Value []byte
}

// ItemsGetter is an interface that allows getting all
// items in the bucket at once
type ItemsGetter interface {
	// GetItems returns a list of items - key value pairs
	GetItems(bucket []string) ([]Item, error)
}
// backend.Params type defines a flexible unified back-end configuration API.
// It is just a map of key/value pairs which gets populated by `storage` section
// in Teleport YAML config.

View file

@ -18,6 +18,7 @@ limitations under the License.
package dynamo
import (
"fmt"
"sort"
"strings"
"time"
@ -47,14 +48,34 @@ type DynamoConfig struct {
SecretKey string `json:"secret_key,omitempty"`
// Tablename where to store K/V in DynamoDB
Tablename string `json:"table_name,omitempty"`
// ReadCapacityUnits is Dynamodb read capacity units
ReadCapacityUnits int64 `json:"read_capacity_units"`
// WriteCapacityUnits is Dynamodb write capacity units
WriteCapacityUnits int64 `json:"write_capacity_units"`
}
// CheckAndSetDefaults validates the supplied configuration and fills in
// default capacity values; it returns an error if the configuration is
// not sufficient to connect to DynamoDB.
func (cfg *DynamoConfig) CheckAndSetDefaults() error {
	// the table name is mandatory - there is no sensible default for it
	if cfg.Tablename == "" {
		return trace.BadParameter("DynamoDB: table_name is not specified")
	}
	// fall back to default provisioned throughput when unset
	if cfg.WriteCapacityUnits == 0 {
		cfg.WriteCapacityUnits = DefaultWriteCapacityUnits
	}
	if cfg.ReadCapacityUnits == 0 {
		cfg.ReadCapacityUnits = DefaultReadCapacityUnits
	}
	return nil
}
// DynamoDBBackend struct
type DynamoDBBackend struct {
tableName string
region string
svc *dynamodb.DynamoDB
clock clockwork.Clock
*log.Entry
DynamoConfig
svc *dynamodb.DynamoDB
clock clockwork.Clock
}
type record struct {
@ -63,6 +84,8 @@ type record struct {
Value []byte
Timestamp int64
TTL time.Duration
Expires *int64 `json:"Expires,omitempty"`
key string
}
type keyLookup struct {
@ -79,24 +102,30 @@ const (
// such table needs to be migrated
oldPathAttr = "Key"
// conditionFailedErr is an AWS error code for "already exists"
// when creating a new object:
conditionFailedErr = "ConditionalCheckFailedException"
// BackendName is the name of this backend
BackendName = "dynamodb"
// resourceNotFoundErr is an AWS error code for "resource not found"
resourceNotFoundErr = "ResourceNotFoundException"
// ttlKey is a key used for TTL specification
ttlKey = "Expires"
// DefaultReadCapacityUnits specifies default value for read capacity units
DefaultReadCapacityUnits = 10
// DefaultWriteCapacityUnits specifies default value for write capacity units
DefaultWriteCapacityUnits = 10
)
// GetName() is a part of backend API and it returns DynamoDB backend type
// as it appears in `storage/type` section of Teleport YAML
func GetName() string {
return "dynamodb"
return BackendName
}
// New returns new instance of DynamoDB backend.
// It's an implementation of backend API's NewFunc
func New(params backend.Params) (backend.Backend, error) {
log.Info("[DynamoDB] Initializing DynamoDB backend")
l := log.WithFields(log.Fields{trace.Component: BackendName})
l.Info("initializing backend")
var cfg *DynamoConfig
err := utils.ObjectToStruct(params, &cfg)
@ -105,15 +134,15 @@ func New(params backend.Params) (backend.Backend, error) {
return nil, trace.BadParameter("DynamoDB configuration is invalid", err)
}
defer log.Debug("[DynamoDB] AWS session created")
defer log.Debug("AWS session created")
if err := checkConfig(cfg); err != nil {
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
b := &DynamoDBBackend{
tableName: cfg.Tablename,
region: cfg.Region,
clock: clockwork.NewRealClock(),
Entry: l,
DynamoConfig: *cfg,
clock: clockwork.NewRealClock(),
}
// create an AWS session using default SDK behavior, i.e. it will interpret
// the environment and ~/.aws directory just like an AWS CLI tool would:
@ -137,7 +166,7 @@ func New(params backend.Params) (backend.Backend, error) {
b.svc = dynamodb.New(sess)
// check if the table exists?
ts, err := b.getTableStatus(b.tableName)
ts, err := b.getTableStatus(b.Tablename)
if err != nil {
return nil, trace.Wrap(err)
}
@ -145,13 +174,17 @@ func New(params backend.Params) (backend.Backend, error) {
case tableStatusOK:
break
case tableStatusMissing:
err = b.createTable(b.tableName, "FullPath")
err = b.createTable(b.Tablename, "FullPath")
case tableStatusNeedsMigration:
err = b.migrate(b.tableName)
return nil, trace.BadParameter("unsupported schema")
}
if err != nil {
return nil, trace.Wrap(err)
}
err = b.turnOnTimeToLive()
if err != nil {
return nil, trace.Wrap(err)
}
return b, nil
}
@ -169,16 +202,36 @@ func (b *DynamoDBBackend) Clock() clockwork.Clock {
return b.clock
}
// turnOnTimeToLive enables DynamoDB's native TTL feature on the backend
// table, keyed off the "Expires" attribute, so that expired records are
// purged by DynamoDB itself. It is a no-op if TTL is already enabled or
// in the process of being enabled.
func (b *DynamoDBBackend) turnOnTimeToLive() error {
	status, err := b.svc.DescribeTimeToLive(&dynamodb.DescribeTimeToLiveInput{
		TableName: aws.String(b.Tablename),
	})
	if err != nil {
		return trace.Wrap(convertError(err))
	}
	// guard against a nil description before dereferencing it;
	// the SDK does not guarantee the field is always populated
	if status.TimeToLiveDescription != nil {
		switch aws.StringValue(status.TimeToLiveDescription.TimeToLiveStatus) {
		case dynamodb.TimeToLiveStatusEnabled, dynamodb.TimeToLiveStatusEnabling:
			// TTL is already on (or being turned on) - nothing to do
			return nil
		}
	}
	_, err = b.svc.UpdateTimeToLive(&dynamodb.UpdateTimeToLiveInput{
		TableName: aws.String(b.Tablename),
		TimeToLiveSpecification: &dynamodb.TimeToLiveSpecification{
			AttributeName: aws.String(ttlKey),
			Enabled:       aws.Bool(true),
		},
	})
	return convertError(err)
}
// getTableStatus checks if a given table exists
func (b *DynamoDBBackend) getTableStatus(tableName string) (tableStatus, error) {
td, err := b.svc.DescribeTable(&dynamodb.DescribeTableInput{
TableName: aws.String(tableName),
})
err = convertError(err)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == resourceNotFoundErr {
return tableStatusMissing, nil
}
if trace.IsNotFound(err) {
return tableStatusMissing, nil
}
return tableStatusError, trace.Wrap(err)
}
@ -198,8 +251,8 @@ func (b *DynamoDBBackend) getTableStatus(tableName string) (tableStatus, error)
// why it's a parameter for migration purposes)
func (b *DynamoDBBackend) createTable(tableName string, rangeKey string) error {
pThroughput := dynamodb.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(5),
WriteCapacityUnits: aws.Int64(5),
ReadCapacityUnits: aws.Int64(b.ReadCapacityUnits),
WriteCapacityUnits: aws.Int64(b.WriteCapacityUnits),
}
def := []*dynamodb.AttributeDefinition{
{
@ -255,102 +308,6 @@ func (b *DynamoDBBackend) deleteTable(tableName string, wait bool) error {
return nil
}
// migrate checks if the table contains existing data in "old" format
// which used "Key" and "HashKey" attributes prior to Teleport 1.5
//
// this migration function replaces "Key" with "FullPath":
// - load all existing entries and keep them in RAM
// - create <table_name>.bak backup table and copy all entries to it
// - delete the original table_name
// - re-create table_name with a new schema (with "FullPath" instead of "Key")
// - copy all entries to it
//
// NOTE(review): the Query below is not paginated, so only the first ~1MB
// of legacy items is migrated - TODO confirm this is acceptable for the
// table sizes this migration targets.
func (b *DynamoDBBackend) migrate(tableName string) error {
	backupTableName := tableName + ".bak"
	noMigrationNeededErr := trace.AlreadyExists("table '%s' has already been migrated. see backup in '%s'",
		tableName, backupTableName)
	// make sure migration is needed:
	if status, _ := b.getTableStatus(tableName); status != tableStatusNeedsMigration {
		return trace.Wrap(noMigrationNeededErr)
	}
	// create backup table, or refuse migration if backup table already exists
	s, err := b.getTableStatus(backupTableName)
	if err != nil {
		return trace.Wrap(err)
	}
	if s != tableStatusMissing {
		return trace.Wrap(noMigrationNeededErr)
	}
	log.Infof("[DynamoDB] creating backup table '%s'", backupTableName)
	if err = b.createTable(backupTableName, oldPathAttr); err != nil {
		return trace.Wrap(err)
	}
	log.Infof("[DynamoDB] backup table '%s' created", backupTableName)
	// request all entries in the table (up to 1MB):
	log.Infof("[DynamoDB] pulling legacy records out of '%s'", tableName)
	result, err := b.svc.Query(&dynamodb.QueryInput{
		TableName: aws.String(tableName),
		KeyConditions: map[string]*dynamodb.Condition{
			// the legacy schema used a constant "teleport" hash key
			"HashKey": {
				ComparisonOperator: aws.String(dynamodb.ComparisonOperatorEq),
				AttributeValueList: []*dynamodb.AttributeValue{
					{
						S: aws.String("teleport"),
					},
				},
			},
			oldPathAttr: {
				ComparisonOperator: aws.String(dynamodb.ComparisonOperatorBeginsWith),
				AttributeValueList: []*dynamodb.AttributeValue{
					{
						S: aws.String("teleport"),
					},
				},
			},
		},
	})
	if err != nil {
		return trace.Wrap(err)
	}
	// copy all items into the backup table:
	log.Infof("[DynamoDB] migrating legacy records to backup table '%s'", backupTableName)
	for _, item := range result.Items {
		_, err = b.svc.PutItem(&dynamodb.PutItemInput{
			TableName: aws.String(backupTableName),
			Item:      item,
		})
		if err != nil {
			return trace.Wrap(err)
		}
	}
	// kill the original table:
	// failure to delete is logged but not fatal - createTable below will
	// surface the error if the table still exists
	log.Infof("[DynamoDB] deleting legacy table '%s'", tableName)
	if err = b.deleteTable(tableName, true); err != nil {
		log.Warn(err)
	}
	// re-create the original table:
	log.Infof("[DynamoDB] re-creating table '%s' with a new schema", tableName)
	if err = b.createTable(tableName, "FullPath"); err != nil {
		return trace.Wrap(err)
	}
	// copy the items into the new table:
	log.Infof("[DynamoDB] migrating legacy records to the new schema in '%s'", tableName)
	for _, item := range result.Items {
		// rename the range-key attribute from "Key" to "FullPath"
		item["FullPath"] = item["Key"]
		delete(item, "Key")
		_, err = b.svc.PutItem(&dynamodb.PutItemInput{
			TableName: aws.String(tableName),
			Item:      item,
		})
		if err != nil {
			return trace.Wrap(err)
		}
	}
	log.Infof("[DynamoDB] migration succeeded")
	return nil
}
// Close the DynamoDB driver
func (b *DynamoDBBackend) Close() error {
return nil
@ -360,20 +317,24 @@ func (b *DynamoDBBackend) fullPath(bucket ...string) string {
return strings.Join(append([]string{"teleport"}, bucket...), "/")
}
// getKeys retrieve all prefixed keys
// WARNING: there is no bucket feature; retrieving a "bucket" means a full scan
// of the DynamoDB table, which can be resource intensive (watch read provisioning)
func (b *DynamoDBBackend) getKeys(path string) ([]string, error) {
var vals []string
query := "HashKey = :hashKey AND begins_with (#K, :fullpath)"
attrV := map[string]string{":fullpath": path, ":hashKey": hashKey}
attrN := map[string]*string{"#K": aws.String("FullPath")}
// getRecords retrieve all prefixed keys
func (b *DynamoDBBackend) getRecords(path string) ([]record, error) {
var vals []record
query := "HashKey = :hashKey AND begins_with (FullPath, :fullPath)"
attrV := map[string]interface{}{
":fullPath": path,
":hashKey": hashKey,
":timestamp": b.clock.Now().UTC().Unix(),
}
// filter out expired items, otherwise they might show up in the query
// http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html
filter := fmt.Sprintf("attribute_not_exists(Expires) OR Expires >= :timestamp")
av, err := dynamodbattribute.MarshalMap(attrV)
input := dynamodb.QueryInput{
KeyConditionExpression: aws.String(query),
TableName: &b.tableName,
TableName: &b.Tablename,
ExpressionAttributeValues: av,
ExpressionAttributeNames: attrN,
FilterExpression: aws.String(filter),
}
out, err := b.svc.Query(&input)
if err != nil {
@ -388,10 +349,13 @@ func (b *DynamoDBBackend) getKeys(path string) ([]string, error) {
if r.isExpired() {
b.deleteKey(r.FullPath)
} else {
vals = append(vals, suffix(r.FullPath[len(path)+1:]))
r.key = suffix(r.FullPath[len(path)+1:])
vals = append(vals, r)
}
}
}
sort.Sort(records(vals))
vals = removeDuplicates(vals)
return vals, nil
}
@ -407,17 +371,22 @@ func (r *record) isExpired() bool {
return nowUTC.After(expiryDateUTC)
}
func removeDuplicates(elements []string) []string {
// suffix returns the first "/"-separated component of the given key,
// or the key itself when it contains no separator.
func suffix(key string) string {
	if idx := strings.Index(key, "/"); idx >= 0 {
		return key[:idx]
	}
	return key
}
func removeDuplicates(elements []record) []record {
// Use map to record duplicates as we find them.
encountered := map[string]bool{}
result := []string{}
result := []record{}
for v := range elements {
if encountered[elements[v]] == true {
if encountered[elements[v].key] == true {
// Do not add duplicate.
} else {
// Record this element as an encountered element.
encountered[elements[v]] = true
encountered[elements[v].key] = true
// Append to result slice.
result = append(result, elements[v])
}
@ -426,16 +395,38 @@ func removeDuplicates(elements []string) []string {
return result
}
// GetKeys retrieve all keys matching specific path
func (b *DynamoDBBackend) GetKeys(path []string) ([]string, error) {
log.Debugf("[DynamoDB] call GetKeys(%s)", path)
keys, err := b.getKeys(b.fullPath(path...))
// GetItems returns all key/value pairs stored under the given path
// in a single batch read. It implements backend.ItemsGetter.
func (b *DynamoDBBackend) GetItems(path []string) ([]backend.Item, error) {
	start := time.Now()
	fullPath := b.fullPath(path...)
	records, err := b.getRecords(fullPath)
	if err != nil {
		return nil, trace.Wrap(err)
	}
	// convert the internal records into the public backend.Item type
	values := make([]backend.Item, len(records))
	for i, r := range records {
		values[i] = backend.Item{
			Key:   r.key,
			Value: r.Value,
		}
	}
	b.Debugf("GetItems(%v) in %v", fullPath, time.Since(start))
	return values, nil
}
// GetKeys retrieves all keys matching the specific path.
func (b *DynamoDBBackend) GetKeys(path []string) ([]string, error) {
	start := time.Now()
	fullPath := b.fullPath(path...)
	// reuse the already-computed full path instead of rebuilding it
	records, err := b.getRecords(fullPath)
	if err != nil {
		return nil, trace.Wrap(err)
	}
	keys := make([]string, len(records))
	for i, r := range records {
		keys[i] = r.key
	}
	b.Debugf("GetKeys(%v) in %v", fullPath, time.Since(start))
	return keys, nil
}
@ -451,25 +442,23 @@ func (b *DynamoDBBackend) createKey(fullPath string, val []byte, ttl time.Durati
TTL: ttl,
Timestamp: time.Now().UTC().Unix(),
}
if ttl != backend.Forever {
r.Expires = aws.Int64(b.clock.Now().UTC().Add(ttl).Unix())
}
av, err := dynamodbattribute.MarshalMap(r)
if err != nil {
return trace.Wrap(err)
}
input := dynamodb.PutItemInput{
Item: av,
TableName: aws.String(b.tableName),
TableName: aws.String(b.Tablename),
}
if !overwrite {
input.SetConditionExpression("attribute_not_exists(FullPath)")
}
_, err = b.svc.PutItem(&input)
err = convertError(err)
if err != nil {
// special handling for 'already exists':
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == conditionFailedErr {
return trace.AlreadyExists("%s already exists", fullPath)
}
}
return trace.Wrap(err)
}
return nil
@ -530,7 +519,7 @@ func (b *DynamoDBBackend) DeleteBucket(path []string, key string) error {
av, err := dynamodbattribute.MarshalMap(attrV)
input := dynamodb.QueryInput{
KeyConditionExpression: aws.String(query),
TableName: &b.tableName,
TableName: &b.Tablename,
ExpressionAttributeValues: av, ExpressionAttributeNames: attrN,
}
out, err := b.svc.Query(&input)
@ -567,7 +556,7 @@ func (b *DynamoDBBackend) deleteKey(fullPath string) error {
if err != nil {
return trace.Wrap(err)
}
input := dynamodb.DeleteItemInput{Key: av, TableName: aws.String(b.tableName)}
input := dynamodb.DeleteItemInput{Key: av, TableName: aws.String(b.Tablename)}
if _, err = b.svc.DeleteItem(&input); err != nil {
return trace.Wrap(err)
}
@ -582,7 +571,7 @@ func (b *DynamoDBBackend) getKey(fullPath string) (*record, error) {
if err != nil {
return nil, trace.Wrap(err)
}
input := dynamodb.GetItemInput{Key: av, TableName: aws.String(b.tableName)}
input := dynamodb.GetItemInput{Key: av, TableName: aws.String(b.Tablename)}
out, err := b.svc.GetItem(&input)
if err != nil {
return nil, trace.NotFound("%v not found", fullPath)
@ -595,7 +584,7 @@ func (b *DynamoDBBackend) getKey(fullPath string) (*record, error) {
av, _ := dynamodbattribute.MarshalMap(attrV)
input := dynamodb.QueryInput{
KeyConditionExpression: aws.String(query),
TableName: &b.tableName,
TableName: &b.Tablename,
ExpressionAttributeValues: av,
ExpressionAttributeNames: attrN,
}
@ -628,17 +617,43 @@ func (b *DynamoDBBackend) GetVal(path []string, key string) ([]byte, error) {
return r.Value, nil
}
func suffix(key string) string {
vals := strings.Split(key, "/")
return vals[0]
// convertError maps AWS SDK errors to trace errors so that callers can
// use trace.IsNotFound / trace.IsAlreadyExists style checks. Unknown
// errors are returned unmodified.
func convertError(err error) error {
	if err == nil {
		return nil
	}
	aerr, ok := err.(awserr.Error)
	if !ok {
		return err
	}
	switch aerr.Code() {
	case dynamodb.ErrCodeConditionalCheckFailedException:
		// pass the AWS message via a format verb so that any '%'
		// characters in it are not misinterpreted as format directives
		return trace.AlreadyExists("%v", aerr.Error())
	case dynamodb.ErrCodeProvisionedThroughputExceededException:
		return trace.ConnectionProblem(aerr, "%v", aerr.Error())
	case dynamodb.ErrCodeResourceNotFoundException:
		return trace.NotFound("%v", aerr.Error())
	case dynamodb.ErrCodeItemCollectionSizeLimitExceededException:
		return trace.BadParameter("%v", aerr.Error())
	case dynamodb.ErrCodeInternalServerError:
		return trace.BadParameter("%v", aerr.Error())
	default:
		return err
	}
}
// checkConfig helper returns an error if the supplied configuration
// is not enough to connect to DynamoDB
func checkConfig(cfg *DynamoConfig) (err error) {
// table is not configured?
if cfg.Tablename == "" {
return trace.BadParameter("DynamoDB: table_name is not specified")
}
return nil
// records adapts a slice of record values to sort.Interface so that
// query results can be ordered by key.
type records []record

// Len reports the number of records in the collection.
func (rs records) Len() int { return len(rs) }

// Swap exchanges the records at positions i and j.
func (rs records) Swap(i, j int) { rs[i], rs[j] = rs[j], rs[i] }

// Less orders records lexicographically by their key.
func (rs records) Less(i, j int) bool { return rs[i].key < rs[j].key }

View file

@ -22,7 +22,6 @@ package dynamo
import (
"testing"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/backend/test"
"github.com/gravitational/teleport/lib/utils"
@ -32,9 +31,9 @@ import (
func TestDynamoDB(t *testing.T) { TestingT(t) }
type DynamoDBSuite struct {
bk *DynamoDBBackend
suite test.BackendSuite
cfg backend.Config
bk *DynamoDBBackend
suite test.BackendSuite
tableName string
}
var _ = Suite(&DynamoDBSuite{})
@ -43,55 +42,22 @@ func (s *DynamoDBSuite) SetUpSuite(c *C) {
utils.InitLoggerForTests()
var err error
s.cfg.Type = "dynamodb"
s.cfg.Tablename = "teleport.dynamo.test"
s.bk, err = New(&s.cfg)
s.tableName = "teleport.dynamo.test"
bk, err := New(map[string]interface{}{
"table_name": s.tableName,
})
c.Assert(err, IsNil)
s.bk = bk.(*DynamoDBBackend)
c.Assert(err, IsNil)
s.suite.B = s.bk
}
func (s *DynamoDBSuite) TearDownSuite(c *C) {
if s.bk != nil && s.bk.svc != nil {
s.bk.deleteTable(s.cfg.Tablename, false)
s.bk.deleteTable(s.tableName, false)
}
}
func (s *DynamoDBSuite) TestMigration(c *C) {
s.cfg.Type = "dynamodb"
s.cfg.Tablename = "teleport.dynamo.test"
// migration uses its own instance of the backend:
bk, err := New(&s.cfg)
c.Assert(err, IsNil)
var (
legacytable = "legacy.teleport.t"
nonExistingTable = "nonexisting.teleport.t"
)
bk.deleteTable(legacytable, true)
bk.deleteTable(legacytable+".bak", false)
defer bk.deleteTable(legacytable, false)
defer bk.deleteTable(legacytable+".bak", false)
status, err := bk.getTableStatus(nonExistingTable)
c.Assert(err, IsNil)
c.Assert(status, Equals, tableStatus(tableStatusMissing))
err = bk.createTable(legacytable, oldPathAttr)
c.Assert(err, IsNil)
status, err = bk.getTableStatus(legacytable)
c.Assert(err, IsNil)
c.Assert(status, Equals, tableStatus(tableStatusNeedsMigration))
err = bk.migrate(legacytable)
c.Assert(err, IsNil)
status, err = bk.getTableStatus(legacytable)
c.Assert(err, IsNil)
c.Assert(status, Equals, tableStatus(tableStatusOK))
}
func (s *DynamoDBSuite) TearDownTest(c *C) {
c.Assert(s.bk.Close(), IsNil)
}
@ -100,6 +66,14 @@ func (s *DynamoDBSuite) TestBasicCRUD(c *C) {
s.suite.BasicCRUD(c)
}
// TestBatchCRUD runs the shared batch-read suite against DynamoDB.
func (s *DynamoDBSuite) TestBatchCRUD(c *C) {
	s.suite.BatchCRUD(c)
}

// TestDirectories runs the shared hierarchical-listing suite against DynamoDB.
func (s *DynamoDBSuite) TestDirectories(c *C) {
	s.suite.Directories(c)
}

// TestExpiration runs the shared TTL-expiry suite against DynamoDB.
func (s *DynamoDBSuite) TestExpiration(c *C) {
	s.suite.Expiration(c)
}

View file

@ -102,7 +102,7 @@ func (s *BackendSuite) BasicCRUD(c *C) {
c.Assert(string(out), Equals, "val-updated")
c.Assert(s.B.DeleteKey([]string{"a", "b"}, "bkey"), IsNil)
c.Assert(trace.IsNotFound(s.B.DeleteKey([]string{"a", "b"}, "bkey")), Equals, true)
c.Assert(trace.IsNotFound(s.B.DeleteKey([]string{"a", "b"}, "bkey")), Equals, true, Commentf("%#v", err))
_, err = s.B.GetVal([]string{"a", "b"}, "bkey")
c.Assert(trace.IsNotFound(err), Equals, true, Commentf("%#v", err))
@ -115,6 +115,39 @@ func (s *BackendSuite) BasicCRUD(c *C) {
c.Assert(trace.IsNotFound(err), Equals, true, Commentf("%#v", err))
}
// BatchCRUD tests batch CRUD operations if supported by the backend
func (s *BackendSuite) BatchCRUD(c *C) {
	// batch reads are optional - skip for backends without ItemsGetter
	getter, ok := s.B.(backend.ItemsGetter)
	if !ok {
		c.Skip("backend does not support batch get")
		return
	}
	c.Assert(s.B.UpsertVal([]string{"a", "b"}, "bkey", []byte("val1"), 0), IsNil)
	c.Assert(s.B.UpsertVal([]string{"a", "b"}, "akey", []byte("val2"), 0), IsNil)
	// items are expected back sorted by key, so "akey" precedes "bkey"
	items, err := getter.GetItems([]string{"a", "b"})
	c.Assert(err, IsNil)
	c.Assert(len(items), Equals, 2)
	c.Assert(string(items[0].Value), Equals, "val2")
	c.Assert(items[0].Key, Equals, "akey")
	c.Assert(string(items[1].Value), Equals, "val1")
	c.Assert(items[1].Key, Equals, "bkey")
}
// Directories checks directories access
func (s *BackendSuite) Directories(c *C) {
	bucket := []string{"level1", "level2", "level3"}
	c.Assert(s.B.UpsertVal(bucket, "key", []byte("val"), 0), IsNil)
	// listing a parent bucket should surface only the immediate child name
	keys, err := s.B.GetKeys(bucket[:2])
	c.Assert(err, IsNil)
	c.Assert(keys, DeepEquals, []string{"level3"})
	keys, err = s.B.GetKeys(bucket[:1])
	c.Assert(err, IsNil)
	c.Assert(keys, DeepEquals, []string{"level2"})
}
func (s *BackendSuite) Expiration(c *C) {
bucket := []string{"one", "two"}
c.Assert(s.B.UpsertVal(bucket, "bkey", []byte("val1"), time.Second), IsNil)

View file

@ -47,88 +47,90 @@ var (
// true = non-scalar
// false = scalar
validKeys = map[string]bool{
"namespace": true,
"cluster_name": true,
"trusted_clusters": true,
"pid_file": true,
"cert_file": true,
"private_key_file": true,
"cert": true,
"private_key": true,
"checking_keys": true,
"checking_key_files": true,
"signing_keys": true,
"signing_key_files": true,
"allowed_logins": true,
"teleport": true,
"enabled": true,
"ssh_service": true,
"proxy_service": true,
"auth_service": true,
"auth_token": true,
"auth_servers": true,
"domain_name": true,
"storage": false,
"nodename": true,
"log": true,
"period": true,
"connection_limits": true,
"max_connections": true,
"max_users": true,
"rates": true,
"commands": true,
"labels": false,
"output": true,
"severity": true,
"role": true,
"name": true,
"type": true,
"data_dir": true,
"web_listen_addr": true,
"tunnel_listen_addr": true,
"ssh_listen_addr": true,
"listen_addr": true,
"https_key_file": true,
"https_cert_file": true,
"advertise_ip": true,
"authorities": true,
"keys": true,
"reverse_tunnels": true,
"addresses": true,
"oidc_connectors": true,
"id": true,
"issuer_url": true,
"client_id": true,
"client_secret": true,
"redirect_url": true,
"acr_values": true,
"provider": true,
"tokens": true,
"region": true,
"table_name": true,
"access_key": true,
"secret_key": true,
"u2f": true,
"app_id": true,
"facets": true,
"authentication": true,
"second_factor": false,
"oidc": true,
"display": false,
"scope": false,
"claims_to_roles": true,
"dynamic_config": false,
"seed_config": false,
"public_addr": false,
"cache": true,
"ttl": false,
"issuer": false,
"permit_user_env": false,
"ciphers": false,
"kex_algos": false,
"mac_algos": false,
"connector_name": false,
"session_recording": false,
"namespace": true,
"cluster_name": true,
"trusted_clusters": true,
"pid_file": true,
"cert_file": true,
"private_key_file": true,
"cert": true,
"private_key": true,
"checking_keys": true,
"checking_key_files": true,
"signing_keys": true,
"signing_key_files": true,
"allowed_logins": true,
"teleport": true,
"enabled": true,
"ssh_service": true,
"proxy_service": true,
"auth_service": true,
"auth_token": true,
"auth_servers": true,
"domain_name": true,
"storage": false,
"nodename": true,
"log": true,
"period": true,
"connection_limits": true,
"max_connections": true,
"max_users": true,
"rates": true,
"commands": true,
"labels": false,
"output": true,
"severity": true,
"role": true,
"name": true,
"type": true,
"data_dir": true,
"web_listen_addr": true,
"tunnel_listen_addr": true,
"ssh_listen_addr": true,
"listen_addr": true,
"https_key_file": true,
"https_cert_file": true,
"advertise_ip": true,
"authorities": true,
"keys": true,
"reverse_tunnels": true,
"addresses": true,
"oidc_connectors": true,
"id": true,
"issuer_url": true,
"client_id": true,
"client_secret": true,
"redirect_url": true,
"acr_values": true,
"provider": true,
"tokens": true,
"region": true,
"table_name": true,
"access_key": true,
"secret_key": true,
"u2f": true,
"app_id": true,
"facets": true,
"authentication": true,
"second_factor": false,
"oidc": true,
"display": false,
"scope": false,
"claims_to_roles": true,
"dynamic_config": false,
"seed_config": false,
"public_addr": false,
"cache": true,
"ttl": false,
"issuer": false,
"permit_user_env": false,
"ciphers": false,
"kex_algos": false,
"mac_algos": false,
"connector_name": false,
"session_recording": false,
"read_capacity_units": false,
"write_capacity_units": false,
}
)

View file

@ -19,6 +19,7 @@ package local
import (
"encoding/json"
"sort"
"time"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/services"
@ -30,12 +31,20 @@ import (
// PresenceService records and reports the presence of all components
// of the cluster - Nodes, Proxies and SSH nodes
type PresenceService struct {
*log.Entry
backend.Backend
// fast getter of the items extension
getter backend.ItemsGetter
}
// NewPresenceService returns new presence service instance
func NewPresenceService(backend backend.Backend) *PresenceService {
return &PresenceService{Backend: backend}
func NewPresenceService(b backend.Backend) *PresenceService {
getter, _ := b.(backend.ItemsGetter)
return &PresenceService{
Entry: log.WithFields(log.Fields{trace.Component: "Presence"}),
Backend: b,
getter: getter,
}
}
// UpsertLocalClusterName upserts local domain
@ -163,6 +172,10 @@ func (s *PresenceService) GetNodes(namespace string) ([]services.Server, error)
if namespace == "" {
return nil, trace.BadParameter("missing namespace value")
}
if s.getter != nil {
return s.batchGetNodes(namespace)
}
start := time.Now()
keys, err := s.GetKeys([]string{namespacesPrefix, namespace, nodesPrefix})
if err != nil {
return nil, trace.Wrap(err)
@ -182,6 +195,30 @@ func (s *PresenceService) GetNodes(namespace string) ([]services.Server, error)
}
servers = append(servers, server)
}
s.Infof("GetServers(%v) in %v", len(servers), time.Now().Sub(start))
// sorting helps with tests and makes it all deterministic
sort.Sort(services.SortedServers(servers))
return servers, nil
}
// batchGetNodes returns a list of registered servers by using fast batch get
func (s *PresenceService) batchGetNodes(namespace string) ([]services.Server, error) {
start := time.Now()
bucket := []string{namespacesPrefix, namespace, nodesPrefix}
items, err := s.getter.GetItems(bucket)
if err != nil {
return nil, trace.Wrap(err)
}
servers := make([]services.Server, len(items))
for i, item := range items {
server, err := services.GetServerMarshaler().UnmarshalServer(item.Value, services.KindNode)
if err != nil {
return nil, trace.Wrap(err)
}
servers[i] = server
}
s.Infof("GetServers(%v) in %v", len(servers), time.Now().Sub(start))
// sorting helps with tests and makes it all deterministic
sort.Sort(services.SortedServers(servers))
return servers, nil

View file

@ -59,6 +59,10 @@ func InitLogger(purpose LoggingPurpose, level log.Level) {
if val {
return
}
val, _ = strconv.ParseBool(os.Getenv(teleport.DebugEnvVar))
if val {
return
}
log.SetLevel(log.WarnLevel)
log.SetOutput(ioutil.Discard)
}

8
modules/README.md Normal file
View file

@ -0,0 +1,8 @@
# Terraform Modules
Modules is a managed collection of Teleport modules
useful for AWS deployments of Teleport.
* [dynamoautoscale](dynamoautoscale) enables DynamoDB table autoscaling

View file

@ -0,0 +1,117 @@
# Look up the current account and region so IAM policies below can
# reference a fully-qualified DynamoDB table ARN.
data "aws_caller_identity" "current" {}
data "aws_region" "current" {
  current = true
}

# IAM role assumed by the Application Auto Scaling service to manage
# the table's provisioned capacity.
resource "aws_iam_role" "autoscaler" {
  name = "${var.table_name}-autoscaler"
  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {"Service": "application-autoscaling.amazonaws.com"},
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF
}

# Allow the autoscaler to inspect and resize this specific table only.
resource "aws_iam_role_policy" "autoscaler_dynamo" {
  name = "${var.table_name}-autoscaler-dynamo"
  role = "${aws_iam_role.autoscaler.id}"
  policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:DescribeTable",
        "dynamodb:UpdateTable"
      ],
      "Resource": "arn:aws:dynamodb:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:table/${var.table_name}"
    }
  ]
}
EOF
}

# Allow the autoscaler to manage the CloudWatch alarms that drive scaling.
resource "aws_iam_role_policy" "autoscaler_cloudwatch" {
  name = "${var.table_name}-autoscaler-cloudwatch"
  role = "${aws_iam_role.autoscaler.id}"
  policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "cloudwatch:PutMetricAlarm",
        "cloudwatch:DescribeAlarms",
        "cloudwatch:DeleteAlarms"
      ],
      "Resource": "*"
    }
  ]
}
EOF
}

# Register the table's read capacity as a scalable target and track a
# utilization percentage target.
resource "aws_appautoscaling_target" "read_target" {
  max_capacity       = "${var.autoscale_max_read_capacity}"
  min_capacity       = "${var.autoscale_min_read_capacity}"
  resource_id        = "table/${var.table_name}"
  role_arn           = "${aws_iam_role.autoscaler.arn}"
  scalable_dimension = "dynamodb:table:ReadCapacityUnits"
  service_namespace  = "dynamodb"
}

resource "aws_appautoscaling_policy" "read_policy" {
  name               = "DynamoDBReadCapacityUtilization:${aws_appautoscaling_target.read_target.resource_id}"
  policy_type        = "TargetTrackingScaling"
  resource_id        = "${aws_appautoscaling_target.read_target.resource_id}"
  scalable_dimension = "${aws_appautoscaling_target.read_target.scalable_dimension}"
  service_namespace  = "${aws_appautoscaling_target.read_target.service_namespace}"
  target_tracking_scaling_policy_configuration {
    predefined_metric_specification {
      predefined_metric_type = "DynamoDBReadCapacityUtilization"
    }
    target_value = "${var.autoscale_read_target}"
  }
}

# Same as above, for write capacity.
resource "aws_appautoscaling_target" "write_target" {
  max_capacity       = "${var.autoscale_max_write_capacity}"
  min_capacity       = "${var.autoscale_min_write_capacity}"
  resource_id        = "table/${var.table_name}"
  role_arn           = "${aws_iam_role.autoscaler.arn}"
  scalable_dimension = "dynamodb:table:WriteCapacityUnits"
  service_namespace  = "dynamodb"
}

resource "aws_appautoscaling_policy" "write_policy" {
  name               = "DynamoDBWriteCapacityUtilization:${aws_appautoscaling_target.write_target.resource_id}"
  policy_type        = "TargetTrackingScaling"
  resource_id        = "${aws_appautoscaling_target.write_target.resource_id}"
  scalable_dimension = "${aws_appautoscaling_target.write_target.scalable_dimension}"
  service_namespace  = "${aws_appautoscaling_target.write_target.service_namespace}"
  target_tracking_scaling_policy_configuration {
    predefined_metric_specification {
      predefined_metric_type = "DynamoDBWriteCapacityUtilization"
    }
    target_value = "${var.autoscale_write_target}"
  }
}

View file

@ -0,0 +1,33 @@
# table_name is the DynamoDB table whose capacity is autoscaled
variable "table_name" {
  type = "string"
}

# autoscale_write_target is the target write capacity utilization (percent)
variable "autoscale_write_target" {
  type    = "string"
  default = 50
}

# autoscale_read_target is the target read capacity utilization (percent)
variable "autoscale_read_target" {
  type    = "string"
  default = 50
}

# autoscale_min_read_capacity is the floor for read capacity units
variable "autoscale_min_read_capacity" {
  type    = "string"
  default = 5
}

# autoscale_max_read_capacity is the ceiling for read capacity units
variable "autoscale_max_read_capacity" {
  type    = "string"
  default = 100
}

# autoscale_min_write_capacity is the floor for write capacity units
variable "autoscale_min_write_capacity" {
  type    = "string"
  default = 5
}

# autoscale_max_write_capacity is the ceiling for write capacity units
variable "autoscale_max_write_capacity" {
  type    = "string"
  default = 100
}