ElastiCache support - the basics (#12209)

This commit is contained in:
STeve (Xin) Huang 2022-05-13 11:09:20 -04:00 committed by GitHub
parent 3de0f5dfda
commit 5c6deb7d9d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
37 changed files with 3287 additions and 1197 deletions

View file

@ -809,6 +809,7 @@ and non interactive tsh bench loads.
- [ ] AWS Aurora Postgres.
- [ ] AWS Aurora MySQL.
- [ ] AWS Redshift.
- [ ] AWS ElastiCache.
- [ ] GCP Cloud SQL Postgres.
- [ ] GCP Cloud SQL MySQL.
- [ ] Connect to a database within a remote cluster via a trusted cluster.
@ -820,6 +821,7 @@ and non interactive tsh bench loads.
- [ ] AWS Aurora Postgres.
- [ ] AWS Aurora MySQL.
- [ ] AWS Redshift.
- [ ] AWS ElastiCache.
- [ ] GCP Cloud SQL Postgres.
- [ ] GCP Cloud SQL MySQL.
- [ ] Verify audit events.
@ -843,6 +845,7 @@ and non interactive tsh bench loads.
- [ ] Can detect and register RDS instances.
- [ ] Can detect and register Aurora clusters, and their reader and custom endpoints.
- [ ] Can detect and register Redshift clusters.
- [ ] Can detect and register ElastiCache Redis clusters.
- [ ] Test Databases screen in the web UI (tab is located on left side nav on dashboard):
- [ ] Verify that all dbs registered are shown with correct `name`, `description`, `type`, and `labels`
- [ ] Verify that clicking on a rows connect button renders a dialogue on manual instructions with `Step 2` login value matching the rows `name` column

View file

@ -22,11 +22,13 @@ import (
"strings"
"time"
"github.com/gogo/protobuf/proto"
"github.com/google/go-cmp/cmp"
"github.com/gravitational/teleport/api/utils"
awsutils "github.com/gravitational/teleport/api/utils/aws"
"github.com/gogo/protobuf/proto"
"github.com/google/go-cmp/cmp"
"github.com/gravitational/trace"
"github.com/sirupsen/logrus"
)
// Database represents a database proxied by a database server.
@ -94,7 +96,11 @@ type Database interface {
IsCloudSQL() bool
// IsAzure returns true if this is an Azure database.
IsAzure() bool
// IsCloudHosted returns true if database is hosted in the cloud (AWS RDS/Aurora/Redshift, Azure or Cloud SQL).
// IsElastiCache returns true if this is an AWS ElastiCache database.
IsElastiCache() bool
// IsAWSHosted returns true if database is hosted by AWS.
IsAWSHosted() bool
// IsCloudHosted returns true if database is hosted in the cloud (AWS, Azure or Cloud SQL).
IsCloudHosted() bool
// Copy returns a copy of this database resource.
Copy() *DatabaseV3
@ -331,9 +337,20 @@ func (d *DatabaseV3) IsAzure() bool {
return d.GetType() == DatabaseTypeAzure
}
// IsCloudHosted returns true if database is hosted in the cloud (AWS RDS/Aurora/Redshift, Azure or Cloud SQL).
// IsElastiCache returns true if this is an AWS ElastiCache database.
func (d *DatabaseV3) IsElastiCache() bool {
return d.GetType() == DatabaseTypeElastiCache
}
// IsAWSHosted returns true if database is hosted by AWS.
func (d *DatabaseV3) IsAWSHosted() bool {
return d.IsRDS() || d.IsRedshift() || d.IsElastiCache()
}
// IsCloudHosted returns true if database is hosted in the cloud (AWS, Azure or
// Cloud SQL).
func (d *DatabaseV3) IsCloudHosted() bool {
return d.IsRDS() || d.IsRedshift() || d.IsCloudSQL() || d.IsAzure()
return d.IsAWSHosted() || d.IsCloudSQL() || d.IsAzure()
}
// GetType returns the database type.
@ -341,6 +358,9 @@ func (d *DatabaseV3) GetType() string {
if d.GetAWS().Redshift.ClusterID != "" {
return DatabaseTypeRedshift
}
if d.GetAWS().ElastiCache.ReplicationGroupID != "" {
return DatabaseTypeElastiCache
}
if d.GetAWS().Region != "" || d.GetAWS().RDS.InstanceID != "" || d.GetAWS().RDS.ClusterID != "" {
return DatabaseTypeRDS
}
@ -431,6 +451,20 @@ func (d *DatabaseV3) CheckAndSetDefaults() error {
if d.Spec.AWS.Region == "" {
d.Spec.AWS.Region = region
}
case awsutils.IsElastiCacheEndpoint(d.Spec.URI):
endpointInfo, err := awsutils.ParseElastiCacheEndpoint(d.Spec.URI)
if err != nil {
logrus.WithError(err).Warnf("Failed to parse %v as ElastiCache endpoint", d.Spec.URI)
break
}
if d.Spec.AWS.ElastiCache.ReplicationGroupID == "" {
d.Spec.AWS.ElastiCache.ReplicationGroupID = endpointInfo.ID
}
if d.Spec.AWS.Region == "" {
d.Spec.AWS.Region = endpointInfo.Region
}
d.Spec.AWS.ElastiCache.TransitEncryptionEnabled = endpointInfo.TransitEncryptionEnabled
d.Spec.AWS.ElastiCache.EndpointType = endpointInfo.EndpointType
case strings.Contains(d.Spec.URI, AzureEndpointSuffix):
name, err := parseAzureEndpoint(d.Spec.URI)
if err != nil {
@ -550,6 +584,8 @@ const (
DatabaseTypeCloudSQL = "gcp"
// DatabaseTypeAzure is Azure-hosted database.
DatabaseTypeAzure = "azure"
// DatabaseTypeElastiCache is AWS-hosted ElastiCache database.
DatabaseTypeElastiCache = "elasticache"
)
// DeduplicateDatabases deduplicates databases by name.

View file

@ -77,6 +77,55 @@ func TestDatabaseStatus(t *testing.T) {
require.Equal(t, awsMeta, database.GetAWS())
}
// TestDatabaseElastiCacheEndpoint verifies that creating a database with an
// ElastiCache URI auto-populates AWS metadata from the endpoint, and that a
// malformed ElastiCache endpoint is tolerated (logged) without failing.
func TestDatabaseElastiCacheEndpoint(t *testing.T) {
	t.Run("valid URI", func(t *testing.T) {
		database, err := NewDatabaseV3(Metadata{
			Name: "elasticache",
		}, DatabaseSpecV3{
			Protocol: "redis",
			URI:      "clustercfg.my-redis-cluster.xxxxxx.cac1.cache.amazonaws.com:6379",
		})

		// Region, replication group ID, TLS mode, and endpoint type are all
		// derived from the configuration endpoint hostname.
		require.NoError(t, err)
		require.Equal(t, AWS{
			Region: "ca-central-1",
			ElastiCache: ElastiCache{
				ReplicationGroupID:       "my-redis-cluster",
				TransitEncryptionEnabled: true,
				EndpointType:             "configuration",
			},
		}, database.GetAWS())
		require.True(t, database.IsElastiCache())
		require.True(t, database.IsAWSHosted())
		require.True(t, database.IsCloudHosted())
	})

	t.Run("invalid URI", func(t *testing.T) {
		database, err := NewDatabaseV3(Metadata{
			Name: "elasticache",
		}, DatabaseSpecV3{
			Protocol: "redis",
			URI:      "some.endpoint.cache.amazonaws.com:6379",
			AWS: AWS{
				Region: "us-east-5",
				ElastiCache: ElastiCache{
					ReplicationGroupID: "some-id",
				},
			},
		})

		// A warning is logged, no error is returned, and AWS metadata is not
		// updated.
		require.NoError(t, err)
		require.Equal(t, AWS{
			Region: "us-east-5",
			ElastiCache: ElastiCache{
				ReplicationGroupID: "some-id",
			},
		}, database.GetAWS())
	})
}
func TestMySQLVersionValidation(t *testing.T) {
t.Parallel()

File diff suppressed because it is too large Load diff

View file

@ -288,6 +288,9 @@ message AWS {
RDS RDS = 3 [ (gogoproto.nullable) = false, (gogoproto.jsontag) = "rds,omitempty" ];
// AccountID is the AWS account ID this database belongs to.
string AccountID = 4 [ (gogoproto.jsontag) = "account_id,omitempty" ];
// ElastiCache contains AWS ElastiCache Redis specific metadata.
ElastiCache ElastiCache = 5
[ (gogoproto.nullable) = false, (gogoproto.jsontag) = "elasticache,omitempty" ];
}
// Redshift contains AWS Redshift specific database metadata.
@ -308,6 +311,19 @@ message RDS {
bool IAMAuth = 4 [ (gogoproto.jsontag) = "iam_auth" ];
}
// ElastiCache contains AWS ElastiCache Redis specific metadata.
message ElastiCache {
// ReplicationGroupID is the Redis replication group ID.
string ReplicationGroupID = 1 [ (gogoproto.jsontag) = "replication_group_id,omitempty" ];
// UserGroupIDs is a list of user group IDs.
repeated string UserGroupIDs = 2 [ (gogoproto.jsontag) = "user_group_ids,omitempty" ];
// TransitEncryptionEnabled indicates whether in-transit encryption (TLS) is enabled.
bool TransitEncryptionEnabled = 3
[ (gogoproto.jsontag) = "transit_encryption_enabled,omitempty" ];
// EndpointType is the type of the endpoint.
string EndpointType = 4 [ (gogoproto.jsontag) = "endpoint_type,omitempty" ];
}
// GCPCloudSQL contains parameters specific to GCP Cloud SQL databases.
message GCPCloudSQL {
// ProjectID is the GCP project ID the Cloud SQL instance resides in.

View file

@ -18,6 +18,8 @@ package aws
import (
"net"
"net/url"
"strconv"
"strings"
"github.com/gravitational/trace"
@ -46,6 +48,13 @@ func IsRedshiftEndpoint(uri string) bool {
IsAWSEndpoint(uri)
}
// IsElastiCacheEndpoint returns true if the input URI is an ElastiCache
// endpoint.
func IsElastiCacheEndpoint(uri string) bool {
return strings.Contains(uri, ElastiCacheSubdomain) &&
IsAWSEndpoint(uri)
}
// ParseRDSEndpoint extracts the identifier and region from the provided RDS
// endpoint.
func ParseRDSEndpoint(endpoint string) (id, region string, err error) {
@ -130,6 +139,210 @@ func parseRedshiftCNEndpoint(endpoint string) (clusterID, region string, err err
return parts[0], parts[3], nil
}
// RedisEndpointInfo describes details extracted from an ElastiCache or
// MemoryDB Redis endpoint.
type RedisEndpointInfo struct {
	// ID is the identifier of the endpoint (e.g. the replication group ID
	// parsed from the endpoint hostname).
	ID string
	// Region is the AWS region for the endpoint (e.g. "us-east-1").
	Region string
	// TransitEncryptionEnabled specifies if in-transit encryption (TLS) is
	// enabled.
	TransitEncryptionEnabled bool
	// EndpointType specifies the type of the endpoint (one of the
	// ElastiCache*Endpoint constants).
	EndpointType string
}
const (
	// ElastiCacheConfigurationEndpoint is the configuration endpoint that is
	// used for cluster mode connections.
	ElastiCacheConfigurationEndpoint = "configuration"
	// ElastiCachePrimaryEndpoint is the endpoint of the primary node in the
	// node group.
	ElastiCachePrimaryEndpoint = "primary"
	// ElastiCacheReaderEndpoint is the endpoint of the replica nodes in the
	// node group.
	ElastiCacheReaderEndpoint = "reader"
	// ElastiCacheNodeEndpoint is the endpoint that is used to connect to an
	// individual node.
	ElastiCacheNodeEndpoint = "node"
)
// ParseElastiCacheEndpoint extracts the details from the provided
// ElastiCache Redis endpoint.
//
// The input may optionally include a scheme and a port; both are stripped
// before parsing. On success it returns the replication group ID, the AWS
// region (expanded from ElastiCache's short region code), the endpoint type,
// and whether in-transit encryption (TLS) is enabled — all derived purely
// from the hostname format. Returns a BadParameter error for endpoints that
// do not match any known ElastiCache format.
//
// https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/GettingStarted.ConnectToCacheNode.html
func ParseElastiCacheEndpoint(endpoint string) (*RedisEndpointInfo, error) {
	// Remove schema and port. Prepending a scheme when missing lets
	// url.Parse treat the input as a host rather than a path.
	if !strings.Contains(endpoint, "://") {
		endpoint = "redis://" + endpoint
	}
	parsedURL, err := url.Parse(endpoint)
	if err != nil {
		return nil, trace.Wrap(err)
	}
	endpoint = parsedURL.Hostname()

	// Remove partition suffix. Note that endpoints for CN regions use the same
	// format except they end with AWSCNEndpointSuffix.
	endpointWithoutSuffix := ""
	switch {
	case strings.HasSuffix(endpoint, AWSEndpointSuffix):
		endpointWithoutSuffix = strings.TrimSuffix(endpoint, AWSEndpointSuffix)
	case strings.HasSuffix(endpoint, AWSCNEndpointSuffix):
		endpointWithoutSuffix = strings.TrimSuffix(endpoint, AWSCNEndpointSuffix)
	default:
		return nil, trace.BadParameter("%v is not a valid AWS endpoint", endpoint)
	}

	// Split into parts to extract details. They look like this in general:
	//   <part>.<part>.<part>.<short-region>.cache
	//
	// Note that ElastiCache uses short region codes like "use1".
	//
	// For Redis with cluster mode enabled, users can connect through either
	// "configuration" endpoint or individual "node" endpoints.
	// For Redis with cluster mode disabled, users can connect through either
	// "primary", "reader", or individual "node" endpoints.
	parts := strings.Split(endpointWithoutSuffix, ".")
	if len(parts) == 5 && parts[4] == ElastiCacheServiceName {
		region, ok := ShortRegionToRegion(parts[3])
		if !ok {
			return nil, trace.BadParameter("%v is not a valid region", parts[3])
		}

		// Configuration endpoint for Redis with TLS enabled looks like:
		// clustercfg.my-redis-shards.xxxxxx.use1.cache.<suffix>:6379
		if parts[0] == "clustercfg" {
			return &RedisEndpointInfo{
				ID:                       parts[1],
				Region:                   region,
				TransitEncryptionEnabled: true,
				EndpointType:             ElastiCacheConfigurationEndpoint,
			}, nil
		}

		// Configuration endpoint for Redis with TLS disabled looks like:
		// my-redis-shards.xxxxxx.clustercfg.use1.cache.<suffix>:6379
		if parts[2] == "clustercfg" {
			return &RedisEndpointInfo{
				ID:                       parts[0],
				Region:                   region,
				TransitEncryptionEnabled: false,
				EndpointType:             ElastiCacheConfigurationEndpoint,
			}, nil
		}

		// Node endpoint for Redis with TLS disabled looks like:
		// my-redis-cluster-001.xxxxxx.0001.use0.cache.<suffix>:6379
		// my-redis-shards-0001-001.xxxxxx.0001.use0.cache.<suffix>:6379
		if isElasticCacheShardID(parts[2]) {
			return &RedisEndpointInfo{
				ID:                       trimElastiCacheShardAndNodeID(parts[0]),
				Region:                   region,
				TransitEncryptionEnabled: false,
				EndpointType:             ElastiCacheNodeEndpoint,
			}, nil
		}

		// Node, primary, reader endpoints for Redis with TLS enabled look like:
		// my-redis-cluster-001.my-redis-cluster.xxxxxx.use1.cache.<suffix>:6379
		// my-redis-shards-0001-001.my-redis-shards.xxxxxx.use1.cache.<suffix>:6379
		// master.my-redis-cluster.xxxxxx.use1.cache.<suffix>:6379
		// replica.my-redis-cluster.xxxxxx.use1.cache.<suffix>:6379
		var endpointType string
		switch strings.ToLower(parts[0]) {
		case "master":
			endpointType = ElastiCachePrimaryEndpoint
		case "replica":
			endpointType = ElastiCacheReaderEndpoint
		default:
			endpointType = ElastiCacheNodeEndpoint
		}
		return &RedisEndpointInfo{
			ID:                       parts[1],
			Region:                   region,
			TransitEncryptionEnabled: true,
			EndpointType:             endpointType,
		}, nil
	}

	// Primary and reader endpoints for Redis with TLS disabled have an extra
	// shard ID in the endpoints, and they look like:
	// my-redis-cluster.xxxxxx.ng.0001.use1.cache.<suffix>:6379
	// my-redis-cluster-ro.xxxxxx.ng.0001.use1.cache.<suffix>:6379
	if len(parts) == 6 && parts[5] == ElastiCacheServiceName && isElasticCacheShardID(parts[3]) {
		region, ok := ShortRegionToRegion(parts[4])
		if !ok {
			return nil, trace.BadParameter("%v is not a valid region", parts[4])
		}

		// Remove "-ro" from reader endpoint.
		if strings.HasSuffix(parts[0], "-ro") {
			return &RedisEndpointInfo{
				ID:                       strings.TrimSuffix(parts[0], "-ro"),
				Region:                   region,
				TransitEncryptionEnabled: false,
				EndpointType:             ElastiCacheReaderEndpoint,
			}, nil
		}

		return &RedisEndpointInfo{
			ID:                       parts[0],
			Region:                   region,
			TransitEncryptionEnabled: false,
			EndpointType:             ElastiCachePrimaryEndpoint,
		}, nil
	}
	return nil, trace.BadParameter("unknown ElastiCache Redis endpoint format")
}
// isElasticCacheShardID returns true if the input part is in shard ID format.
// The shard ID is a 4 character string of an integer left padded with zeros
// (e.g. 0001).
func isElasticCacheShardID(part string) bool {
	if len(part) != 4 {
		return false
	}
	// Require all digits. strconv.Atoi alone is not sufficient here because
	// it also accepts a leading "+" or "-" sign (e.g. "-123" would otherwise
	// be misclassified as a shard ID).
	for _, r := range part {
		if r < '0' || r > '9' {
			return false
		}
	}
	return true
}
// isElasticCacheNodeID returns true if the input part is in node ID format.
// The node ID is a 3 character string of an integer left padded with zeros
// (e.g. 001).
func isElasticCacheNodeID(part string) bool {
	if len(part) != 3 {
		return false
	}
	// Require all digits. strconv.Atoi alone is not sufficient here because
	// it also accepts a leading "+" or "-" sign (e.g. "-12" would otherwise
	// be misclassified as a node ID).
	for _, r := range part {
		if r < '0' || r > '9' {
			return false
		}
	}
	return true
}
// trimElastiCacheShardAndNodeID trims shard and node ID suffix from input.
func trimElastiCacheShardAndNodeID(input string) string {
	// input can be one of:
	//   <replication-group-id>
	//   <replication-group-id>-<node-id>
	//   <replication-group-id>-<shard-id>-<node-id>
	segments := strings.Split(input, "-")

	// Strip at most one trailing node ID, then at most one trailing shard ID.
	for _, matchesSuffix := range []func(string) bool{isElasticCacheNodeID, isElasticCacheShardID} {
		if last := len(segments) - 1; last >= 0 && matchesSuffix(segments[last]) {
			segments = segments[:last]
		}
	}
	return strings.Join(segments, "-")
}
const (
// AWSEndpointSuffix is the endpoint suffix for AWS Standard and AWS US
// GovCloud regions.
@ -149,9 +362,15 @@ const (
// RedshiftServiceName is the service name for AWS Redshift.
RedshiftServiceName = "redshift"
// ElastiCacheServiceName is the service name for AWS ElastiCache.
ElastiCacheServiceName = "cache"
// RDSEndpointSubdomain is the RDS/Aurora subdomain.
RDSEndpointSubdomain = "." + RDSServiceName + "."
// RedshiftEndpointSubdomain is the Redshift endpoint subdomain.
RedshiftEndpointSubdomain = "." + RedshiftServiceName + "."
// ElastiCacheSubdomain is the ElastiCache endpoint subdomain.
ElastiCacheSubdomain = "." + ElastiCacheServiceName + "."
)

View file

@ -136,3 +136,135 @@ func TestParseRedshiftEndpoint(t *testing.T) {
})
}
}
// TestParseElastiCacheEndpoint verifies parsing of the various ElastiCache
// Redis endpoint formats (configuration/primary/reader/node, TLS on/off,
// standard and CN partitions) as well as rejection of invalid endpoints.
func TestParseElastiCacheEndpoint(t *testing.T) {
	tests := []struct {
		name        string
		inputURI    string
		expectInfo  *RedisEndpointInfo
		expectError bool
	}{
		{
			name:     "configuration endpoint, TLS enabled",
			inputURI: "clustercfg.my-redis-shards.xxxxxx.use1.cache.amazonaws.com:6379",
			expectInfo: &RedisEndpointInfo{
				ID:                       "my-redis-shards",
				Region:                   "us-east-1",
				TransitEncryptionEnabled: true,
				EndpointType:             ElastiCacheConfigurationEndpoint,
			},
		},
		{
			name:     "primary endpoint, TLS enabled",
			inputURI: "master.my-redis-cluster.xxxxxx.cac1.cache.amazonaws.com:6379",
			expectInfo: &RedisEndpointInfo{
				ID:                       "my-redis-cluster",
				Region:                   "ca-central-1",
				TransitEncryptionEnabled: true,
				EndpointType:             ElastiCachePrimaryEndpoint,
			},
		},
		{
			name:     "reader endpoint, TLS enabled",
			inputURI: "replica.my-redis-cluster.xxxxxx.cac1.cache.amazonaws.com:6379",
			expectInfo: &RedisEndpointInfo{
				ID:                       "my-redis-cluster",
				Region:                   "ca-central-1",
				TransitEncryptionEnabled: true,
				EndpointType:             ElastiCacheReaderEndpoint,
			},
		},
		{
			name:     "node endpoint, TLS enabled",
			inputURI: "my-redis-shards-0002-001.my-redis-shards.xxxxxx.cac1.cache.amazonaws.com:6379",
			expectInfo: &RedisEndpointInfo{
				ID:                       "my-redis-shards",
				Region:                   "ca-central-1",
				TransitEncryptionEnabled: true,
				EndpointType:             ElastiCacheNodeEndpoint,
			},
		},
		{
			name:     "configuration endpoint, TLS disabled",
			inputURI: "my-redis-shards.xxxxxx.clustercfg.use1.cache.amazonaws.com:6379",
			expectInfo: &RedisEndpointInfo{
				ID:           "my-redis-shards",
				Region:       "us-east-1",
				EndpointType: ElastiCacheConfigurationEndpoint,
			},
		},
		{
			// Fixed typo in test name ("endpiont" -> "endpoint").
			name:     "primary endpoint, TLS disabled",
			inputURI: "my-redis-cluster.xxxxxx.ng.0001.cac1.cache.amazonaws.com:6379",
			expectInfo: &RedisEndpointInfo{
				ID:           "my-redis-cluster",
				Region:       "ca-central-1",
				EndpointType: ElastiCachePrimaryEndpoint,
			},
		},
		{
			// Fixed typo in test name ("endpiont" -> "endpoint").
			name:     "reader endpoint, TLS disabled",
			inputURI: "my-redis-cluster-ro.xxxxxx.ng.0001.cac1.cache.amazonaws.com:6379",
			expectInfo: &RedisEndpointInfo{
				ID:           "my-redis-cluster",
				Region:       "ca-central-1",
				EndpointType: ElastiCacheReaderEndpoint,
			},
		},
		{
			name:     "node endpoint, TLS disabled",
			inputURI: "my-redis-shards-0001-001.xxxxxx.0001.cac1.cache.amazonaws.com:6379",
			expectInfo: &RedisEndpointInfo{
				ID:           "my-redis-shards",
				Region:       "ca-central-1",
				EndpointType: ElastiCacheNodeEndpoint,
			},
		},
		{
			name:     "CN endpoint",
			inputURI: "replica.my-redis-cluster.xxxxxx.cnn1.cache.amazonaws.com.cn:6379",
			expectInfo: &RedisEndpointInfo{
				ID:                       "my-redis-cluster",
				Region:                   "cn-north-1",
				TransitEncryptionEnabled: true,
				EndpointType:             ElastiCacheReaderEndpoint,
			},
		},
		{
			name:     "endpoint with schema and parameters",
			inputURI: "redis://my-redis-cluster.xxxxxx.ng.0001.cac1.cache.amazonaws.com:6379?a=b&c=d",
			expectInfo: &RedisEndpointInfo{
				ID:           "my-redis-cluster",
				Region:       "ca-central-1",
				EndpointType: ElastiCachePrimaryEndpoint,
			},
		},
		{
			name:        "invalid suffix",
			inputURI:    "replica.my-redis-cluster.xxxxxx.cac1.cache.amazonaws.ca:6379",
			expectError: true,
		},
		{
			name:        "invalid url",
			inputURI:    "://replica.my-redis-cluster.xxxxxx.cac1.cache.amazonaws.com:6379",
			expectError: true,
		},
		{
			name:        "invalid format",
			inputURI:    "my-redis-cluster.cac1.cache.amazonaws.com:6379",
			expectError: true,
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			actualInfo, err := ParseElastiCacheEndpoint(test.inputURI)
			if test.expectError {
				require.Error(t, err)
			} else {
				require.NoError(t, err)
				require.Equal(t, test.expectInfo, actualInfo)
			}
		})
	}
}

View file

@ -16,7 +16,11 @@ limitations under the License.
package aws
import "strings"
import (
"fmt"
"strconv"
"strings"
)
// IsCNRegion returns true if the region is an AWS China region.
func IsCNRegion(region string) bool {
@ -28,6 +32,80 @@ func IsUSGovRegion(region string) bool {
return strings.HasPrefix(region, USGovRegionPrefix)
}
// ShortRegionToRegion converts short region codes to regular region names. For
// example, a short region "use1" maps to region "us-east-1".
//
// There is no official documentation on this mapping. Here is a gist of others
// collecting these naming schemes:
// https://gist.github.com/colinvh/14e4b7fb6b66c29f79d3
//
// This function currently does not support regions in secret partitions.
// Returns the expanded region name and true on success, or "" and false when
// the input does not match the expected <prefix><direction><number> shape.
func ShortRegionToRegion(shortRegion string) (string, bool) {
	var prefix, direction string

	// Determine region prefix. GovCloud and China partitions have fixed
	// prefixes; everything else is treated as a two-letter continent or
	// country code.
	remain := strings.ToLower(shortRegion)
	switch {
	case strings.HasPrefix(remain, "usg"):
		prefix = USGovRegionPrefix
		remain = remain[3:]
	case strings.HasPrefix(remain, "cn"):
		prefix = CNRegionPrefix
		remain = remain[2:]
	default:
		// For regions in the standard partition, the first two letters are
		// the continent or country code (e.g. "eu" for Europe, "us" for US).
		if len(remain) < 2 {
			return "", false
		}
		prefix = remain[:2] + "-"
		remain = remain[2:]
	}

	// Map direction codes. Two-letter codes (e.g. "nw") must be checked
	// before their single-letter prefixes (e.g. "n").
	switch {
	case strings.HasPrefix(remain, "nw"):
		direction = "northwest"
		remain = remain[2:]
	case strings.HasPrefix(remain, "ne"):
		direction = "northeast"
		remain = remain[2:]
	case strings.HasPrefix(remain, "se"):
		direction = "southeast"
		remain = remain[2:]
	case strings.HasPrefix(remain, "sw"):
		direction = "southwest"
		remain = remain[2:]
	case strings.HasPrefix(remain, "n"):
		direction = "north"
		remain = remain[1:]
	case strings.HasPrefix(remain, "e"):
		direction = "east"
		remain = remain[1:]
	case strings.HasPrefix(remain, "w"):
		direction = "west"
		remain = remain[1:]
	case strings.HasPrefix(remain, "s"):
		direction = "south"
		remain = remain[1:]
	case strings.HasPrefix(remain, "c"):
		direction = "central"
		remain = remain[1:]
	default:
		return "", false
	}

	// Remain should be a number (the region index, e.g. the "1" in
	// "us-east-1").
	if _, err := strconv.Atoi(remain); err != nil {
		return "", false
	}
	return fmt.Sprintf("%s%s-%s", prefix, direction, remain), true
}
const (
// CNRegionPrefix is the prefix for all AWS China regions.
CNRegionPrefix = "cn-"

View file

@ -0,0 +1,88 @@
/*
Copyright 2022 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aws
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestShortRegionToRegion verifies the short-to-long AWS region code mapping
// for known regions across standard, GovCloud, and China partitions, and that
// malformed short codes are rejected.
func TestShortRegionToRegion(t *testing.T) {
	t.Run("valid regions", func(t *testing.T) {
		t.Parallel()

		validRegionMap := map[string]string{
			"use1":  "us-east-1",
			"use2":  "us-east-2",
			"usw1":  "us-west-1",
			"usw2":  "us-west-2",
			"cac1":  "ca-central-1",
			"euw1":  "eu-west-1",
			"euw2":  "eu-west-2",
			"euw3":  "eu-west-3",
			"euc1":  "eu-central-1",
			"eus1":  "eu-south-1",
			"eun1":  "eu-north-1",
			"apse1": "ap-southeast-1",
			"apse2": "ap-southeast-2",
			"aps1":  "ap-south-1",
			"apne1": "ap-northeast-1",
			"apne2": "ap-northeast-2",
			"ape1":  "ap-east-1",
			"sae1":  "sa-east-1",
			"afs1":  "af-south-1",
			"usgw1": "us-gov-west-1",
			"usge1": "us-gov-east-1",
			"cnn1":  "cn-north-1",
			"cnnw1": "cn-northwest-1",
		}
		for shortRegion, expectRegion := range validRegionMap {
			actualRegion, ok := ShortRegionToRegion(shortRegion)
			require.True(t, ok)
			require.Equal(t, expectRegion, actualRegion)
		}
	})

	invalidTests := []struct {
		name  string
		input string
	}{
		{
			name:  "invalid prefix",
			input: "u",
		},
		{
			name:  "not ended in number",
			input: "use1b",
		},
		{
			name:  "invalid direction",
			input: "usx1",
		},
	}
	for _, test := range invalidTests {
		// Capture range variable for the parallel subtest closure.
		test := test
		t.Run(test.name, func(t *testing.T) {
			t.Parallel()

			_, ok := ShortRegionToRegion(test.input)
			require.False(t, ok)
		})
	}
}

View file

@ -133,7 +133,7 @@ type CommandLineFlags struct {
DatabaseAWSRegion string
// DatabaseAWSRedshiftClusterID is Redshift cluster identifier.
DatabaseAWSRedshiftClusterID string
// DatabaseAWSRDSClusterID is RDS instance identifier.
// DatabaseAWSRDSInstanceID is RDS instance identifier.
DatabaseAWSRDSInstanceID string
// DatabaseAWSRDSClusterID is RDS cluster (Aurora) cluster identifier.
DatabaseAWSRDSClusterID string
@ -1133,6 +1133,9 @@ func applyDatabasesConfig(fc *FileConfig, cfg *service.Config) error {
InstanceID: database.AWS.RDS.InstanceID,
ClusterID: database.AWS.RDS.ClusterID,
},
ElastiCache: service.DatabaseAWSElastiCache{
ReplicationGroupID: database.AWS.ElastiCache.ReplicationGroupID,
},
},
GCP: service.DatabaseGCP{
ProjectID: database.GCP.ProjectID,

View file

@ -34,6 +34,7 @@ var databaseConfigTemplateFuncs = template.FuncMap{
}
// databaseAgentConfigurationTemplate database configuration template.
// TODO(greedy52) add documentation link to ElastiCache page.
var databaseAgentConfigurationTemplate = template.Must(template.New("").Funcs(databaseConfigTemplateFuncs).Parse(`#
# Teleport database agent configuration file.
# Configuration reference: https://goteleport.com/docs/database-access/reference/configuration/
@ -89,6 +90,18 @@ db_service:
tags:
"*": "*"
{{- end }}
{{- if .ElastiCacheDiscoveryRegions }}
# ElastiCache databases auto-discovery.
- types: ["elasticache"]
# AWS regions to register databases from.
regions:
{{- range .ElastiCacheDiscoveryRegions }}
- {{ . }}
{{- end }}
# AWS resource tags to match when registering databases.
tags:
"*": "*"
{{- end }}
# Lists statically registered databases proxied by this agent.
{{- if .StaticDatabaseName }}
databases:
@ -162,6 +175,20 @@ db_service:
# redshift:
# # Redshift Cluster ID.
# cluster_id: redshift-cluster-example-1
# # ElastiCache database static configuration.
# - name: elasticache
# description: AWS ElastiCache cluster configuration example.
# protocol: redis
# # Database connection endpoint. Must be reachable from Database service.
# uri: master.redis-cluster-example.abcdef.usw1.cache.amazonaws.com:6379
# # AWS specific configuration.
# aws:
# # Region the database is deployed in.
# region: us-west-1
# # ElastiCache specific configuration.
# elasticache:
# # ElastiCache replication group ID.
# replication_group_id: redis-cluster-example
# # Self-hosted static configuration.
# - name: self-hosted
# description: Self-hosted database configuration.
@ -205,13 +232,16 @@ type DatabaseSampleFlags struct {
AuthToken string
// CAPins are the SKPI hashes of the CAs used to verify the Auth Server.
CAPins []string
// RDSDiscoveryRegions list of regions the RDS auto-discovery is
// RDSDiscoveryRegions is a list of regions the RDS auto-discovery is
// configured.
RDSDiscoveryRegions []string
// RedshiftDiscoveryRegions list of regions the Redshift auto-discovery is
// configured.
// RedshiftDiscoveryRegions is a list of regions the Redshift
// auto-discovery is configured.
RedshiftDiscoveryRegions []string
// DatabaseProtocols list of database protocols supported.
// ElastiCacheDiscoveryRegions is a list of regions the ElastiCache
// auto-discovery is configured.
ElastiCacheDiscoveryRegions []string
// DatabaseProtocols is a list of database protocols supported.
DatabaseProtocols []string
}

View file

@ -1151,7 +1151,7 @@ type ResourceMatcher struct {
// AWSMatcher matches AWS databases.
type AWSMatcher struct {
// Types are AWS database types to match, "rds" or "redshift".
// Types are AWS database types to match, "rds", "redshift", or "elasticache".
Types []string `yaml:"types,omitempty"`
// Regions are AWS regions to query for databases.
Regions []string `yaml:"regions,omitempty"`
@ -1226,6 +1226,8 @@ type DatabaseAWS struct {
Redshift DatabaseAWSRedshift `yaml:"redshift"`
// RDS contains RDS specific settings.
RDS DatabaseAWSRDS `yaml:"rds"`
// ElastiCache contains ElastiCache specific settings.
ElastiCache DatabaseAWSElastiCache `yaml:"elasticache"`
}
// DatabaseAWSRedshift contains AWS Redshift specific settings.
@ -1242,6 +1244,12 @@ type DatabaseAWSRDS struct {
ClusterID string `yaml:"cluster_id,omitempty"`
}
// DatabaseAWSElastiCache contains settings for ElastiCache databases.
type DatabaseAWSElastiCache struct {
// ReplicationGroupID is the ElastiCache replication group ID.
ReplicationGroupID string `yaml:"replication_group_id,omitempty"`
}
// DatabaseGCP contains GCP specific settings for Cloud SQL databases.
type DatabaseGCP struct {
// ProjectID is the GCP project ID where the database is deployed.

View file

@ -18,10 +18,10 @@ import (
"context"
"fmt"
"github.com/gravitational/teleport/api/types"
awsutils "github.com/gravitational/teleport/api/utils/aws"
awslib "github.com/gravitational/teleport/lib/cloud/aws"
"github.com/gravitational/teleport/lib/config"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/trace"
"github.com/aws/aws-sdk-go/aws/arn"
@ -55,15 +55,23 @@ var (
roleBaseActions = []string{"iam:GetRolePolicy", "iam:PutRolePolicy", "iam:DeleteRolePolicy"}
// rdsActions list of actions used when giving RDS permissions.
rdsActions = []string{"rds:DescribeDBInstances", "rds:ModifyDBInstance"}
// auroraActions list of acions used when giving RDS Aurora permissions.
// auroraActions list of actions used when giving RDS Aurora permissions.
auroraActions = []string{"rds:DescribeDBClusters", "rds:ModifyDBCluster"}
// redshiftActions list of actions used when giving Redshift auto-discovery
// permissions.
redshiftActions = []string{"redshift:DescribeClusters"}
// boundaryRDSAuroraActions aditional actions added to the policy boundary
// elastiCacheActions is a list of actions used for ElastiCache
// auto-discovery and metadata update.
elastiCacheActions = []string{
"elasticache:ListTagsForResource",
"elasticache:DescribeReplicationGroups",
"elasticache:DescribeCacheClusters",
"elasticache:DescribeCacheSubnetGroups",
}
// boundaryRDSAuroraActions additional actions added to the policy boundary
// when policy has RDS auto-discovery.
boundaryRDSAuroraActions = []string{"rds-db:connect"}
// boundaryRedshiftActions aditional actions added to the policy boundary
// boundaryRedshiftActions additional actions added to the policy boundary
// when policy has Redshift auto-discovery.
boundaryRedshiftActions = []string{"redshift:GetClusterCredentials"}
)
@ -154,7 +162,7 @@ func (a *awsConfigurator) IsEmpty() bool {
return len(a.actions) == 0
}
// Name returns humam-readable configurator name.
// Name returns human-readable configurator name.
func (a *awsConfigurator) Name() string {
return "AWS"
}
@ -182,7 +190,7 @@ func (a *awsPolicyCreator) Description() string {
return fmt.Sprintf("Create IAM Policy %q", a.policy.Name)
}
// Details returnst the policy document that will be created.
// Details returns the policy document that will be created.
func (a *awsPolicyCreator) Details() string {
return a.formattedPolicy
}
@ -359,15 +367,20 @@ func buildPolicyDocument(flags BootstrapFlags, fileConfig *config.FileConfig, ta
statements = append(statements, buildRedshiftStatements()...)
}
// ElastiCache does not require permissions to edit user/role IAM policy.
if hasElastiCacheDatabases(flags, fileConfig) {
statements = append(statements, buildElastiCacheStatements()...)
}
// If RDS the auto discovery is enabled or there are Redshift databases, we
// need permission to edit the target user/role.
if rdsAutoDiscovery || redshiftDatabases {
targetStaments, err := buildIAMEditStatements(target)
targetStatements, err := buildIAMEditStatements(target)
if err != nil {
return nil, trace.Wrap(err)
}
statements = append(statements, targetStaments...)
statements = append(statements, targetStatements...)
}
document := awslib.NewPolicyDocument()
@ -394,15 +407,20 @@ func buildPolicyBoundaryDocument(flags BootstrapFlags, fileConfig *config.FileCo
statements = append(statements, buildRedshiftBoundaryStatements()...)
}
// ElastiCache does not require permissions to edit user/role IAM policy.
if hasElastiCacheDatabases(flags, fileConfig) {
statements = append(statements, buildElastiCacheBoundaryStatements()...)
}
// If RDS the auto discovery is enabled or there are Redshift databases, we
// need permission to edit the target user/role.
if rdsAutoDiscovery || redshiftDatabases {
targetStaments, err := buildIAMEditStatements(target)
targetStatements, err := buildIAMEditStatements(target)
if err != nil {
return nil, trace.Wrap(err)
}
statements = append(statements, targetStaments...)
statements = append(statements, targetStatements...)
}
document := awslib.NewPolicyDocument()
@ -422,15 +440,7 @@ func isRDSAutoDiscoveryEnabled(flags BootstrapFlags, fileConfig *config.FileConf
return true
}
for _, matcher := range fileConfig.Databases.AWSMatchers {
for _, databaseType := range matcher.Types {
if databaseType == types.DatabaseTypeRDS {
return true
}
}
}
return false
return isAutoDiscoveryEnabledForMatcher(fileConfig, services.AWSMatcherRDS)
}
// hasRedshiftDatabases checks if the agent needs permission for
@ -440,22 +450,42 @@ func hasRedshiftDatabases(flags BootstrapFlags, fileConfig *config.FileConfig) b
return true
}
// Check if Redshift auto-discovery is enabled.
return isAutoDiscoveryEnabledForMatcher(fileConfig, services.AWSMatcherRedshift) ||
findEndpointIs(fileConfig, awsutils.IsRedshiftEndpoint)
}
// hasElastiCacheDatabases checks if the agent needs permission for
// ElastiCache databases. Permissions are needed when forced by flags, when
// ElastiCache auto-discovery is configured, or when a static database URI
// points at an ElastiCache endpoint.
func hasElastiCacheDatabases(flags BootstrapFlags, fileConfig *config.FileConfig) bool {
	switch {
	case flags.ForceElastiCachePermissions:
		return true
	case isAutoDiscoveryEnabledForMatcher(fileConfig, services.AWSMatcherElastiCache):
		return true
	default:
		return findEndpointIs(fileConfig, awsutils.IsElastiCacheEndpoint)
	}
}
// isAutoDiscoveryEnabledForMatcher returns true if provided AWS matcher type
// is found.
func isAutoDiscoveryEnabledForMatcher(fileConfig *config.FileConfig, matcherType string) bool {
for _, matcher := range fileConfig.Databases.AWSMatchers {
for _, databaseType := range matcher.Types {
if databaseType == types.DatabaseTypeRedshift {
if databaseType == matcherType {
return true
}
}
}
return false
}
// Check if there is any static Redshift database configured.
// findEndpointIs returns true if provided check returns true for any static
// endpoint.
func findEndpointIs(fileConfig *config.FileConfig, endpointIs func(string) bool) bool {
for _, database := range fileConfig.Databases.Databases {
if awsutils.IsRedshiftEndpoint(database.URI) {
if endpointIs(database.URI) {
return true
}
}
return false
}
@ -526,3 +556,21 @@ func buildRedshiftBoundaryStatements() []*awslib.Statement {
},
}
}
// buildElastiCacheStatements returns IAM statements necessary for ElastiCache
// databases.
func buildElastiCacheStatements() []*awslib.Statement {
	// A single allow statement over all resources covers the describe/list
	// calls the agent performs against ElastiCache.
	statement := &awslib.Statement{
		Effect:    awslib.EffectAllow,
		Actions:   elastiCacheActions,
		Resources: []string{"*"},
	}
	return []*awslib.Statement{statement}
}
// buildElastiCacheBoundaryStatements returns IAM boundary statements necessary
// for ElastiCache databases.
//
// The boundary statements are identical to the regular ElastiCache statements
// because ElastiCache requires no user/role IAM policy editing permissions.
func buildElastiCacheBoundaryStatements() []*awslib.Statement {
	return buildElastiCacheStatements()
}

View file

@ -190,6 +190,61 @@ func TestAWSIAMDocuments(t *testing.T) {
}},
},
},
"ElastiCache auto discovery": {
target: roleTarget,
fileConfig: &config.FileConfig{
Databases: config.Databases{
AWSMatchers: []config.AWSMatcher{
{Types: []string{types.DatabaseTypeElastiCache}, Regions: []string{"us-west-2"}},
},
},
},
statements: []*awslib.Statement{
{Effect: awslib.EffectAllow, Resources: []string{"*"}, Actions: []string{
"elasticache:ListTagsForResource",
"elasticache:DescribeReplicationGroups",
"elasticache:DescribeCacheClusters",
"elasticache:DescribeCacheSubnetGroups",
}},
},
boundaryStatements: []*awslib.Statement{
{Effect: awslib.EffectAllow, Resources: []string{"*"}, Actions: []string{
"elasticache:ListTagsForResource",
"elasticache:DescribeReplicationGroups",
"elasticache:DescribeCacheClusters",
"elasticache:DescribeCacheSubnetGroups",
}},
},
},
"ElastiCache static database": {
target: roleTarget,
fileConfig: &config.FileConfig{
Databases: config.Databases{
Databases: []*config.Database{
{
Name: "redis-1",
URI: "clustercfg.redis1.xxxxxx.usw2.cache.amazonaws.com:6379",
},
},
},
},
statements: []*awslib.Statement{
{Effect: awslib.EffectAllow, Resources: []string{"*"}, Actions: []string{
"elasticache:ListTagsForResource",
"elasticache:DescribeReplicationGroups",
"elasticache:DescribeCacheClusters",
"elasticache:DescribeCacheSubnetGroups",
}},
},
boundaryStatements: []*awslib.Statement{
{Effect: awslib.EffectAllow, Resources: []string{"*"}, Actions: []string{
"elasticache:ListTagsForResource",
"elasticache:DescribeReplicationGroups",
"elasticache:DescribeCacheClusters",
"elasticache:DescribeCacheSubnetGroups",
}},
},
},
"AutoDiscoveryUnknownIdentity": {
returnError: true,
target: unknownIdentity,

View file

@ -38,8 +38,10 @@ type BootstrapFlags struct {
AttachToRole string
// ForceRDSPermissions forces the presence of RDS permissions.
ForceRDSPermissions bool
// ForceAuroraPermissions forces the presence of Redshift permissions.
// ForceRedshiftPermissions forces the presence of Redshift permissions.
ForceRedshiftPermissions bool
// ForceElastiCachePermissions forces the presence of ElastiCache permissions.
ForceElastiCachePermissions bool
}
// Configurator responsible for generating a list of actions that needs to be

View file

@ -704,6 +704,8 @@ type DatabaseAWS struct {
Redshift DatabaseAWSRedshift
// RDS contains RDS specific settings.
RDS DatabaseAWSRDS
// ElastiCache contains ElastiCache specific settings.
ElastiCache DatabaseAWSElastiCache
}
// DatabaseAWSRedshift contains AWS Redshift specific settings.
@ -720,6 +722,12 @@ type DatabaseAWSRDS struct {
ClusterID string
}
// DatabaseAWSElastiCache contains settings for ElastiCache databases.
type DatabaseAWSElastiCache struct {
	// ReplicationGroupID is the ElastiCache replication group ID the
	// database belongs to.
	ReplicationGroupID string
}
// DatabaseGCP contains GCP specific settings for Cloud SQL databases.
type DatabaseGCP struct {
// ProjectID is the GCP project ID where the database is deployed.

View file

@ -119,6 +119,9 @@ func (process *TeleportProcess) initDatabaseService() (retErr error) {
InstanceID: db.AWS.RDS.InstanceID,
ClusterID: db.AWS.RDS.ClusterID,
},
ElastiCache: types.ElastiCache{
ReplicationGroupID: db.AWS.ElastiCache.ReplicationGroupID,
},
},
GCP: types.GCPCloudSQL{
ProjectID: db.GCP.ProjectID,

View file

@ -29,6 +29,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/arn"
"github.com/aws/aws-sdk-go/service/elasticache"
"github.com/aws/aws-sdk-go/service/rds"
"github.com/aws/aws-sdk-go/service/redshift"
@ -239,6 +240,63 @@ func NewDatabaseFromRedshiftCluster(cluster *redshift.Cluster) (types.Database,
})
}
// NewDatabaseFromElastiCacheConfigurationEndpoint creates a database resource
// from the ElastiCache replication group's configuration endpoint.
func NewDatabaseFromElastiCacheConfigurationEndpoint(cluster *elasticache.ReplicationGroup, extraLabels map[string]string) (types.Database, error) {
	endpoint := cluster.ConfigurationEndpoint
	if endpoint == nil {
		return nil, trace.BadParameter("missing configuration endpoint")
	}
	return newElastiCacheDatabase(cluster, endpoint, awsutils.ElastiCacheConfigurationEndpoint, extraLabels)
}
// NewDatabasesFromElastiCacheNodeGroups creates database resources from
// ElastiCache node groups, one per primary/reader endpoint found.
func NewDatabasesFromElastiCacheNodeGroups(cluster *elasticache.ReplicationGroup, extraLabels map[string]string) (types.Databases, error) {
	var databases types.Databases
	// addEndpoint builds a database resource for a single endpoint and
	// collects it. Nil endpoints are skipped.
	addEndpoint := func(endpoint *elasticache.Endpoint, endpointType string) error {
		if endpoint == nil {
			return nil
		}
		database, err := newElastiCacheDatabase(cluster, endpoint, endpointType, extraLabels)
		if err != nil {
			return trace.Wrap(err)
		}
		databases = append(databases, database)
		return nil
	}
	for _, nodeGroup := range cluster.NodeGroups {
		if err := addEndpoint(nodeGroup.PrimaryEndpoint, awsutils.ElastiCachePrimaryEndpoint); err != nil {
			return nil, err
		}
		if err := addEndpoint(nodeGroup.ReaderEndpoint, awsutils.ElastiCacheReaderEndpoint); err != nil {
			return nil, err
		}
	}
	return databases, nil
}
// newElastiCacheDatabase returns a new ElastiCache database resource for the
// given endpoint of the replication group.
func newElastiCacheDatabase(cluster *elasticache.ReplicationGroup, endpoint *elasticache.Endpoint, endpointType string, extraLabels map[string]string) (types.Database, error) {
	metadata, err := MetadataFromElastiCacheCluster(cluster, endpointType)
	if err != nil {
		return nil, trace.Wrap(err)
	}

	// Reader endpoint databases get a suffixed name so they do not collide
	// with the primary endpoint database of the same replication group.
	name := aws.StringValue(cluster.ReplicationGroupId)
	if endpointType == awsutils.ElastiCacheReaderEndpoint {
		name = fmt.Sprintf("%s-%s", name, endpointType)
	}

	uri := fmt.Sprintf("%v:%v", aws.StringValue(endpoint.Address), aws.Int64Value(endpoint.Port))
	description := fmt.Sprintf("ElastiCache cluster in %v (%v endpoint)", metadata.Region, endpointType)
	return types.NewDatabaseV3(types.Metadata{
		Name:        name,
		Description: description,
		Labels:      labelsFromElastiCacheCluster(metadata, endpointType, extraLabels),
	}, types.DatabaseSpecV3{
		Protocol: defaults.ProtocolRedis,
		URI:      uri,
		AWS:      *metadata,
	})
}
// MetadataFromRDSInstance creates AWS metadata from the provided RDS instance.
func MetadataFromRDSInstance(rdsInstance *rds.DBInstance) (*types.AWS, error) {
parsedARN, err := arn.Parse(aws.StringValue(rdsInstance.DBInstanceArn))
@ -289,6 +347,67 @@ func MetadataFromRedshiftCluster(cluster *redshift.Cluster) (*types.AWS, error)
}, nil
}
// MetadataFromElastiCacheCluster creates AWS metadata for the provided
// ElastiCache cluster.
//
// The region and account ID are parsed out of the cluster ARN; the endpoint
// type is passed through as-is since it is determined by the caller.
func MetadataFromElastiCacheCluster(cluster *elasticache.ReplicationGroup, endpointType string) (*types.AWS, error) {
	parsedARN, err := arn.Parse(aws.StringValue(cluster.ARN))
	if err != nil {
		return nil, trace.Wrap(err)
	}
	return &types.AWS{
		Region:    parsedARN.Region,
		AccountID: parsedARN.AccountID,
		ElastiCache: types.ElastiCache{
			ReplicationGroupID:       aws.StringValue(cluster.ReplicationGroupId),
			UserGroupIDs:             aws.StringValueSlice(cluster.UserGroupIds),
			TransitEncryptionEnabled: aws.BoolValue(cluster.TransitEncryptionEnabled),
			EndpointType:             endpointType,
		},
	}, nil
}
// ExtraElastiCacheLabels returns a list of extra labels for the provided
// ElastiCache cluster: AWS resource tags (where the key is a valid label
// key), the engine version of the cluster's nodes, and the VPC ID of its
// subnet group.
func ExtraElastiCacheLabels(cluster *elasticache.ReplicationGroup, tags []*elasticache.Tag, allNodes []*elasticache.CacheCluster, allSubnetGroups []*elasticache.CacheSubnetGroup) map[string]string {
	labels := make(map[string]string)
	groupID := aws.StringValue(cluster.ReplicationGroupId)

	// Copy over AWS resource tags whose keys are valid label keys.
	for _, tag := range tags {
		tagKey := aws.StringValue(tag.Key)
		if !types.IsValidLabelKey(tagKey) {
			log.Debugf("Skipping ElastiCache tag %q, not a valid label key.", tagKey)
			continue
		}
		labels[tagKey] = aws.StringValue(tag.Value)
	}

	// Find any node that belongs to this cluster to set the engine version
	// label and to discover the cluster's subnet group name.
	var subnetGroupName string
	for _, node := range allNodes {
		if aws.StringValue(node.ReplicationGroupId) != groupID {
			continue
		}
		subnetGroupName = aws.StringValue(node.CacheSubnetGroupName)
		labels[labelEngineVersion] = aws.StringValue(node.EngineVersion)
		break
	}

	// Find the subnet group used by this cluster and set the VPC ID label.
	//
	// ElastiCache servers do not have public IPs so they are usually only
	// accessible within the same VPC. Having a VPC ID label can be very
	// useful for filtering.
	for _, subnetGroup := range allSubnetGroups {
		if aws.StringValue(subnetGroup.CacheSubnetGroupName) != subnetGroupName {
			continue
		}
		labels[labelVPCID] = aws.StringValue(subnetGroup.VpcId)
		break
	}
	return labels
}
// engineToProtocol converts RDS instance engine to the database protocol.
func engineToProtocol(engine string) string {
switch engine {
@ -331,6 +450,8 @@ func labelsFromRedshiftCluster(cluster *redshift.Cluster, meta *types.AWS) map[s
key := aws.StringValue(tag.Key)
if types.IsValidLabelKey(key) {
labels[key] = aws.StringValue(tag.Value)
} else {
log.Debugf("Skipping Redshift tag %q, not a valid label key.", key)
}
}
labels[types.OriginLabel] = types.OriginCloud
@ -339,6 +460,20 @@ func labelsFromRedshiftCluster(cluster *redshift.Cluster, meta *types.AWS) map[s
return labels
}
// labelsFromElastiCacheCluster creates database labels for the provided
// ElastiCache cluster, merging the caller-supplied extra labels with the
// standard cloud labels. Standard labels are written last and therefore win
// on key collision.
func labelsFromElastiCacheCluster(meta *types.AWS, endpointType string, extraLabels map[string]string) map[string]string {
	// Pre-size for the extras plus the four standard labels added below.
	labels := make(map[string]string, len(extraLabels)+4)
	for k, v := range extraLabels {
		labels[k] = v
	}
	labels[types.OriginLabel] = types.OriginCloud
	labels[labelAccountID] = meta.AccountID
	labels[labelRegion] = meta.Region
	labels[labelEndpointType] = endpointType
	return labels
}
// rdsTagsToLabels converts RDS tags to a labels map.
func rdsTagsToLabels(tags []*rds.Tag) map[string]string {
labels := make(map[string]string)
@ -379,8 +514,7 @@ func IsRDSInstanceSupported(instance *rds.DBInstance) bool {
return !ver.LessThan(*minIAMSupportedVer)
}
// IsRDSClusterSupported checks whether the aurora cluster is supported and logs
// related info if not.
// IsRDSClusterSupported checks whether the Aurora cluster is supported.
func IsRDSClusterSupported(cluster *rds.DBCluster) bool {
switch aws.StringValue(cluster.EngineMode) {
// Aurora Serverless (v1 and v2) does not support IAM authentication
@ -400,6 +534,12 @@ func IsRDSClusterSupported(cluster *rds.DBCluster) bool {
return true
}
// IsElastiCacheClusterSupported checks whether the ElastiCache cluster is
// supported.
//
// Only clusters with in-transit encryption (TLS) enabled are supported.
func IsElastiCacheClusterSupported(cluster *elasticache.ReplicationGroup) bool {
	return aws.BoolValue(cluster.TransitEncryptionEnabled)
}
// IsRDSInstanceAvailable checks if the RDS instance is available.
func IsRDSInstanceAvailable(instance *rds.DBInstance) bool {
// For a full list of status values, see:
@ -501,6 +641,26 @@ func IsRedshiftClusterAvailable(cluster *redshift.Cluster) bool {
}
}
// IsElastiCacheClusterAvailable checks if the ElastiCache cluster is
// available.
func IsElastiCacheClusterAvailable(cluster *elasticache.ReplicationGroup) bool {
	status := aws.StringValue(cluster.Status)
	switch status {
	case "available", "modifying", "snapshotting":
		return true
	case "creating", "deleting", "create-failed":
		return false
	}
	// Unrecognized statuses are assumed available so that new or renamed
	// AWS status values do not silently hide clusters.
	log.Warnf("Unknown status type: %q. Assuming ElastiCache %q is available.",
		status,
		aws.StringValue(cluster.ReplicationGroupId),
	)
	return true
}
// auroraMySQLVersion extracts aurora mysql version from engine version
func auroraMySQLVersion(cluster *rds.DBCluster) string {
// version guide: https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/AuroraMySQL.Updates.Versions.html
@ -548,6 +708,8 @@ const (
labelEngineVersion = "engine-version"
// labelEndpointType is the label key containing the RDS endpoint type.
labelEndpointType = "endpoint-type"
// labelVPCID is the label key containing the VPC ID.
labelVPCID = "vpc-id"
)
const (
@ -565,7 +727,7 @@ const (
RDSEngineAuroraPostgres = "aurora-postgresql"
)
// RDSEndpointType specifies the endpoint type
// RDSEndpointType specifies the endpoint type for RDS clusters.
type RDSEndpointType string
const (

View file

@ -24,12 +24,14 @@ import (
"github.com/google/uuid"
"github.com/gravitational/teleport/api/types"
awsutils "github.com/gravitational/teleport/api/utils/aws"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/fixtures"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/trace"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/elasticache"
"github.com/aws/aws-sdk-go/service/rds"
"github.com/aws/aws-sdk-go/service/redshift"
"github.com/stretchr/testify/require"
@ -486,6 +488,252 @@ func TestDatabaseFromRedshiftCluster(t *testing.T) {
})
}
// TestDatabaseFromElastiCacheConfigurationEndpoint verifies converting an
// ElastiCache replication group (cluster mode enabled) into a database
// resource via its configuration endpoint.
func TestDatabaseFromElastiCacheConfigurationEndpoint(t *testing.T) {
	// Replication group with cluster mode enabled: a configuration endpoint
	// in front of two node groups (shards) with two members each.
	cluster := &elasticache.ReplicationGroup{
		ARN:                      aws.String("arn:aws:elasticache:us-east-1:1234567890:replicationgroup:my-cluster"),
		ReplicationGroupId:       aws.String("my-cluster"),
		Status:                   aws.String("available"),
		TransitEncryptionEnabled: aws.Bool(true),
		ClusterEnabled:           aws.Bool(true),
		ConfigurationEndpoint: &elasticache.Endpoint{
			Address: aws.String("configuration.localhost"),
			Port:    aws.Int64(6379),
		},
		UserGroupIds: []*string{aws.String("my-user-group")},
		NodeGroups: []*elasticache.NodeGroup{
			{
				NodeGroupId: aws.String("0001"),
				NodeGroupMembers: []*elasticache.NodeGroupMember{
					{
						CacheClusterId: aws.String("my-cluster-0001-001"),
					},
					{
						CacheClusterId: aws.String("my-cluster-0001-002"),
					},
				},
			},
			{
				NodeGroupId: aws.String("0002"),
				NodeGroupMembers: []*elasticache.NodeGroupMember{
					{
						CacheClusterId: aws.String("my-cluster-0002-001"),
					},
					{
						CacheClusterId: aws.String("my-cluster-0002-002"),
					},
				},
			},
		},
	}
	extraLabels := map[string]string{"key": "value"}

	// Expected database: named after the replication group, carrying cloud
	// origin, AWS metadata parsed from the ARN, and the extra labels.
	expected, err := types.NewDatabaseV3(types.Metadata{
		Name:        "my-cluster",
		Description: "ElastiCache cluster in us-east-1 (configuration endpoint)",
		Labels: map[string]string{
			types.OriginLabel: types.OriginCloud,
			labelAccountID:    "1234567890",
			labelRegion:       "us-east-1",
			labelEndpointType: "configuration",
			"key":             "value",
		},
	}, types.DatabaseSpecV3{
		Protocol: defaults.ProtocolRedis,
		URI:      "configuration.localhost:6379",
		AWS: types.AWS{
			AccountID: "1234567890",
			Region:    "us-east-1",
			ElastiCache: types.ElastiCache{
				ReplicationGroupID:       "my-cluster",
				UserGroupIDs:             []string{"my-user-group"},
				TransitEncryptionEnabled: true,
				EndpointType:             awsutils.ElastiCacheConfigurationEndpoint,
			},
		},
	})
	require.NoError(t, err)

	actual, err := NewDatabaseFromElastiCacheConfigurationEndpoint(cluster, extraLabels)
	require.NoError(t, err)
	require.Equal(t, expected, actual)
}
// TestDatabaseFromElastiCacheNodeGroups verifies converting an ElastiCache
// replication group (cluster mode disabled) into database resources from the
// primary and reader endpoints of its node group.
func TestDatabaseFromElastiCacheNodeGroups(t *testing.T) {
	// Replication group with cluster mode disabled: a single node group with
	// separate primary and reader endpoints.
	cluster := &elasticache.ReplicationGroup{
		ARN:                      aws.String("arn:aws:elasticache:us-east-1:1234567890:replicationgroup:my-cluster"),
		ReplicationGroupId:       aws.String("my-cluster"),
		Status:                   aws.String("available"),
		TransitEncryptionEnabled: aws.Bool(true),
		ClusterEnabled:           aws.Bool(false),
		UserGroupIds:             []*string{aws.String("my-user-group")},
		NodeGroups: []*elasticache.NodeGroup{
			{
				NodeGroupId: aws.String("0001"),
				PrimaryEndpoint: &elasticache.Endpoint{
					Address: aws.String("primary.localhost"),
					Port:    aws.Int64(6379),
				},
				ReaderEndpoint: &elasticache.Endpoint{
					Address: aws.String("reader.localhost"),
					Port:    aws.Int64(6379),
				},
			},
		},
	}
	extraLabels := map[string]string{"key": "value"}

	// Database expected from the primary endpoint.
	expectedPrimary, err := types.NewDatabaseV3(types.Metadata{
		Name:        "my-cluster",
		Description: "ElastiCache cluster in us-east-1 (primary endpoint)",
		Labels: map[string]string{
			types.OriginLabel: types.OriginCloud,
			labelAccountID:    "1234567890",
			labelRegion:       "us-east-1",
			labelEndpointType: "primary",
			"key":             "value",
		},
	}, types.DatabaseSpecV3{
		Protocol: defaults.ProtocolRedis,
		URI:      "primary.localhost:6379",
		AWS: types.AWS{
			AccountID: "1234567890",
			Region:    "us-east-1",
			ElastiCache: types.ElastiCache{
				ReplicationGroupID:       "my-cluster",
				UserGroupIDs:             []string{"my-user-group"},
				TransitEncryptionEnabled: true,
				EndpointType:             awsutils.ElastiCachePrimaryEndpoint,
			},
		},
	})
	require.NoError(t, err)

	// Database expected from the reader endpoint. Note the "-reader" name
	// suffix distinguishing it from the primary database.
	expectedReader, err := types.NewDatabaseV3(types.Metadata{
		Name:        "my-cluster-reader",
		Description: "ElastiCache cluster in us-east-1 (reader endpoint)",
		Labels: map[string]string{
			types.OriginLabel: types.OriginCloud,
			labelAccountID:    "1234567890",
			labelRegion:       "us-east-1",
			labelEndpointType: "reader",
			"key":             "value",
		},
	}, types.DatabaseSpecV3{
		Protocol: defaults.ProtocolRedis,
		URI:      "reader.localhost:6379",
		AWS: types.AWS{
			AccountID: "1234567890",
			Region:    "us-east-1",
			ElastiCache: types.ElastiCache{
				ReplicationGroupID:       "my-cluster",
				UserGroupIDs:             []string{"my-user-group"},
				TransitEncryptionEnabled: true,
				EndpointType:             awsutils.ElastiCacheReaderEndpoint,
			},
		},
	})
	require.NoError(t, err)

	// Databases are returned in primary-then-reader order per node group.
	actual, err := NewDatabasesFromElastiCacheNodeGroups(cluster, extraLabels)
	require.NoError(t, err)
	require.Equal(t, types.Databases{expectedPrimary, expectedReader}, actual)
}
// TestExtraElastiCacheLabels verifies the extra labels (resource tags, engine
// version, VPC ID) generated for an ElastiCache cluster from various
// combinations of tags, nodes and subnet groups.
func TestExtraElastiCacheLabels(t *testing.T) {
	cluster := &elasticache.ReplicationGroup{
		ReplicationGroupId: aws.String("my-redis"),
	}
	tags := []*elasticache.Tag{
		{Key: aws.String("key1"), Value: aws.String("value1")},
		{Key: aws.String("key2"), Value: aws.String("value2")},
	}
	// Two nodes; only the second belongs to "my-redis" so its engine version
	// and subnet group should be picked up.
	nodes := []*elasticache.CacheCluster{
		{
			ReplicationGroupId:   aws.String("some-other-redis"),
			EngineVersion:        aws.String("8.8.8"),
			CacheSubnetGroupName: aws.String("some-other-subnet-group"),
		},
		{
			ReplicationGroupId:   aws.String("my-redis"),
			EngineVersion:        aws.String("6.6.6"),
			CacheSubnetGroupName: aws.String("my-subnet-group"),
		},
	}
	// Two subnet groups; only "my-subnet-group" matches the node above, so
	// its VPC ID should be picked up.
	subnetGroups := []*elasticache.CacheSubnetGroup{
		{
			CacheSubnetGroupName: aws.String("some-other-subnet-group"),
			VpcId:                aws.String("some-other-vpc"),
		},
		{
			CacheSubnetGroupName: aws.String("my-subnet-group"),
			VpcId:                aws.String("my-vpc"),
		},
	}

	tests := []struct {
		name              string
		inputTags         []*elasticache.Tag
		inputNodes        []*elasticache.CacheCluster
		inputSubnetGroups []*elasticache.CacheSubnetGroup
		expectLabels      map[string]string
	}{
		{
			name:              "all tags",
			inputTags:         tags,
			inputNodes:        nodes,
			inputSubnetGroups: subnetGroups,
			expectLabels: map[string]string{
				"key1":           "value1",
				"key2":           "value2",
				"engine-version": "6.6.6",
				"vpc-id":         "my-vpc",
			},
		},
		{
			name:              "no resource tags",
			inputTags:         nil,
			inputNodes:        nodes,
			inputSubnetGroups: subnetGroups,
			expectLabels: map[string]string{
				"engine-version": "6.6.6",
				"vpc-id":         "my-vpc",
			},
		},
		{
			name:              "no nodes",
			inputTags:         tags,
			inputNodes:        nil,
			inputSubnetGroups: subnetGroups,
			// Without subnet group name from nodes, VPC ID cannot be found.
			expectLabels: map[string]string{
				"key1": "value1",
				"key2": "value2",
			},
		},
		{
			name:              "no subnet groups",
			inputTags:         tags,
			inputNodes:        nodes,
			inputSubnetGroups: nil,
			expectLabels: map[string]string{
				"key1":           "value1",
				"key2":           "value2",
				"engine-version": "6.6.6",
			},
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			actualLabels := ExtraElastiCacheLabels(cluster, test.inputTags, test.inputNodes, test.inputSubnetGroups)
			require.Equal(t, test.expectLabels, actualLabels)
		})
	}
}
func TestGetLabelEngineVersion(t *testing.T) {
t.Parallel()

View file

@ -180,4 +180,6 @@ const (
AWSMatcherRDS = "rds"
// AWSMatcherRedshift is the AWS matcher type for Redshift databases.
AWSMatcherRedshift = "redshift"
// AWSMatcherElastiCache is the AWS matcher type for ElastiCache databases.
AWSMatcherElastiCache = "elasticache"
)

View file

@ -45,6 +45,7 @@ func (s *Server) initCACert(ctx context.Context, database types.Database) error
switch database.GetType() {
case types.DatabaseTypeRDS,
types.DatabaseTypeRedshift,
types.DatabaseTypeElastiCache,
types.DatabaseTypeCloudSQL,
types.DatabaseTypeAzure:
default:
@ -103,16 +104,30 @@ func (s *Server) getCACert(ctx context.Context, database types.Database) ([]byte
// getCACertPath returns the path where automatically downloaded root certificate
// for the provided database is stored in the filesystem.
func (s *Server) getCACertPath(database types.Database) (string, error) {
// All RDS and Redshift instances share the same root CA which can be
// downloaded from a well-known URL (sometimes region-specific). Each
// Cloud SQL instance has its own CA.
switch database.GetType() {
// All RDS instances share the same root CA (per AWS region) which can be
// downloaded from a well-known URL.
case types.DatabaseTypeRDS:
return filepath.Join(s.cfg.DataDir, filepath.Base(rdsCAURLForDatabase(database))), nil
// All Redshift instances share the same root CA which can be downloaded
// from a well-known URL.
case types.DatabaseTypeRedshift:
return filepath.Join(s.cfg.DataDir, filepath.Base(redshiftCAURLForDatabase(database))), nil
// ElastiCache databases are signed with Amazon root CA. In most cases,
// x509.SystemCertPool should be sufficient to verify ElastiCache servers.
// However, x509.SystemCertPool does not support windows for go versions
// older than 1.18. In addition, system cert path can be overridden by
// environment variables on many OSes. Therefore, Amazon root CA is
// downloaded here to be safe.
case types.DatabaseTypeElastiCache:
return filepath.Join(s.cfg.DataDir, filepath.Base(amazonRootCA1URL)), nil
// Each Cloud SQL instance has its own CA.
case types.DatabaseTypeCloudSQL:
return filepath.Join(s.cfg.DataDir, fmt.Sprintf("%v-root.pem", database.GetName())), nil
case types.DatabaseTypeAzure:
return filepath.Join(s.cfg.DataDir, filepath.Base(azureCAURL)), nil
}
@ -141,6 +156,8 @@ func (d *realDownloader) Download(ctx context.Context, database types.Database)
return d.downloadFromURL(rdsCAURLForDatabase(database))
case types.DatabaseTypeRedshift:
return d.downloadFromURL(redshiftCAURLForDatabase(database))
case types.DatabaseTypeElastiCache:
return d.downloadFromURL(amazonRootCA1URL)
case types.DatabaseTypeCloudSQL:
return d.downloadForCloudSQL(ctx, database)
case types.DatabaseTypeAzure:
@ -243,6 +260,10 @@ const (
//
// https://docs.amazonaws.cn/redshift/latest/mgmt/connecting-ssl-support.html
redshiftCNRegionCAURL = "https://s3.cn-north-1.amazonaws.com.cn/redshift-downloads-cn/amazon-trust-ca-bundle.crt"
// amazonRootCA1URL is the root CA for many Amazon websites and services.
//
// https://www.amazontrust.com/repository/
amazonRootCA1URL = "https://www.amazontrust.com/repository/AmazonRootCA1.pem"
// azureCAURL is the URL of the CA certificate for validating certificates
// presented by Azure hosted databases. See:

View file

@ -25,6 +25,8 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/arn"
"github.com/aws/aws-sdk-go/service/elasticache"
"github.com/aws/aws-sdk-go/service/elasticache/elasticacheiface"
"github.com/aws/aws-sdk-go/service/rds"
"github.com/aws/aws-sdk-go/service/rds/rdsiface"
"github.com/aws/aws-sdk-go/service/redshift"
@ -68,31 +70,31 @@ func NewMetadata(config MetadataConfig) (*Metadata, error) {
// Update updates cloud metadata of the provided database.
func (m *Metadata) Update(ctx context.Context, database types.Database) error {
if database.IsRDS() {
metadata, err := m.fetchRDSMetadata(ctx, database)
if err != nil {
if trace.IsAccessDenied(err) { // Permission errors are expected.
m.log.Debugf("No permissions to fetch RDS metadata for %q: %v.", database.GetName(), err)
return nil
}
return trace.Wrap(err)
}
m.log.Debugf("Fetched RDS metadata for %q: %v.", database.GetName(), metadata)
database.SetStatusAWS(*metadata)
return m.updateAWS(ctx, database, m.fetchRDSMetadata)
} else if database.IsRedshift() {
metadata, err := m.fetchRedshiftMetadata(ctx, database)
if err != nil {
if trace.IsAccessDenied(err) { // Permission errors are expected.
m.log.Debugf("No permissions to fetch Redshift metadata for %q: %v.", database.GetName(), err)
return nil
}
return trace.Wrap(err)
}
m.log.Debugf("Fetched Redshift metadata for %q: %v.", database.GetName(), metadata)
database.SetStatusAWS(*metadata)
return m.updateAWS(ctx, database, m.fetchRedshiftMetadata)
} else if database.IsElastiCache() {
return m.updateAWS(ctx, database, m.fetchElastiCacheMetadata)
}
return nil
}
// updateAWS refreshes the cloud metadata of the provided AWS database using
// the supplied fetch function and stores the result on the database status.
// Access-denied errors from the cloud API are logged and swallowed since
// missing permissions are an expected condition.
func (m *Metadata) updateAWS(ctx context.Context, database types.Database, fetchFn func(context.Context, types.Database) (*types.AWS, error)) error {
	metadata, err := fetchFn(ctx, database)
	switch {
	case err == nil:
	case trace.IsAccessDenied(err):
		// Permission errors are expected.
		m.log.WithError(err).Debugf("No permissions to fetch metadata for %q.", database)
		return nil
	default:
		return trace.Wrap(err)
	}
	m.log.Debugf("Fetched metadata for %q: %v.", database, metadata)
	database.SetStatusAWS(*metadata)
	return nil
}
// fetchRDSMetadata fetches metadata for the provided RDS or Aurora database.
func (m *Metadata) fetchRDSMetadata(ctx context.Context, database types.Database) (*types.AWS, error) {
rds, err := m.cfg.Clients.GetAWSRDSClient(database.GetAWS().Region)
@ -144,6 +146,22 @@ func (m *Metadata) fetchRedshiftMetadata(ctx context.Context, database types.Dat
}, nil
}
// fetchElastiCacheMetadata fetches metadata for the provided ElastiCache
// database by describing its replication group via the ElastiCache API.
func (m *Metadata) fetchElastiCacheMetadata(ctx context.Context, database types.Database) (*types.AWS, error) {
	awsMeta := database.GetAWS()
	elastiCacheClient, err := m.cfg.Clients.GetAWSElastiCacheClient(awsMeta.Region)
	if err != nil {
		return nil, trace.Wrap(err)
	}
	cluster, err := describeElastiCacheCluster(ctx, elastiCacheClient, awsMeta.ElastiCache.ReplicationGroupID)
	if err != nil {
		return nil, trace.Wrap(err)
	}
	// The endpoint type of an existing database never changes, so carry it
	// over from the current metadata.
	return services.MetadataFromElastiCacheCluster(cluster, awsMeta.ElastiCache.EndpointType)
}
// fetchRDSInstanceMetadata fetches metadata about specified RDS instance.
func fetchRDSInstanceMetadata(ctx context.Context, rdsClient rdsiface.RDSAPI, instanceID string) (*types.AWS, error) {
rdsInstance, err := describeRDSInstance(ctx, rdsClient, instanceID)
@ -162,7 +180,7 @@ func describeRDSInstance(ctx context.Context, rdsClient rdsiface.RDSAPI, instanc
return nil, common.ConvertError(err)
}
if len(out.DBInstances) != 1 {
return nil, trace.BadParameter("expected 1 RDS instance for %v, got %s", instanceID, out.DBInstances)
return nil, trace.BadParameter("expected 1 RDS instance for %v, got %+v", instanceID, out.DBInstances)
}
return out.DBInstances[0], nil
}
@ -185,7 +203,7 @@ func describeRDSCluster(ctx context.Context, rdsClient rdsiface.RDSAPI, clusterI
return nil, common.ConvertError(err)
}
if len(out.DBClusters) != 1 {
return nil, trace.BadParameter("expected 1 RDS cluster for %v, got %s", clusterID, out.DBClusters)
return nil, trace.BadParameter("expected 1 RDS cluster for %v, got %+v", clusterID, out.DBClusters)
}
return out.DBClusters[0], nil
}
@ -199,7 +217,22 @@ func describeRedshiftCluster(ctx context.Context, redshiftClient redshiftiface.R
return nil, common.ConvertError(err)
}
if len(out.Clusters) != 1 {
return nil, trace.BadParameter("expected 1 Redshift cluster for %v, got %s", clusterID, out.Clusters)
return nil, trace.BadParameter("expected 1 Redshift cluster for %v, got %+v", clusterID, out.Clusters)
}
return out.Clusters[0], nil
}
// describeElastiCacheCluster returns the AWS ElastiCache Redis cluster
// (replication group) for the specified ID, expecting exactly one match.
func describeElastiCacheCluster(ctx context.Context, elastiCacheClient elasticacheiface.ElastiCacheAPI, replicationGroupID string) (*elasticache.ReplicationGroup, error) {
	input := &elasticache.DescribeReplicationGroupsInput{
		ReplicationGroupId: aws.String(replicationGroupID),
	}
	out, err := elastiCacheClient.DescribeReplicationGroupsWithContext(ctx, input)
	if err != nil {
		return nil, common.ConvertError(err)
	}
	if len(out.ReplicationGroups) != 1 {
		return nil, trace.BadParameter("expected 1 ElastiCache cluster for %v, got %+v", replicationGroupID, out.ReplicationGroups)
	}
	return out.ReplicationGroups[0], nil
}

View file

@ -25,6 +25,7 @@ import (
"github.com/gravitational/teleport/lib/srv/db/common"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/elasticache"
"github.com/aws/aws-sdk-go/service/rds"
"github.com/aws/aws-sdk-go/service/redshift"
"github.com/stretchr/testify/require"
@ -73,11 +74,25 @@ func TestAWSMetadata(t *testing.T) {
},
}
// Configure ElastiCache API mock.
elasticache := &ElastiCacheMock{
ReplicationGroups: []*elasticache.ReplicationGroup{
{
ARN: aws.String("arn:aws:elasticache:us-west-1:123456789:replicationgroup:my-redis"),
ReplicationGroupId: aws.String("my-redis"),
ClusterEnabled: aws.Bool(true),
TransitEncryptionEnabled: aws.Bool(true),
UserGroupIds: []*string{aws.String("my-user-group")},
},
},
}
// Create metadata fetcher.
metadata, err := NewMetadata(MetadataConfig{
Clients: &common.TestCloudClients{
RDS: rds,
Redshift: redshift,
RDS: rds,
Redshift: redshift,
ElastiCache: elasticache,
},
})
require.NoError(t, err)
@ -166,6 +181,25 @@ func TestAWSMetadata(t *testing.T) {
},
},
},
{
name: "ElastiCache",
inAWS: types.AWS{
ElastiCache: types.ElastiCache{
ReplicationGroupID: "my-redis",
EndpointType: "configuration",
},
},
outAWS: types.AWS{
AccountID: "123456789",
Region: "us-west-1",
ElastiCache: types.ElastiCache{
ReplicationGroupID: "my-redis",
UserGroupIDs: []string{"my-user-group"},
TransitEncryptionEnabled: true,
EndpointType: "configuration",
},
},
},
}
ctx := context.Background()

View file

@ -23,6 +23,8 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/elasticache"
"github.com/aws/aws-sdk-go/service/elasticache/elasticacheiface"
"github.com/aws/aws-sdk-go/service/iam"
"github.com/aws/aws-sdk-go/service/iam/iamiface"
"github.com/aws/aws-sdk-go/service/rds"
@ -353,3 +355,49 @@ func (g *GCPSQLAdminClientMock) GetDatabaseInstance(ctx context.Context, session
func (g *GCPSQLAdminClientMock) GenerateEphemeralCert(ctx context.Context, sessionCtx *common.Session) (*tls.Certificate, error) {
return g.EphemeralCert, nil
}
// ElastiCacheMock mocks AWS ElastiCache API.
type ElastiCacheMock struct {
	// ElastiCacheAPI provides default implementations for methods that are
	// not overridden by this mock.
	elasticacheiface.ElastiCacheAPI
	// ReplicationGroups is the static list of replication groups returned
	// by the describe calls.
	ReplicationGroups []*elasticache.ReplicationGroup
	// TagsByARN maps resource ARNs to the tags returned by
	// ListTagsForResourceWithContext.
	TagsByARN map[string][]*elasticache.Tag
}
// DescribeReplicationGroupsWithContext returns the mocked replication group
// matching the requested ID, or a NotFound error when no group matches.
func (m *ElastiCacheMock) DescribeReplicationGroupsWithContext(_ aws.Context, input *elasticache.DescribeReplicationGroupsInput, opts ...request.Option) (*elasticache.DescribeReplicationGroupsOutput, error) {
	requestedID := aws.StringValue(input.ReplicationGroupId)
	for _, group := range m.ReplicationGroups {
		if aws.StringValue(group.ReplicationGroupId) != requestedID {
			continue
		}
		return &elasticache.DescribeReplicationGroupsOutput{
			ReplicationGroups: []*elasticache.ReplicationGroup{group},
		}, nil
	}
	return nil, trace.NotFound("ElastiCache %v not found", requestedID)
}
// DescribeReplicationGroupsPagesWithContext invokes fn exactly once with all
// mocked replication groups as a single, final page (pagination is not
// simulated). It never returns an error.
func (m *ElastiCacheMock) DescribeReplicationGroupsPagesWithContext(_ aws.Context, _ *elasticache.DescribeReplicationGroupsInput, fn func(*elasticache.DescribeReplicationGroupsOutput, bool) bool, _ ...request.Option) error {
	fn(&elasticache.DescribeReplicationGroupsOutput{
		ReplicationGroups: m.ReplicationGroups,
	}, true)
	return nil
}
// DescribeCacheClustersPagesWithContext always returns an AccessDenied error,
// allowing callers' permission-degraded code paths to be exercised in tests.
func (m *ElastiCacheMock) DescribeCacheClustersPagesWithContext(aws.Context, *elasticache.DescribeCacheClustersInput, func(*elasticache.DescribeCacheClustersOutput, bool) bool, ...request.Option) error {
	return trace.AccessDenied("unauthorized")
}
// DescribeCacheSubnetGroupsPagesWithContext always returns an AccessDenied
// error, allowing callers' permission-degraded code paths to be exercised in
// tests.
func (m *ElastiCacheMock) DescribeCacheSubnetGroupsPagesWithContext(aws.Context, *elasticache.DescribeCacheSubnetGroupsInput, func(*elasticache.DescribeCacheSubnetGroupsOutput, bool) bool, ...request.Option) error {
	return trace.AccessDenied("unauthorized")
}
// ListTagsForResourceWithContext returns the tags registered for the requested
// resource ARN, or a NotFound error when none are registered.
func (m *ElastiCacheMock) ListTagsForResourceWithContext(_ aws.Context, input *elasticache.ListTagsForResourceInput, _ ...request.Option) (*elasticache.TagListMessage, error) {
	// A lookup on a nil map safely yields ok == false, covering both the
	// unset-map and missing-ARN cases.
	tags, ok := m.TagsByARN[aws.StringValue(input.ResourceName)]
	if !ok {
		return nil, trace.NotFound("no tags")
	}
	return &elasticache.TagListMessage{
		TagList: tags,
	}, nil
}

View file

@ -0,0 +1,252 @@
/*
Copyright 2022 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watchers
import (
"context"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/elasticache"
"github.com/aws/aws-sdk-go/service/elasticache/elasticacheiface"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/srv/db/common"
"github.com/gravitational/trace"
"github.com/sirupsen/logrus"
)
// elastiCacheFetcherConfig is the ElastiCache databases fetcher configuration.
type elastiCacheFetcherConfig struct {
	// Labels is a selector to match cloud databases.
	Labels types.Labels
	// ElastiCache is the ElastiCache API client.
	ElastiCache elasticacheiface.ElastiCacheAPI
	// Region is the AWS region to query databases in.
	Region string
}
// CheckAndSetDefaults validates the config and sets defaults.
func (c *elastiCacheFetcherConfig) CheckAndSetDefaults() error {
	// All fields are required; report the first one that is missing.
	switch {
	case len(c.Labels) == 0:
		return trace.BadParameter("missing parameter Labels")
	case c.ElastiCache == nil:
		return trace.BadParameter("missing parameter ElastiCache")
	case c.Region == "":
		return trace.BadParameter("missing parameter Region")
	}
	return nil
}
// elastiCacheFetcher retrieves ElastiCache Redis databases.
type elastiCacheFetcher struct {
	// cfg is the fetcher configuration (validated at construction time).
	cfg elastiCacheFetcherConfig
	// log is the fetcher's logger, scoped with the configured labels/region.
	log logrus.FieldLogger
}
// newElastiCacheFetcher returns a new ElastiCache databases fetcher instance.
// It returns an error if the provided configuration is invalid.
func newElastiCacheFetcher(config elastiCacheFetcherConfig) (Fetcher, error) {
	if err := config.CheckAndSetDefaults(); err != nil {
		return nil, trace.Wrap(err)
	}
	// Scope the logger with the fetcher's selector and region for easier
	// troubleshooting.
	log := logrus.WithFields(logrus.Fields{
		trace.Component: "watch:elasticache",
		"labels":        config.Labels,
		"region":        config.Region,
	})
	return &elastiCacheFetcher{
		cfg: config,
		log: log,
	}, nil
}
// Get returns ElastiCache Redis databases matching the watcher's selectors.
//
// TODO(greedy52) support ElastiCache global datastore.
func (f *elastiCacheFetcher) Get(ctx context.Context) (types.Databases, error) {
	clusters, err := getElastiCacheClusters(ctx, f.cfg.ElastiCache)
	if err != nil {
		return nil, trace.Wrap(err)
	}

	// Keep only clusters that are both supported and available; skipped
	// clusters are logged at debug level.
	var eligibleClusters []*elasticache.ReplicationGroup
	for _, cluster := range clusters {
		if !services.IsElastiCacheClusterSupported(cluster) {
			f.log.Debugf("ElastiCache cluster %q is not supported. Skipping.", aws.StringValue(cluster.ReplicationGroupId))
			continue
		}

		if !services.IsElastiCacheClusterAvailable(cluster) {
			f.log.Debugf("The current status of ElastiCache cluster %q is %q. Skipping.",
				aws.StringValue(cluster.ReplicationGroupId),
				aws.StringValue(cluster.Status))
			continue
		}

		eligibleClusters = append(eligibleClusters, cluster)
	}

	if len(eligibleClusters) == 0 {
		return types.Databases{}, nil
	}

	// Fetch more information to provide extra labels. Do not fail because some
	// of these labels are missing.
	allNodes, err := getElastiCacheNodes(ctx, f.cfg.ElastiCache)
	if err != nil {
		if trace.IsAccessDenied(err) {
			f.log.WithError(err).Debug("No permissions to describe nodes")
		} else {
			f.log.WithError(err).Info("Failed to describe nodes.")
		}
	}
	allSubnetGroups, err := getElastiCacheSubnetGroups(ctx, f.cfg.ElastiCache)
	if err != nil {
		if trace.IsAccessDenied(err) {
			f.log.WithError(err).Debug("No permissions to describe subnet groups")
		} else {
			f.log.WithError(err).Info("Failed to describe subnet groups.")
		}
	}

	var databases types.Databases
	for _, cluster := range eligibleClusters {
		// Resource tags are not found in elasticache.ReplicationGroup but can
		// be obtained by calling elasticache.ListTagsForResource (one call per
		// resource).
		tags, err := getElastiCacheResourceTags(ctx, f.cfg.ElastiCache, cluster.ARN)
		if err != nil {
			if trace.IsAccessDenied(err) {
				f.log.WithError(err).Debug("No permissions to list resource tags")
			} else {
				f.log.WithError(err).Infof("Failed to list resource tags for ElastiCache cluster %q.", aws.StringValue(cluster.ReplicationGroupId))
			}
		}

		extraLabels := services.ExtraElastiCacheLabels(cluster, tags, allNodes, allSubnetGroups)

		// Create database using configuration endpoint for Redis with cluster
		// mode enabled.
		if aws.BoolValue(cluster.ClusterEnabled) {
			if database, err := services.NewDatabaseFromElastiCacheConfigurationEndpoint(cluster, extraLabels); err != nil {
				f.log.Infof("Could not convert ElastiCache cluster %q configuration endpoint to database resource: %v.",
					aws.StringValue(cluster.ReplicationGroupId), err)
			} else {
				databases = append(databases, database)
			}

			continue
		}

		// Create databases using primary and reader endpoints for Redis with
		// cluster mode disabled. When cluster mode is disabled, it is expected
		// there is only one node group (aka shard) with one primary endpoint
		// and one reader endpoint.
		if databasesFromNodeGroups, err := services.NewDatabasesFromElastiCacheNodeGroups(cluster, extraLabels); err != nil {
			f.log.Infof("Could not convert ElastiCache cluster %q node groups to database resources: %v.",
				aws.StringValue(cluster.ReplicationGroupId), err)
		} else {
			databases = append(databases, databasesFromNodeGroups...)
		}
	}

	// Apply the watcher's label selector last, after all extra labels are set.
	return filterDatabasesByLabels(databases, f.cfg.Labels, f.log), nil
}
// String returns the fetcher's string description.
func (f *elastiCacheFetcher) String() string {
	cfg := f.cfg
	return fmt.Sprintf("elastiCacheFetcher(Region=%v, Labels=%v)", cfg.Region, cfg.Labels)
}
// getElastiCacheClusters fetches all ElastiCache replication groups,
// iterating over at most maxPages pages of results.
func getElastiCacheClusters(ctx context.Context, client elasticacheiface.ElastiCacheAPI) ([]*elasticache.ReplicationGroup, error) {
	var (
		clusters []*elasticache.ReplicationGroup
		pageNum  int
	)
	collect := func(page *elasticache.DescribeReplicationGroupsOutput, _ bool) bool {
		pageNum++
		clusters = append(clusters, page.ReplicationGroups...)
		return pageNum <= maxPages
	}
	err := client.DescribeReplicationGroupsPagesWithContext(ctx, &elasticache.DescribeReplicationGroupsInput{}, collect)
	return clusters, common.ConvertError(err)
}
// getElastiCacheNodes fetches all ElastiCache nodes that are associated with
// a replication group, iterating over at most maxPages pages of results.
func getElastiCacheNodes(ctx context.Context, client elasticacheiface.ElastiCacheAPI) ([]*elasticache.CacheCluster, error) {
	var nodes []*elasticache.CacheCluster
	var pageNum int
	err := client.DescribeCacheClustersPagesWithContext(
		ctx,
		&elasticache.DescribeCacheClustersInput{},
		func(page *elasticache.DescribeCacheClustersOutput, lastPage bool) bool {
			pageNum++

			// There are three types of elasticache.CacheCluster:
			// 1) a Memcache cluster.
			// 2) a Redis node belonging to a single node deployment (legacy, no TLS support).
			// 3) a Redis node belonging to a Redis replication group.
			// Only the ones belonging to replication groups are wanted.
			for _, cacheCluster := range page.CacheClusters {
				if cacheCluster.ReplicationGroupId != nil {
					nodes = append(nodes, cacheCluster)
				}
			}
			return pageNum <= maxPages
		},
	)
	return nodes, common.ConvertError(err)
}
// getElastiCacheSubnetGroups fetches all ElastiCache subnet groups, iterating
// over at most maxPages pages of results.
func getElastiCacheSubnetGroups(ctx context.Context, client elasticacheiface.ElastiCacheAPI) ([]*elasticache.CacheSubnetGroup, error) {
	var (
		subnetGroups []*elasticache.CacheSubnetGroup
		pageNum      int
	)
	collect := func(page *elasticache.DescribeCacheSubnetGroupsOutput, _ bool) bool {
		pageNum++
		subnetGroups = append(subnetGroups, page.CacheSubnetGroups...)
		return pageNum <= maxPages
	}
	err := client.DescribeCacheSubnetGroupsPagesWithContext(ctx, &elasticache.DescribeCacheSubnetGroupsInput{}, collect)
	return subnetGroups, common.ConvertError(err)
}
// getElastiCacheResourceTags fetches resource tags for the provided
// ElastiCache replication group ARN.
func getElastiCacheResourceTags(ctx context.Context, client elasticacheiface.ElastiCacheAPI, resourceName *string) ([]*elasticache.Tag, error) {
	output, err := client.ListTagsForResourceWithContext(ctx, &elasticache.ListTagsForResourceInput{
		ResourceName: resourceName,
	})
	if err != nil {
		return nil, common.ConvertError(err)
	}
	return output.TagList, nil
}

View file

@ -84,18 +84,7 @@ func (f *rdsDBInstancesFetcher) Get(ctx context.Context) (types.Databases, error
return nil, trace.Wrap(err)
}
var result types.Databases
for _, database := range rdsDatabases {
match, _, err := services.MatchLabels(f.cfg.Labels, database.GetAllLabels())
if err != nil {
f.log.Warnf("Failed to match %v against selector: %v.", database, err)
} else if match {
result = append(result, database)
} else {
f.log.Debugf("%v doesn't match selector.", database)
}
}
return result, nil
return filterDatabasesByLabels(rdsDatabases, f.cfg.Labels, f.log), nil
}
// getRDSDatabases returns a list of database resources representing RDS instances.
@ -180,18 +169,7 @@ func (f *rdsAuroraClustersFetcher) Get(ctx context.Context) (types.Databases, er
return nil, trace.Wrap(err)
}
var result types.Databases
for _, database := range auroraDatabases {
match, _, err := services.MatchLabels(f.cfg.Labels, database.GetAllLabels())
if err != nil {
f.log.Warnf("Failed to match %v against selector: %v.", database, err)
} else if match {
result = append(result, database)
} else {
f.log.Debugf("%v doesn't match selector.", database)
}
}
return result, nil
return filterDatabasesByLabels(auroraDatabases, f.cfg.Labels, f.log), nil
}
// getAuroraDatabases returns a list of database resources representing RDS clusters.
@ -302,3 +280,19 @@ func auroraFilters() []*rds.Filter {
// maxPages is the maximum number of pages to iterate over when fetching databases.
const maxPages = 10
// filterDatabasesByLabels filters input databases with provided labels,
// returning only those whose labels match the selector.
func filterDatabasesByLabels(databases types.Databases, labels types.Labels, log logrus.FieldLogger) types.Databases {
	var matched types.Databases
	for _, db := range databases {
		ok, _, err := services.MatchLabels(labels, db.GetAllLabels())
		switch {
		case err != nil:
			log.Warnf("Failed to match %v against selector: %v.", db, err)
		case ok:
			matched = append(matched, db)
		default:
			log.Debugf("%v doesn't match selector.", db)
		}
	}
	return matched
}

View file

@ -100,16 +100,9 @@ func (f *redshiftFetcher) Get(ctx context.Context) (types.Databases, error) {
continue
}
match, _, err := services.MatchLabels(f.cfg.Labels, database.GetAllLabels())
if err != nil {
f.log.Warnf("Failed to match %v against selector: %v.", database, err)
} else if match {
databases = append(databases, database)
} else {
f.log.Debugf("%v doesn't match selector.", database)
}
databases = append(databases, database)
}
return databases, nil
return filterDatabasesByLabels(databases, f.cfg.Labels, f.log), nil
}
// String returns the fetcher's string description.

View file

@ -161,6 +161,14 @@ func makeFetchers(clients common.CloudClients, matchers []services.AWSMatcher) (
}
result = append(result, fetcher)
}
if utils.SliceContainsStr(matcher.Types, services.AWSMatcherElastiCache) {
fetcher, err := makeElastiCacheFetcher(clients, region, matcher.Tags)
if err != nil {
return nil, trace.Wrap(err)
}
result = append(result, fetcher)
}
}
}
return result, nil
@ -204,3 +212,16 @@ func makeRedshiftFetcher(clients common.CloudClients, region string, tags types.
Redshift: redshift,
})
}
// makeElastiCacheFetcher returns ElastiCache fetcher for the provided region
// and tags.
func makeElastiCacheFetcher(clients common.CloudClients, region string, tags types.Labels) (Fetcher, error) {
	client, err := clients.GetAWSElastiCacheClient(region)
	if err != nil {
		return nil, trace.Wrap(err)
	}
	cfg := elastiCacheFetcherConfig{
		Region:      region,
		Labels:      tags,
		ElastiCache: client,
	}
	return newElastiCacheFetcher(cfg)
}

View file

@ -28,6 +28,7 @@ import (
"github.com/gravitational/teleport/lib/srv/db/common"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/elasticache"
"github.com/aws/aws-sdk-go/service/rds"
"github.com/aws/aws-sdk-go/service/rds/rdsiface"
"github.com/aws/aws-sdk-go/service/redshift"
@ -59,6 +60,23 @@ func TestWatcher(t *testing.T) {
redshiftUse1Unavailable, _ := makeRedshiftCluster(t, "us-east-1", "qa", withRedshiftStatus("paused"))
redshiftUse1UnknownStatus, redshiftDatabaseUnknownStatus := makeRedshiftCluster(t, "us-east-1", "test", withRedshiftStatus("status-does-not-exist"))
elasticacheProd, elasticacheDatabaseProd, elasticacheProdTags := makeElastiCacheCluster(t, "ec1", "us-east-1", "prod")
elasticacheQA, elasticacheDatabaseQA, elasticacheQATags := makeElastiCacheCluster(t, "ec2", "us-east-1", "qa", withElastiCacheConfigurationEndpoint())
elasticacheTest, _, elasticacheTestTags := makeElastiCacheCluster(t, "ec3", "us-east-1", "test")
elasticacheUnavailable, _, elasticacheUnavailableTags := makeElastiCacheCluster(t, "ec4", "us-east-1", "prod", func(cluster *elasticache.ReplicationGroup) {
cluster.Status = aws.String("deleting")
})
elasticacheUnsupported, _, elasticacheUnsupportedTags := makeElastiCacheCluster(t, "ec5", "us-east-1", "prod", func(cluster *elasticache.ReplicationGroup) {
cluster.TransitEncryptionEnabled = aws.Bool(false)
})
elasticacheTagsByARN := map[string][]*elasticache.Tag{
aws.StringValue(elasticacheProd.ARN): elasticacheProdTags,
aws.StringValue(elasticacheQA.ARN): elasticacheQATags,
aws.StringValue(elasticacheTest.ARN): elasticacheTestTags,
aws.StringValue(elasticacheUnavailable.ARN): elasticacheUnavailableTags,
aws.StringValue(elasticacheUnsupported.ARN): elasticacheUnsupportedTags,
}
tests := []struct {
name string
awsMatchers []services.AWSMatcher
@ -178,11 +196,34 @@ func TestWatcher(t *testing.T) {
},
expectedDatabases: types.Databases{redshiftDatabaseUse1Prod, redshiftDatabaseUnknownStatus},
},
{
name: "ElastiCache",
awsMatchers: []services.AWSMatcher{
{
Types: []string{services.AWSMatcherElastiCache},
Regions: []string{"us-east-1"},
Tags: types.Labels{"env": []string{"prod", "qa"}},
},
},
clients: &common.TestCloudClients{
ElastiCache: &cloud.ElastiCacheMock{
ReplicationGroups: []*elasticache.ReplicationGroup{
elasticacheProd, // labels match
elasticacheQA, // labels match
elasticacheTest, // labels do not match
elasticacheUnavailable,
elasticacheUnsupported,
},
TagsByARN: elasticacheTagsByARN,
},
},
expectedDatabases: types.Databases{elasticacheDatabaseProd, elasticacheDatabaseQA},
},
{
name: "matcher with multiple types",
awsMatchers: []services.AWSMatcher{
{
Types: []string{services.AWSMatcherRedshift, services.AWSMatcherRDS},
Types: []string{services.AWSMatcherRedshift, services.AWSMatcherRDS, services.AWSMatcherElastiCache},
Regions: []string{"us-east-1"},
Tags: types.Labels{"env": []string{"prod"}},
},
@ -194,8 +235,12 @@ func TestWatcher(t *testing.T) {
Redshift: &cloud.RedshiftMock{
Clusters: []*redshift.Cluster{redshiftUse1Prod},
},
ElastiCache: &cloud.ElastiCacheMock{
ReplicationGroups: []*elasticache.ReplicationGroup{elasticacheProd},
TagsByARN: elasticacheTagsByARN,
},
},
expectedDatabases: types.Databases{auroraDatabase1, redshiftDatabaseUse1Prod},
expectedDatabases: types.Databases{auroraDatabase1, redshiftDatabaseUse1Prod, elasticacheDatabaseProd},
},
}
@ -312,6 +357,44 @@ func makeRDSClusterWithExtraEndpoints(t *testing.T, name, region string, labels
return cluster, append(types.Databases{primaryDatabase, readerDatabase}, customDatabases...)
}
// makeElastiCacheCluster is a test helper that builds a mock ElastiCache
// replication group named after the given name/region, tagged with the given
// "env" value, and applies the provided option functions before conversion.
// It returns the replication group, the database resource expected to be
// discovered from it (the configuration-endpoint database when cluster mode
// is enabled, otherwise the single node group's primary-endpoint database),
// and the resource tags.
func makeElastiCacheCluster(t *testing.T, name, region, env string, opts ...func(*elasticache.ReplicationGroup)) (*elasticache.ReplicationGroup, types.Database, []*elasticache.Tag) {
	cluster := &elasticache.ReplicationGroup{
		ARN:                      aws.String(fmt.Sprintf("arn:aws:elasticache:%s:123456789:replicationgroup:%s", region, name)),
		ReplicationGroupId:       aws.String(name),
		Status:                   aws.String("available"),
		TransitEncryptionEnabled: aws.Bool(true),

		// Default has one primary endpoint in the only node group.
		NodeGroups: []*elasticache.NodeGroup{{
			PrimaryEndpoint: &elasticache.Endpoint{
				Address: aws.String("primary.localhost"),
				Port:    aws.Int64(6379),
			},
		}},
	}

	for _, opt := range opts {
		opt(cluster)
	}

	tags := []*elasticache.Tag{{
		Key:   aws.String("env"),
		Value: aws.String(env),
	}}
	extraLabels := services.ExtraElastiCacheLabels(cluster, tags, nil, nil)

	if aws.BoolValue(cluster.ClusterEnabled) {
		database, err := services.NewDatabaseFromElastiCacheConfigurationEndpoint(cluster, extraLabels)
		require.NoError(t, err)
		return cluster, database, tags
	}

	databases, err := services.NewDatabasesFromElastiCacheNodeGroups(cluster, extraLabels)
	require.NoError(t, err)
	require.Len(t, databases, 1)
	return cluster, databases[0], tags
}
// withRDSInstanceStatus returns an option function for makeRDSInstance to overwrite status.
func withRDSInstanceStatus(status string) func(*rds.DBInstance) {
return func(instance *rds.DBInstance) {
@ -340,6 +423,18 @@ func withRedshiftStatus(status string) func(*redshift.Cluster) {
}
}
// withElastiCacheConfigurationEndpoint returns an option function for
// makeElastiCacheCluster that enables cluster mode and sets a configuration
// endpoint on the replication group.
func withElastiCacheConfigurationEndpoint() func(*elasticache.ReplicationGroup) {
	return func(rg *elasticache.ReplicationGroup) {
		rg.ClusterEnabled = aws.Bool(true)
		rg.ConfigurationEndpoint = &elasticache.Endpoint{
			Address: aws.String("configuration.localhost"),
			Port:    aws.Int64(6379),
		}
	}
}
func labelsToTags(labels map[string]string) (tags []*rds.Tag) {
for key, val := range labels {
tags = append(tags, &rds.Tag{

View file

@ -367,8 +367,8 @@ func (a *dbAuth) getTLSConfigVerifyFull(ctx context.Context, sessionCtx *Session
tlsConfig.ServerName = dbTLSConfig.ServerName
}
// RDS/Aurora/Redshift and Cloud SQL auth is done with an auth token so
// don't generate a client certificate and exit here.
// RDS/Aurora/Redshift/ElastiCache and Cloud SQL auth is done with an auth
// token so don't generate a client certificate and exit here.
if sessionCtx.Database.IsCloudHosted() {
return tlsConfig, nil
}
@ -438,6 +438,7 @@ func appendCAToRoot(tlsConfig *tls.Config, sessionCtx *Session) (*tls.Config, er
return nil, trace.BadParameter("invalid server CA certificate")
}
}
return tlsConfig, nil
}

View file

@ -26,6 +26,8 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/aws/aws-sdk-go/aws"
awssession "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/elasticache"
"github.com/aws/aws-sdk-go/service/elasticache/elasticacheiface"
"github.com/aws/aws-sdk-go/service/iam"
"github.com/aws/aws-sdk-go/service/iam/iamiface"
"github.com/aws/aws-sdk-go/service/rds"
@ -50,6 +52,8 @@ type CloudClients interface {
GetAWSRDSClient(region string) (rdsiface.RDSAPI, error)
// GetAWSRedshiftClient returns AWS Redshift client for the specified region.
GetAWSRedshiftClient(region string) (redshiftiface.RedshiftAPI, error)
// GetAWSElastiCacheClient returns AWS ElastiCache client for the specified region.
GetAWSElastiCacheClient(region string) (elasticacheiface.ElastiCacheAPI, error)
// GetAWSIAMClient returns AWS IAM client for the specified region.
GetAWSIAMClient(region string) (iamiface.IAMAPI, error)
// GetAWSSTSClient returns AWS STS client for the specified region.
@ -113,6 +117,15 @@ func (c *cloudClients) GetAWSRedshiftClient(region string) (redshiftiface.Redshi
return redshift.New(session), nil
}
// GetAWSElastiCacheClient returns AWS ElastiCache client for the specified region.
func (c *cloudClients) GetAWSElastiCacheClient(region string) (elasticacheiface.ElastiCacheAPI, error) {
session, err := c.GetAWSSession(region)
if err != nil {
return nil, trace.Wrap(err)
}
return elasticache.New(session), nil
}
// GetAWSIAMClient returns AWS IAM client for the specified region.
func (c *cloudClients) GetAWSIAMClient(region string) (iamiface.IAMAPI, error) {
session, err := c.GetAWSSession(region)
@ -164,7 +177,7 @@ func (c *cloudClients) GetAzureCredential() (azcore.TokenCredential, error) {
return c.initAzureCredential()
}
// Closes closes all initialized clients.
// Close closes all initialized clients.
func (c *cloudClients) Close() (err error) {
c.mtx.Lock()
defer c.mtx.Unlock()
@ -245,6 +258,7 @@ type TestCloudClients struct {
RDS rdsiface.RDSAPI
RDSPerRegion map[string]rdsiface.RDSAPI
Redshift redshiftiface.RedshiftAPI
ElastiCache elasticacheiface.ElastiCacheAPI
IAM iamiface.IAMAPI
STS stsiface.STSAPI
GCPSQL GCPSQLAdminClient
@ -268,6 +282,11 @@ func (c *TestCloudClients) GetAWSRedshiftClient(region string) (redshiftiface.Re
return c.Redshift, nil
}
// GetAWSElastiCacheClient returns AWS ElastiCache client for the specified region.
func (c *TestCloudClients) GetAWSElastiCacheClient(region string) (elasticacheiface.ElastiCacheAPI, error) {
return c.ElastiCache, nil
}
// GetAWSIAMClient returns AWS IAM client for the specified region.
func (c *TestCloudClients) GetAWSIAMClient(region string) (iamiface.IAMAPI, error) {
return c.IAM, nil

View file

@ -91,7 +91,6 @@ type clusterClient struct {
// an error is returned.
func newClient(ctx context.Context, connectionOptions *ConnectionOptions, tlsConfig *tls.Config, username, password string) (redis.UniversalClient, error) {
connectionAddr := net.JoinHostPort(connectionOptions.address, connectionOptions.port)
// TODO(jakub): Use system CA bundle if connecting to AWS.
// TODO(jakub): Investigate Redis Sentinel.
switch connectionOptions.mode {
case Standalone:

View file

@ -69,6 +69,13 @@ type ConnectionOptions struct {
// Incorrect input:
// redis.example.com:6379?mode=cluster
func ParseRedisAddress(addr string) (*ConnectionOptions, error) {
// Default to the single mode.
return ParseRedisAddressWithDefaultMode(addr, Standalone)
}
// ParseRedisAddressWithDefaultMode parses a Redis connection string and uses
// the provided default mode if mode is not specified in the address.
func ParseRedisAddressWithDefaultMode(addr string, defaultMode ConnectionMode) (*ConnectionOptions, error) {
if addr == "" {
return nil, trace.BadParameter("Redis address is empty")
}
@ -124,8 +131,7 @@ func ParseRedisAddress(addr string) (*ConnectionOptions, error) {
values := redisURL.Query()
// Get additional connections options
// Default to the single mode.
mode := Standalone
mode := defaultMode
if values.Has("mode") {
connMode := strings.ToLower(values.Get("mode"))
switch ConnectionMode(connMode) {

View file

@ -23,6 +23,7 @@ import (
"net"
"github.com/go-redis/redis/v8"
apiawsutils "github.com/gravitational/teleport/api/utils/aws"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/srv/db/common"
@ -167,7 +168,7 @@ func (e *Engine) HandleConnection(ctx context.Context, sessionCtx *common.Sessio
e.Audit.OnSessionStart(e.Context, sessionCtx, nil)
defer e.Audit.OnSessionEnd(e.Context, sessionCtx)
if err := e.process(ctx); err != nil {
if err := e.process(ctx, sessionCtx); err != nil {
return trace.Wrap(err)
}
@ -181,7 +182,14 @@ func (e *Engine) getNewClientFn(ctx context.Context, sessionCtx *common.Session)
return nil, trace.Wrap(err)
}
connectionOptions, err := ParseRedisAddress(sessionCtx.Database.GetURI())
// Set default mode. Default mode can be overridden by URI parameters.
defaultMode := Standalone
if sessionCtx.Database.IsElastiCache() &&
sessionCtx.Database.GetAWS().ElastiCache.EndpointType == apiawsutils.ElastiCacheConfigurationEndpoint {
defaultMode = Cluster
}
connectionOptions, err := ParseRedisAddressWithDefaultMode(sessionCtx.Database.GetURI(), defaultMode)
if err != nil {
return nil, trace.BadParameter("Redis connection string is incorrect %q: %v", sessionCtx.Database.GetURI(), err)
}
@ -214,7 +222,7 @@ func (e *Engine) reconnect(username, password string) (redis.UniversalClient, er
// process is the main processing function for Redis. It reads commands from connected client and passes them to
// a Redis instance. This function returns when a server closes a connection or in case of connection error.
func (e *Engine) process(ctx context.Context) error {
func (e *Engine) process(ctx context.Context, sessionCtx *common.Session) error {
for {
// Read commands from connected client.
cmd, err := e.readClientCmd(ctx)
@ -228,8 +236,12 @@ func (e *Engine) process(ctx context.Context) error {
// Function below maps errors that should be returned to the
// client as value or return them as err if we should terminate
// the session.
value, err := processSeverResponse(cmd, err)
value, err := processServerResponse(cmd, err, sessionCtx)
if err != nil {
// Send server error to client before closing.
if sendError := e.sendToClient(err); sendError != nil {
return trace.NewAggregate(err, sendError)
}
return trace.Wrap(err)
}
@ -255,11 +267,11 @@ func (e *Engine) readClientCmd(ctx context.Context) (*redis.Cmd, error) {
return redis.NewCmd(ctx, val...), nil
}
// processSeverResponse takes server response and an error returned from go-redis and returns
// processServerResponse takes server response and an error returned from go-redis and returns
// "terminal" errors as second value (connection should be terminated when this happens)
// or returns error/value as the first value. Then value should be sent back to
// the client without terminating the connection.
func processSeverResponse(cmd *redis.Cmd, err error) (interface{}, error) {
func processServerResponse(cmd *redis.Cmd, err error, sessionCtx *common.Session) (interface{}, error) {
value, cmdErr := cmd.Result()
if err == nil {
// If the server didn't return any error use cmd.Err() as server error.
@ -274,6 +286,10 @@ func processSeverResponse(cmd *redis.Cmd, err error) (interface{}, error) {
// Teleport errors should be returned to the client.
return err, nil
case errors.Is(err, context.DeadlineExceeded):
if sessionCtx.Database.IsElastiCache() && !sessionCtx.Database.GetAWS().ElastiCache.TransitEncryptionEnabled {
return nil, trace.ConnectionProblem(err, "Connection timeout on ElastiCache database. Please verify if in-transit encryption is enabled on the server.")
}
// Do not return Deadline Exceeded to the client as it's not very self-explanatory.
// Return "connection timeout" as this is what most likely happened.
return nil, trace.ConnectionProblem(err, "connection timeout")

View file

@ -30,7 +30,7 @@ import (
)
// awsDatabaseTypes list of databases supported on the configurator.
var awsDatabaseTypes = []string{types.DatabaseTypeRDS, types.DatabaseTypeRedshift}
var awsDatabaseTypes = []string{types.DatabaseTypeRDS, types.DatabaseTypeRedshift, types.DatabaseTypeElastiCache}
type createDatabaseConfigFlags struct {
config.DatabaseSampleFlags
@ -83,7 +83,7 @@ func onConfigureDatabaseBootstrap(flags configureDatabaseBootstrapFlags) error {
fmt.Printf("Reading configuration at %q...\n\n", flags.config.ConfigPath)
if len(configurators) == 0 {
fmt.Println("The agent doesnt require any extra configuration.")
fmt.Println("The agent doesn't require any extra configuration.")
return nil
}
@ -178,6 +178,10 @@ func buildAWSConfigurator(manual bool, flags configureDatabaseAWSFlags) (dbconfi
switch dbType {
case types.DatabaseTypeRDS:
configuratorFlags.ForceRDSPermissions = true
case types.DatabaseTypeRedshift:
configuratorFlags.ForceRedshiftPermissions = true
case types.DatabaseTypeElastiCache:
configuratorFlags.ForceElastiCachePermissions = true
}
}
@ -202,7 +206,7 @@ func onConfigureDatabasesAWSPrint(flags configureDatabaseAWSPrintFlags) error {
// Check if configurator actions is empty.
if configurator.IsEmpty() {
fmt.Println("The agent doesnt require any extra configuration.")
fmt.Println("The agent doesn't require any extra configuration.")
return nil
}
@ -257,7 +261,7 @@ func onConfigureDatabasesAWSCreate(flags configureDatabaseAWSCreateFlags) error
// Check if configurator actions is empty.
if configurator.IsEmpty() {
fmt.Println("The agent doesnt require any extra configuration.")
fmt.Println("The agent doesn't require any extra configuration.")
return nil
}
@ -281,7 +285,7 @@ func printDBConfiguratorActions(actions []dbconfigurators.ConfiguratorAction) {
}
}
// executeDBConfiguratorActions iterate over all actions, executing and priting
// executeDBConfiguratorActions iterate over all actions, executing and printing
// their results.
func executeDBConfiguratorActions(ctx context.Context, configuratorName string, actions []dbconfigurators.ConfiguratorAction) error {
actionContext := &dbconfigurators.ConfiguratorActionContext{}

View file

@ -162,7 +162,7 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con
"Database CA certificate path.").Hidden().
StringVar(&ccf.DatabaseCACertFile)
start.Flag("db-aws-region",
"AWS region RDS, Aurora or Redshift database instance is running in.").Hidden().
"AWS region RDS, Aurora, Redshift or ElastiCache database instance is running in.").Hidden().
StringVar(&ccf.DatabaseAWSRegion)
// define start's usage info (we use kingpin's "alias" field for this)
@ -204,7 +204,7 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con
dbStartCmd.Flag("protocol", fmt.Sprintf("Proxied database protocol. Supported are: %v.", defaults.DatabaseProtocols)).StringVar(&ccf.DatabaseProtocol)
dbStartCmd.Flag("uri", "Address the proxied database is reachable at.").StringVar(&ccf.DatabaseURI)
dbStartCmd.Flag("ca-cert", "Database CA certificate path.").StringVar(&ccf.DatabaseCACertFile)
dbStartCmd.Flag("aws-region", "(Only for RDS, Aurora or Redshift) AWS region RDS, Aurora or Redshift database instance is running in.").StringVar(&ccf.DatabaseAWSRegion)
dbStartCmd.Flag("aws-region", "(Only for RDS, Aurora, Redshift or ElastiCache) AWS region RDS, Aurora, Redshift or ElastiCache database instance is running in.").StringVar(&ccf.DatabaseAWSRegion)
dbStartCmd.Flag("aws-redshift-cluster-id", "(Only for Redshift) Redshift database cluster identifier.").StringVar(&ccf.DatabaseAWSRedshiftClusterID)
dbStartCmd.Flag("aws-rds-instance-id", "(Only for RDS) RDS instance identifier.").StringVar(&ccf.DatabaseAWSRDSInstanceID)
dbStartCmd.Flag("aws-rds-cluster-id", "(Only for Aurora) Aurora cluster identifier.").StringVar(&ccf.DatabaseAWSRDSClusterID)
@ -226,6 +226,7 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con
dbConfigureCreate.Flag("token", "Invitation token to register with an auth server [none].").Default("/tmp/token").StringVar(&dbConfigCreateFlags.AuthToken)
dbConfigureCreate.Flag("rds-discovery", "List of AWS regions the agent will discover for RDS/Aurora instances.").StringsVar(&dbConfigCreateFlags.RDSDiscoveryRegions)
dbConfigureCreate.Flag("redshift-discovery", "List of AWS regions the agent will discover for Redshift instances.").StringsVar(&dbConfigCreateFlags.RedshiftDiscoveryRegions)
dbConfigureCreate.Flag("elasticache-discovery", "List of AWS regions the agent will discover for ElastiCache Redis clusters.").StringsVar(&dbConfigCreateFlags.ElastiCacheDiscoveryRegions)
dbConfigureCreate.Flag("ca-pin", "CA pin to validate the auth server (can be repeated for multiple pins).").StringsVar(&dbConfigCreateFlags.CAPins)
dbConfigureCreate.Flag("name", "Name of the proxied database.").StringVar(&dbConfigCreateFlags.StaticDatabaseName)
dbConfigureCreate.Flag("protocol", fmt.Sprintf("Proxied database protocol. Supported are: %v.", defaults.DatabaseProtocols)).StringVar(&dbConfigCreateFlags.StaticDatabaseProtocol)

View file

@ -1613,7 +1613,8 @@ func TestSerializeDatabases(t *testing.T) {
"redshift": {},
"rds": {
"iam_auth": false
}
},
"elasticache": {}
},
"mysql": {},
"gcp": {},
@ -1633,7 +1634,8 @@ func TestSerializeDatabases(t *testing.T) {
"redshift": {},
"rds": {
"iam_auth": false
}
},
"elasticache": {}
}
}
}]