instance heartbeats

Forrest Marshall 2022-12-02 21:51:13 +00:00 committed by Forrest
parent ff9065b2b5
commit ecef3465b6
27 changed files with 5721 additions and 2196 deletions


@ -25,6 +25,8 @@ import (
"github.com/gravitational/trace/trail"
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/internalutils/stream"
"github.com/gravitational/teleport/api/types"
)
// DownstreamInventoryControlStream is the client/agent side of a bidirectional stream established
@ -221,6 +223,29 @@ func (c *Client) PingInventory(ctx context.Context, req proto.InventoryPingReque
return *rsp, nil
}
func (c *Client) GetInstances(ctx context.Context, filter types.InstanceFilter) stream.Stream[types.Instance] {
// set up cancelable context so that Stream.Done can close the stream if the caller
// halts early.
ctx, cancel := context.WithCancel(ctx)
instances, err := c.grpc.GetInstances(ctx, &filter, c.callOpts...)
if err != nil {
cancel()
return stream.Fail[types.Instance](trail.FromGRPC(err))
}
return stream.Func[types.Instance](func() (types.Instance, error) {
instance, err := instances.Recv()
if err != nil {
if trace.IsEOF(err) {
// io.EOF signals that stream has completed successfully
return nil, io.EOF
}
return nil, trail.FromGRPC(err)
}
return instance, nil
}, cancel)
}
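A minimal consumption sketch of the new method, assuming an authenticated *Client and using the Next/Item/Done methods of stream.Stream that the rest of this change relies on; the filter values are illustrative:
func listNodeInstances(ctx context.Context, clt *Client) ([]types.Instance, error) {
	instances := clt.GetInstances(ctx, types.InstanceFilter{
		Services: []types.SystemRole{types.RoleNode},
	})
	var out []types.Instance
	for instances.Next() {
		out = append(out, instances.Item())
	}
	// Done reports any terminal stream error and cancels the underlying grpc stream.
	if err := instances.Done(); err != nil {
		return nil, trace.Wrap(err)
	}
	return out, nil
}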
func newDownstreamInventoryControlStream(stream proto.AuthService_InventoryControlStreamClient, cancel context.CancelFunc) DownstreamInventoryControlStream {
ics := &downstreamICS{
sendC: make(chan upstreamSend),

File diff suppressed because it is too large


@ -1899,10 +1899,11 @@ message UpstreamInventoryHello {
// its auth token. i.e. Services is the subset of SystemRoles that are currently
// active.
repeated string Services = 3 [(gogoproto.casttype) = "github.com/gravitational/teleport/api/types.SystemRole"];
// TODO(fspmarshall): look into what other info can safely be stated here once, instead of
// being repeatedly announced (e.g. addrs, static labels, etc). may be able to achieve a
// non-trivial reduction in network usage by doing this.
// Hostname is the hostname associated with the instance. This value is not required or guaranteed
// to be unique and its validity is not enforceable (i.e. join tokens do not constrain what an
// instance can claim its hostname to be). This value exists only to assist users in correlating
// instance resources with hosts.
string Hostname = 4;
}
// DownstreamInventoryHello is the hello message sent down the inventory control stream.
@ -1939,7 +1940,13 @@ message InventoryStatusSummary {
// InventoryPingRequest is used to request that the specified server be sent an inventory ping
// if it has a control stream registered.
message InventoryPingRequest {
// ServerID is the ID of the instance to ping.
string ServerID = 1;
// ControlLog forces the ping to use the standard "commit then act" model of control log synchronization
// for the ping. This significantly increases the amount of time it takes for the ping request to
// complete, but is useful for testing/debugging control log issues.
bool ControlLog = 2;
}
// InventoryPingResponse returns the result of an inventory ping initiated via an
@ -1991,6 +1998,9 @@ service AuthService {
// PingInventory attempts to trigger a downstream inventory ping (used in testing/debug).
rpc PingInventory(InventoryPingRequest) returns (InventoryPingResponse);
// GetInstances streams all instances matching the specified filter.
rpc GetInstances(types.InstanceFilter) returns (stream types.InstanceV1);
// GetClusterAlerts loads cluster-level alert messages.
rpc GetClusterAlerts(types.GetClusterAlertsRequest) returns (GetClusterAlertsResponse);


@ -546,6 +546,111 @@ message MySQLOptions {
string ServerVersion = 1 [(gogoproto.jsontag) = "server_version,omitempty"];
}
// InstanceV1 represents the state of a running teleport instance independent
// of the specific services that instance exposes.
message InstanceV1 {
ResourceHeader Header = 1 [
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "",
(gogoproto.embed) = true
];
InstanceSpecV1 Spec = 2 [
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "spec"
];
}
message InstanceSpecV1 {
// Version is the version of teleport this instance most recently advertised.
string Version = 1 [(gogoproto.jsontag) = "version,omitempty"];
// Services is the list of active services this instance most recently advertised.
repeated string Services = 2 [
(gogoproto.casttype) = "SystemRole",
(gogoproto.jsontag) = "services,omitempty"
];
// Hostname is the hostname this instance most recently advertised.
string Hostname = 3 [(gogoproto.jsontag) = "hostname,omitempty"];
// AuthID is the ID of the auth server that most recently observed this instance.
string AuthID = 4 [(gogoproto.jsontag) = "auth_id,omitempty"];
// LastSeen is the last time an auth server reported observing this instance.
google.protobuf.Timestamp LastSeen = 5 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "last_seen,omitempty"
];
// ControlLog is the log of recent important instance control events related to this instance. See comments
// on the InstanceControlLogEntry type for details.
repeated InstanceControlLogEntry ControlLog = 6 [
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "control_log,omitempty"
];
}
// InstanceControlLogEntry represents an entry in a given instance's control log. The control log of
// an instance is protected by CompareAndSwap semantics, allowing entries to function as a means of
// synchronization as well as recordkeeping. For example, an auth server intending to trigger an upgrade
// for a given instance can check its control log for 'upgrade-attempt' entries. If no such entry exists,
// it can attempt to write an 'upgrade-attempt' entry of its own. If that entry successfully writes without
// hitting a CompareFailed, the auth server knows that no other auth servers will make concurrent upgrade
// attempts while that entry persists.
//
// NOTE: Due to resource size and backend throughput limitations, care should be taken to minimize the
// use and size of instance control log entries.
//
message InstanceControlLogEntry {
// Type represents the type of control log entry this is (e.g. 'upgrade-attempt').
string Type = 1 [(gogoproto.jsontag) = "type,omitempty"];
// ID is a random identifier used to assist in uniquely identifying entries. This value may
// be unique, or it may be used to associate a collection of related entries (e.g. an upgrade
// attempt entry may use the same ID as an associated upgrade failure entry if appropriate).
uint64 ID = 2 [(gogoproto.jsontag) = "id,omitempty"];
// Time is the time at which the event represented by this entry occurred (used in determining
// ordering and expiry).
google.protobuf.Timestamp Time = 3 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "time,omitempty"
];
// TTL is an optional custom time to live for this control log entry. Some control log entries
// (e.g. an upgrade failure) may require longer than normal TTLs in order to ensure visibility.
// If a log entry's TTL results in it having an intended expiry further in the future than the
// expiry of the enclosing Instance resource, the instance resource's expiry will be bumped
// to accommodate preservation of the log. Because of this fact, custom entry TTLs should be
// used sparingly, as excess usage could result in unexpected backend growth for high churn
// clusters.
int64 TTL = 4 [
(gogoproto.jsontag) = "ttl,omitempty",
(gogoproto.casttype) = "time.Duration"
];
// Labels is an arbitrary collection of key-value pairs. The expected labels are determined by the
// type of the entry. Use of labels is preferable to adding new fields in some cases in order to
// preserve fields across auth downgrades (this is mostly relevant for the version-control system).
map<string, string> Labels = 5 [(gogoproto.jsontag) = "labels,omitempty"];
}
// InstanceFilter matches instance resources.
message InstanceFilter {
// ServerID matches exactly one instance by server ID if specified.
string ServerID = 1;
// Version matches instance version if specified.
string Version = 2;
// Services matches the instance services if specified. Note that this field matches all instances which
// expose *at least* one of the listed services. This is in contrast to service matching in version
// directives, which match instances that expose *at most* the listed services.
repeated string Services = 3 [(gogoproto.casttype) = "SystemRole"];
}
// ServerV2 represents a Node, App, Database, Proxy or Auth server in a Teleport cluster.
message ServerV2 {
option (gogoproto.goproto_stringer) = false;


@ -286,6 +286,9 @@ const (
// KindHostCert, this kind is not backed by a real resource.
KindUsageEvent = "usage_event"
// KindInstance represents a teleport instance independent of any specific service.
KindInstance = "instance"
// V5 is the fifth version of resources.
V5 = "v5"

api/types/instance.go (new file, 256 lines)

@ -0,0 +1,256 @@
/*
Copyright 2022 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/gravitational/trace"
"golang.org/x/exp/slices"
"github.com/gravitational/teleport/api/defaults"
)
// Match checks if the given instance appears to match this filter.
func (f InstanceFilter) Match(i Instance) bool {
if f.ServerID != "" && f.ServerID != i.GetName() {
return false
}
if f.Version != "" && f.Version != i.GetTeleportVersion() {
// TODO(fspmarshall): move some of the lib/versioncontrol helpers to
// the api package and finalize version matching syntax so that we
// can do normalization and wildcard matching.
return false
}
// if Services was specified, ensure instance has at least one of the listed services.
if len(f.Services) != 0 && slices.IndexFunc(f.Services, i.HasService) == -1 {
return false
}
return true
}
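As a quick illustration of the matching semantics, a sketch with hypothetical values, using the NewInstance constructor defined below:
func exampleFilterMatch() (bool, error) {
	inst, err := NewInstance("example-instance", InstanceSpecV1{
		Version:  "12.0.0",
		Services: []SystemRole{RoleNode},
		Hostname: "host-1",
	})
	if err != nil {
		return false, trace.Wrap(err)
	}
	// Service matching is "at least one of", so a filter listing Node and Proxy
	// still matches an instance that only runs the Node service.
	filter := InstanceFilter{
		Services: []SystemRole{RoleNode, RoleProxy},
	}
	return filter.Match(inst), nil // true
}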
// Instance describes the configuration/status of a unique teleport server identity. Each
// instance may be running one or more teleport services, and may have multiple processes
// associated with it.
type Instance interface {
Resource
// GetTeleportVersion gets the teleport version reported by the instance.
GetTeleportVersion() string
// GetServices gets the running services reported by the instance. This list is not
// guaranteed to consist only of valid teleport services. Invalid/unexpected services
// should be ignored.
GetServices() []SystemRole
// HasService checks if this instance advertises the specified service.
HasService(SystemRole) bool
// GetHostname gets the hostname reported by the instance.
GetHostname() string
// GetAuthID gets the server ID of the auth server that most recently reported
// having observed this instance.
GetAuthID() string
// GetLastSeen gets the most recent time that an auth server reported having
// seen this instance.
GetLastSeen() time.Time
// SetLastSeen sets the most recent time that an auth server reported having
// seen this instance. Generally, if this value is being updated, the caller
// should follow up by calling SyncLogAndResourceExpiry so that the control log
// and resource-level expiry values can be reevaluated.
SetLastSeen(time.Time)
// SyncLogAndResourceExpiry filters expired entries from the control log and updates
// the resource-level expiry. All calculations are performed relative to the value of
// the LastSeen field, and the supplied TTL is used only as a default. The actual TTL
// of an instance resource may be longer than the supplied TTL if one or more control
// log entries use a custom TTL.
SyncLogAndResourceExpiry(ttl time.Duration)
// GetControlLog gets the instance control log entries associated with this instance.
// The control log is a log of recent events related to an auth server's administration
// of an instance's state. Auth servers generally ensure that they have successfully
// written to the log *prior* to actually attempting the planned action. As a result,
// the log may contain things that never actually happened.
GetControlLog() []InstanceControlLogEntry
// AppendControlLog appends entries to the control log. The control log is sorted by time,
// so appends do not need to be performed in any particular order.
AppendControlLog(entries ...InstanceControlLogEntry)
// Clone performs a deep copy on this instance.
Clone() Instance
}
// NewInstance assembles a new instance resource.
func NewInstance(serverID string, spec InstanceSpecV1) (Instance, error) {
instance := &InstanceV1{
ResourceHeader: ResourceHeader{
Metadata: Metadata{
Name: serverID,
},
},
Spec: spec,
}
if err := instance.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
return instance, nil
}
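A sketch of assembling a heartbeat value with this constructor and deriving its expiry; the field values and TTL below are hypothetical:
func exampleNewHeartbeat() (Instance, error) {
	inst, err := NewInstance("example-instance", InstanceSpecV1{
		Version:  "12.0.0",
		Services: []SystemRole{RoleNode},
		Hostname: "host-1",
		AuthID:   "example-auth-server",
		LastSeen: time.Now().UTC(),
	})
	if err != nil {
		return nil, trace.Wrap(err)
	}
	// Trim expired control log entries and derive the resource expiry relative to
	// LastSeen, using a 15 minute default TTL (entries with longer custom TTLs
	// would push the resource expiry further out).
	inst.SyncLogAndResourceExpiry(15 * time.Minute)
	return inst, nil
}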
func (i *InstanceV1) CheckAndSetDefaults() error {
i.setStaticFields()
if err := i.ResourceHeader.CheckAndSetDefaults(); err != nil {
return trace.Wrap(err)
}
if i.Version != V1 {
return trace.BadParameter("unsupported instance resource version: %s", i.Version)
}
if i.Kind != KindInstance {
return trace.BadParameter("unexpected resource kind: %q (expected %s)", i.Kind, KindInstance)
}
if i.Metadata.Namespace != "" && i.Metadata.Namespace != defaults.Namespace {
return trace.BadParameter("invalid namespace %q (namespaces are deprecated)", i.Metadata.Namespace)
}
return nil
}
func (i *InstanceV1) setStaticFields() {
if i.Version == "" {
i.Version = V1
}
if i.Kind == "" {
i.Kind = KindInstance
}
}
func (i *InstanceV1) SyncLogAndResourceExpiry(ttl time.Duration) {
// expire control log entries relative to LastSeen.
logExpiry := i.expireControlLog(i.Spec.LastSeen, ttl)
// calculate the default resource expiry.
resourceExpiry := i.Spec.LastSeen.Add(ttl)
// if one or more log entries want to outlive the default resource
// expiry, we bump the resource expiry to match.
if logExpiry.After(resourceExpiry) {
resourceExpiry = logExpiry
}
i.Metadata.SetExpiry(resourceExpiry.UTC())
}
func (i *InstanceV1) GetTeleportVersion() string {
return i.Spec.Version
}
func (i *InstanceV1) GetServices() []SystemRole {
return i.Spec.Services
}
func (i *InstanceV1) HasService(s SystemRole) bool {
return slices.Contains(i.Spec.Services, s)
}
func (i *InstanceV1) GetHostname() string {
return i.Spec.Hostname
}
func (i *InstanceV1) GetAuthID() string {
return i.Spec.AuthID
}
func (i *InstanceV1) GetLastSeen() time.Time {
return i.Spec.LastSeen
}
func (i *InstanceV1) SetLastSeen(t time.Time) {
i.Spec.LastSeen = t.UTC()
}
func (i *InstanceV1) GetControlLog() []InstanceControlLogEntry {
return i.Spec.ControlLog
}
func (i *InstanceV1) AppendControlLog(entries ...InstanceControlLogEntry) {
n := len(i.Spec.ControlLog)
i.Spec.ControlLog = append(i.Spec.ControlLog, entries...)
for idx, entry := range i.Spec.ControlLog[n:] {
// ensure that all provided timestamps are UTC (non-UTC timestamps can cause
// panics in proto logic).
i.Spec.ControlLog[idx].Time = entry.Time.UTC()
}
slices.SortFunc(i.Spec.ControlLog, func(a, b InstanceControlLogEntry) bool {
return a.Time.Before(b.Time)
})
}
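A small sketch showing that entries may be appended out of chronological order; AppendControlLog normalizes timestamps to UTC and keeps the log time-sorted (entry values are hypothetical):
func exampleAppendControlLog(inst Instance) {
	now := time.Now()
	inst.AppendControlLog(
		InstanceControlLogEntry{Type: "second", ID: 2, Time: now},
		InstanceControlLogEntry{Type: "first", ID: 1, Time: now.Add(-time.Minute)},
	)
	// GetControlLog now yields "first" before "second", with both timestamps in UTC.
	_ = inst.GetControlLog()
}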
// expireControlLog removes expired entries from the control log relative to the supplied
// "now" value. The supplied ttl is used as the default ttl for entries that do not specify
// a custom ttl value. The returned timestamp is the observed expiry that was furthest in
// the future.
func (i *InstanceV1) expireControlLog(now time.Time, ttl time.Duration) time.Time {
now = now.UTC()
filtered := i.Spec.ControlLog[:0]
var latestExpiry time.Time
for _, entry := range i.Spec.ControlLog {
entryTTL := entry.TTL
if entryTTL == 0 {
entryTTL = ttl
}
if entry.Time.IsZero() {
entry.Time = now
}
expiry := entry.Time.Add(entryTTL)
if now.After(expiry) {
continue
}
if expiry.After(latestExpiry) {
latestExpiry = expiry
}
filtered = append(filtered, entry)
}
// ensure that we don't preserve pointers in the now out-of-range
// tail of the control log by zeroing out those elements.
for idx := len(filtered); idx < len(i.Spec.ControlLog); idx++ {
i.Spec.ControlLog[idx] = InstanceControlLogEntry{}
}
i.Spec.ControlLog = filtered
return latestExpiry
}
func (i *InstanceV1) Clone() Instance {
return proto.Clone(i).(*InstanceV1)
}
func (e *InstanceControlLogEntry) Clone() InstanceControlLogEntry {
e.Time = e.Time.UTC()
return *proto.Clone(e).(*InstanceControlLogEntry)
}


@ -0,0 +1,83 @@
/*
Copyright 2022 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestInstanceControlLogExpiry(t *testing.T) {
const ttl = time.Minute
now := time.Now()
instance, err := NewInstance("test-instance", InstanceSpecV1{
LastSeen: now,
})
require.NoError(t, err)
instance.AppendControlLog(
InstanceControlLogEntry{
Type: "foo",
Time: now,
TTL: ttl,
},
InstanceControlLogEntry{
Type: "bar",
Time: now.Add(-ttl / 2),
TTL: ttl,
},
InstanceControlLogEntry{
Type: "bin",
Time: now.Add(-ttl * 2),
TTL: ttl,
},
InstanceControlLogEntry{
Type: "baz",
Time: now,
TTL: time.Hour,
},
)
require.Len(t, instance.GetControlLog(), 4)
instance.SyncLogAndResourceExpiry(ttl)
require.Len(t, instance.GetControlLog(), 3)
require.Equal(t, now.Add(time.Hour).UTC(), instance.Expiry())
instance.SetLastSeen(now.Add(ttl))
instance.SyncLogAndResourceExpiry(ttl)
require.Len(t, instance.GetControlLog(), 2)
require.Equal(t, now.Add(time.Hour).UTC(), instance.Expiry())
instance.AppendControlLog(
InstanceControlLogEntry{
Type: "long-lived",
Time: now,
TTL: time.Hour * 2,
},
)
instance.SyncLogAndResourceExpiry(ttl)
require.Len(t, instance.GetControlLog(), 3)
require.Equal(t, now.Add(time.Hour*2).UTC(), instance.Expiry())
}

File diff suppressed because it is too large


@ -226,7 +226,7 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (*Server, error) {
services := &Services{
Trust: cfg.Trust,
Presence: cfg.Presence,
PresenceInternal: cfg.Presence,
Provisioner: cfg.Provisioner,
Identity: cfg.Identity,
Access: cfg.Access,
@ -263,7 +263,7 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (*Server, error) {
Services: services,
Cache: services,
keyStore: keyStore,
inventory: inventory.NewController(cfg.Presence),
inventory: inventory.NewController(cfg.Presence, inventory.WithAuthServerID(cfg.HostUUID)),
traceClient: cfg.TraceClient,
fips: cfg.FIPS,
loadAllCAs: cfg.LoadAllCAs,
@ -307,7 +307,7 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (*Server, error) {
type Services struct {
services.Trust
services.Presence
services.PresenceInternal
services.Provisioner
services.Identity
services.Access
@ -2939,16 +2939,94 @@ func (a *Server) GetInventoryStatus(ctx context.Context, req proto.InventoryStat
}
func (a *Server) PingInventory(ctx context.Context, req proto.InventoryPingRequest) (proto.InventoryPingResponse, error) {
const pingAttempt = "ping-attempt"
const pingSuccess = "ping-success"
const maxAttempts = 16
stream, ok := a.inventory.GetControlStream(req.ServerID)
if !ok {
return proto.InventoryPingResponse{}, trace.NotFound("no control stream found for server %q", req.ServerID)
}
d, err := stream.Ping(ctx)
id := insecurerand.Uint64()
if !req.ControlLog {
// this ping doesn't pass through the control log, so just execute it immediately.
d, err := stream.Ping(ctx, id)
return proto.InventoryPingResponse{
Duration: d,
}, trace.Wrap(err)
}
// matchEntry is used to check if our log entry has been included
// in the control log.
matchEntry := func(entry types.InstanceControlLogEntry) bool {
return entry.Type == pingAttempt && entry.ID == id
}
var included bool
for i := 1; i <= maxAttempts; i++ {
stream.VisitInstanceState(func(ref inventory.InstanceStateRef) (update inventory.InstanceStateUpdate) {
// check if we've already successfully included the ping entry
if ref.LastHeartbeat != nil {
if slices.IndexFunc(ref.LastHeartbeat.GetControlLog(), matchEntry) >= 0 {
included = true
return
}
}
// if the entry is already pending, we just need to wait
if slices.IndexFunc(ref.QualifiedPendingControlLog, matchEntry) >= 0 {
return
}
// either this is the first iteration, or the pending control log was reset.
update.QualifiedPendingControlLog = append(update.QualifiedPendingControlLog, types.InstanceControlLogEntry{
Type: pingAttempt,
ID: id,
Time: time.Now(),
})
stream.HeartbeatInstance()
return
})
if included {
// entry appeared in control log
break
}
// pause briefly, then re-sync our state. note that this strategy is not scalable. control log usage is intended only
// for periodic operations. control-log based pings are a mechanism for testing/debugging only, hence the use of a
// simple sleep loop.
select {
case <-time.After(time.Millisecond * 100 * time.Duration(i)):
case <-stream.Done():
return proto.InventoryPingResponse{}, trace.Errorf("control stream closed during ping attempt")
case <-ctx.Done():
return proto.InventoryPingResponse{}, trace.Wrap(ctx.Err())
}
}
if !included {
return proto.InventoryPingResponse{}, trace.LimitExceeded("failed to include ping %d in control log for instance %q (max attempts exceeded)", id, req.ServerID)
}
d, err := stream.Ping(ctx, id)
if err != nil {
return proto.InventoryPingResponse{}, trace.Wrap(err)
}
stream.VisitInstanceState(func(_ inventory.InstanceStateRef) (update inventory.InstanceStateUpdate) {
update.UnqualifiedPendingControlLog = append(update.UnqualifiedPendingControlLog, types.InstanceControlLogEntry{
Type: pingSuccess,
ID: id,
Labels: map[string]string{
"duration": d.String(),
},
})
return
})
stream.HeartbeatInstance()
return proto.InventoryPingResponse{
Duration: d,
}, nil
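From the client side, the control-log path can be exercised through the existing PingInventory client method; a debugging sketch, assuming an authenticated api client and a hypothetical server ID:
func debugControlLogPing(ctx context.Context, clt *client.Client) (time.Duration, error) {
	rsp, err := clt.PingInventory(ctx, proto.InventoryPingRequest{
		ServerID:   "example-server-id", // hypothetical instance server ID
		ControlLog: true,                // force the slower "commit then act" path
	})
	if err != nil {
		return 0, trace.Wrap(err)
	}
	return rsp.Duration, nil
}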


@ -2522,7 +2522,7 @@ func newTestServices(t *testing.T) Services {
return Services{
Trust: local.NewCAService(bk),
Presence: local.NewPresenceService(bk),
PresenceInternal: local.NewPresenceService(bk),
Provisioner: local.NewProvisioningService(bk),
Identity: local.NewIdentityService(bk),
Access: local.NewAccessService(bk),


@ -36,6 +36,7 @@ import (
"github.com/gravitational/teleport/api/constants"
apidefaults "github.com/gravitational/teleport/api/defaults"
devicepb "github.com/gravitational/teleport/api/gen/proto/go/teleport/devicetrust/v1"
"github.com/gravitational/teleport/api/internalutils/stream"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/api/types/wrappers"
@ -769,38 +770,43 @@ func (a *ServerWithRoles) UnstableAssertSystemRole(ctx context.Context, req prot
return a.authServer.UnstableAssertSystemRole(ctx, req)
}
func (a *ServerWithRoles) RegisterInventoryControlStream(ics client.UpstreamInventoryControlStream) error {
// RegisterInventoryControlStream handles the upstream half of the control stream handshake, then passes the control stream to
// the auth server's main control logic. We also return the post-auth hello message back up to the grpcserver layer in order to
// use it for metrics purposes.
func (a *ServerWithRoles) RegisterInventoryControlStream(ics client.UpstreamInventoryControlStream) (proto.UpstreamInventoryHello, error) {
// this value gets set further down
var hello proto.UpstreamInventoryHello
// Ensure that caller is a teleport server
role, ok := a.context.Identity.(BuiltinRole)
if !ok || !role.IsServer() {
return trace.AccessDenied("inventory control streams can only be created by a teleport built-in server")
return hello, trace.AccessDenied("inventory control streams can only be created by a teleport built-in server")
}
// wait for upstream hello
var upstreamHello proto.UpstreamInventoryHello
select {
case msg := <-ics.Recv():
switch m := msg.(type) {
case proto.UpstreamInventoryHello:
upstreamHello = m
hello = m
default:
return trace.BadParameter("expected upstream hello, got: %T", m)
return hello, trace.BadParameter("expected upstream hello, got: %T", m)
}
case <-ics.Done():
return trace.Wrap(ics.Error())
return hello, trace.Wrap(ics.Error())
case <-a.CloseContext().Done():
return trace.Errorf("auth server shutdown")
return hello, trace.Errorf("auth server shutdown")
}
// verify that server is creating stream on behalf of itself.
if upstreamHello.ServerID != role.GetServerID() {
return trace.AccessDenied("control streams do not support impersonation (%q -> %q)", role.GetServerID(), upstreamHello.ServerID)
if hello.ServerID != role.GetServerID() {
return hello, trace.AccessDenied("control streams do not support impersonation (%q -> %q)", role.GetServerID(), hello.ServerID)
}
// in order to reduce sensitivity to downgrades/misconfigurations, we simply filter out
// services that are unrecognized or unauthorized, rather than rejecting hellos that claim them.
var filteredServices []types.SystemRole
for _, service := range upstreamHello.Services {
for _, service := range hello.Services {
if !a.hasBuiltinRole(service) {
log.Warnf("Omitting service %q for control stream of instance %q (unknown or unauthorized).", service, role.GetServerID())
continue
@ -808,9 +814,9 @@ func (a *ServerWithRoles) RegisterInventoryControlStream(ics client.UpstreamInve
filteredServices = append(filteredServices, service)
}
upstreamHello.Services = filteredServices
hello.Services = filteredServices
return a.authServer.RegisterInventoryControlStream(ics, upstreamHello)
return hello, a.authServer.RegisterInventoryControlStream(ics, hello)
}
func (a *ServerWithRoles) GetInventoryStatus(ctx context.Context, req proto.InventoryStatusRequest) (proto.InventoryStatusSummary, error) {
@ -831,6 +837,14 @@ func (a *ServerWithRoles) PingInventory(ctx context.Context, req proto.Inventory
return a.authServer.PingInventory(ctx, req)
}
func (a *ServerWithRoles) GetInstances(ctx context.Context, filter types.InstanceFilter) stream.Stream[types.Instance] {
if err := a.action(apidefaults.Namespace, types.KindInstance, types.VerbList, types.VerbRead); err != nil {
return stream.Fail[types.Instance](trace.Wrap(err))
}
return a.authServer.GetInstances(ctx, filter)
}
func (a *ServerWithRoles) GetClusterAlerts(ctx context.Context, query types.GetClusterAlertsRequest) ([]types.ClusterAlert, error) {
// unauthenticated clients can never check for alerts. we don't normally explicitly
// check for this kind of thing, but since alerts use an unusual access-control


@ -43,6 +43,7 @@ import (
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/constants"
"github.com/gravitational/teleport/api/metadata"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
@ -499,6 +500,12 @@ func (g *GRPCServer) UnstableAssertSystemRole(ctx context.Context, req *proto.Un
return &emptypb.Empty{}, nil
}
// icsServiceToMetricName is a helper for translating service names to keepalive names for control-stream
// purposes. When new services switch to using control-stream based heartbeats, they should be added here.
var icsServiceToMetricName = map[types.SystemRole]string{
types.RoleNode: constants.KeepAliveNode,
}
func (g *GRPCServer) InventoryControlStream(stream proto.AuthService_InventoryControlStreamServer) error {
auth, err := g.authenticate(stream.Context())
if err != nil {
@ -512,10 +519,34 @@ func (g *GRPCServer) InventoryControlStream(stream proto.AuthService_InventoryCo
ics := client.NewUpstreamInventoryControlStream(stream, p.Addr.String())
if err := auth.RegisterInventoryControlStream(ics); err != nil {
hello, err := auth.RegisterInventoryControlStream(ics)
if err != nil {
return trail.ToGRPC(err)
}
// we use a different name for a service in our metrics than we do in certs/hellos. the subset of
// services that currently use ics for heartbeats are registered in the icsServiceToMetricName
// mapping for translation.
var metricServices []string
for _, service := range hello.Services {
if name, ok := icsServiceToMetricName[service]; ok {
metricServices = append(metricServices, name)
}
}
// the heartbeatConnectionsReceived metric counts individual services as individual connections.
heartbeatConnectionsReceived.Add(float64(len(metricServices)))
for _, service := range metricServices {
connectedResources.WithLabelValues(service).Inc()
}
defer func() {
for _, service := range metricServices {
connectedResources.WithLabelValues(service).Dec()
}
}()
// hold open the stream until it completes
<-ics.Done()
@ -554,6 +585,32 @@ func (g *GRPCServer) PingInventory(ctx context.Context, req *proto.InventoryPing
return &rsp, nil
}
func (g *GRPCServer) GetInstances(filter *types.InstanceFilter, stream proto.AuthService_GetInstancesServer) error {
auth, err := g.authenticate(stream.Context())
if err != nil {
return trace.Wrap(err)
}
instances := auth.GetInstances(stream.Context(), *filter)
for instances.Next() {
instance, ok := instances.Item().(*types.InstanceV1)
if !ok {
log.Warnf("Skipping unexpected instance type %T, expected %T.", instances.Item(), instance)
continue
}
if err := stream.Send(instance); err != nil {
instances.Done()
if trace.IsEOF(err) {
return nil
}
return trace.Wrap(err)
}
}
return trace.Wrap(instances.Done())
}
func (g *GRPCServer) GetClusterAlerts(ctx context.Context, query *types.GetClusterAlertsRequest) (*proto.GetClusterAlertsResponse, error) {
auth, err := g.authenticate(ctx)
if err != nil {


@ -106,7 +106,7 @@ type InitConfig struct {
Trust services.Trust
// Presence service is a discovery and heartbeat tracker
Presence services.Presence
Presence services.PresenceInternal
// Provisioner is a service that keeps track of provisioning tokens
Provisioner services.Provisioner


@ -21,6 +21,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"sort"
"strings"
"time"
@ -28,6 +29,7 @@ import (
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/gravitational/teleport/api/internalutils/stream"
"github.com/gravitational/teleport/api/types"
)
@ -105,6 +107,35 @@ func IterateRange(ctx context.Context, bk Backend, startKey []byte, endKey []byt
}
}
// StreamRange constructs a Stream for the given key range. This helper just
// uses standard pagination under the hood, lazily loading pages as needed. Streams
// are currently only used for periodic operations, but if they become more widely
// used in the future, it may become worthwhile to optimize the streaming of backend
// items further. Two potential improvements of note:
//
// 1. update this helper to concurrently load the next page in the background while
// items from the current page are being yielded.
//
// 2. allow individual backends to expose custom streaming methods s.t. the most performant
// impl for a given backend may be used.
func StreamRange(ctx context.Context, bk Backend, startKey, endKey []byte, pageSize int) stream.Stream[Item] {
return stream.PageFunc[Item](func() ([]Item, error) {
if startKey == nil {
return nil, io.EOF
}
rslt, err := bk.GetRange(ctx, startKey, endKey, pageSize)
if err != nil {
return nil, trace.Wrap(err)
}
if len(rslt.Items) < pageSize {
startKey = nil
} else {
startKey = nextKey(rslt.Items[pageSize-1].Key)
}
return rslt.Items, nil
})
}
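A usage sketch for the helper, assuming a backend.Backend value; the key range and page size are illustrative:
func countRange(ctx context.Context, bk Backend) (int, error) {
	items := StreamRange(ctx, bk, []byte("/example/start"), []byte("/example/end"), 1000)
	var count int
	for items.Next() {
		_ = items.Item() // each item is a backend.Item, loaded lazily one page at a time
		count++
	}
	// Done reports any error encountered while fetching pages.
	return count, trace.Wrap(items.Done())
}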
// Batch implements some batch methods
// that are not mandatory for all interfaces,
// only the ones used in bulk operations.


@ -27,7 +27,6 @@ import (
"github.com/gravitational/teleport/api/client/proto"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/teleport/lib/utils/interval"
)
@ -36,7 +35,12 @@ import (
// to the controller in order for it to be able to handle control streams.
type Auth interface {
UpsertNode(context.Context, types.Server) (*types.KeepAlive, error)
KeepAliveServer(context.Context, types.KeepAlive) error
GetRawInstance(ctx context.Context, serverID string) (types.Instance, []byte, error)
CompareAndSwapInstance(ctx context.Context, instance types.Instance, expect []byte) ([]byte, error)
}
type testEvent string
@ -51,24 +55,50 @@ const (
sshUpsertRetryOk testEvent = "ssh-upsert-retry-ok"
sshUpsertRetryErr testEvent = "ssh-upsert-retry-err"
instanceHeartbeatOk testEvent = "instance-heartbeat-ok"
instanceHeartbeatErr testEvent = "instance-heartbeat-err"
instanceCompareFailed testEvent = "instance-compare-failed"
handlerStart = "handler-start"
handlerClose = "handler-close"
)
// intervalKey is used to uniquely identify the subintervals registered with the interval.MultiInterval
// instance that we use for managing periodics associated with upstream handles.
type intervalKey int
const (
instanceHeartbeatKey intervalKey = 1 + iota
serverKeepAliveKey
)
type controllerOptions struct {
serverKeepAlive time.Duration
instanceHBInterval time.Duration
testEvents chan testEvent
maxKeepAliveErrs int
authID string
}
func (options *controllerOptions) SetDefaults() {
if options.serverKeepAlive == 0 {
baseKeepAlive := apidefaults.ServerKeepAliveTTL()
if options.serverKeepAlive == 0 {
// use 1.5x standard server keep alive since we use a jitter that
// shortens the actual average interval.
options.serverKeepAlive = baseKeepAlive + (baseKeepAlive / 2)
}
if options.instanceHBInterval == 0 {
// we use 3.5x the server keep alive ttl in order to limit the amount of instance
// hbs since they are potentially more costly. 3.5x is arbitrary, but since server expiry
// is 10x keepalive TTL under standard configuration, this lets us be certain that we'll
// heartbeat at most 3 times within that window, which is a reasonable target
// when trying to strike a balance between reliable heartbeating and minimizing the increased
// resource utilization incurred by this feature.
options.instanceHBInterval = (baseKeepAlive * 3) + (baseKeepAlive / 2)
}
if options.maxKeepAliveErrs == 0 {
// fail on third error. this is arbitrary, but feels reasonable since if we're on our
// third error, 3-4m have gone by, so the problem is almost certainly persistent.
@ -78,12 +108,24 @@ func (options *controllerOptions) SetDefaults() {
type ControllerOption func(c *controllerOptions)
func WithAuthServerID(serverID string) ControllerOption {
return func(opts *controllerOptions) {
opts.authID = serverID
}
}
func withServerKeepAlive(d time.Duration) ControllerOption {
return func(opts *controllerOptions) {
opts.serverKeepAlive = d
}
}
func withInstanceHBInterval(d time.Duration) ControllerOption {
return func(opts *controllerOptions) {
opts.instanceHBInterval = d
}
}
func withTestEventsChannel(ch chan testEvent) ControllerOption {
return func(opts *controllerOptions) {
opts.testEvents = ch
@ -95,8 +137,10 @@ func withTestEventsChannel(ch chan testEvent) ControllerOption {
type Controller struct {
store *Store
auth Auth
authID string
serverKeepAlive time.Duration
serverTTL time.Duration
instanceHBInterval time.Duration
maxKeepAliveErrs int
testEvents chan testEvent
closeContext context.Context
@ -116,8 +160,10 @@ func NewController(auth Auth, opts ...ControllerOption) *Controller {
store: NewStore(),
serverKeepAlive: options.serverKeepAlive,
serverTTL: apidefaults.ServerAnnounceTTL,
instanceHBInterval: options.instanceHBInterval,
maxKeepAliveErrs: options.maxKeepAliveErrs,
auth: auth,
authID: options.authID,
testEvents: options.testEvents,
closeContext: ctx,
cancel: cancel,
@ -126,7 +172,18 @@ func NewController(auth Auth, opts ...ControllerOption) *Controller {
// RegisterControlStream registers a new control stream with the controller.
func (c *Controller) RegisterControlStream(stream client.UpstreamInventoryControlStream, hello proto.UpstreamInventoryHello) {
handle := newUpstreamHandle(stream, hello)
// set up ticker with instance HB sub-interval. additional sub-intervals are added as needed.
// note that we are using fullJitter on the first duration to spread out initial instance heartbeats
// as much as possible. this is intended to mitigate load spikes on auth restart, and is reasonably
// safe to do since the instance resource is not directly relied upon for use of any particular teleport
// service.
ticker := interval.NewMulti(interval.SubInterval[intervalKey]{
Key: instanceHeartbeatKey,
Duration: c.instanceHBInterval,
FirstDuration: fullJitter(c.instanceHBInterval),
Jitter: seventhJitter,
})
handle := newUpstreamHandle(stream, hello, ticker)
c.store.Insert(handle)
go c.handleControlStream(handle)
}
@ -159,14 +216,13 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
defer func() {
c.store.Remove(handle)
handle.Close() // no effect if CloseWithError was called below
handle.ticker.Stop()
c.testEvent(handlerClose)
}()
keepAliveInterval := interval.New(interval.Config{
Duration: c.serverKeepAlive,
FirstDuration: utils.HalfJitter(c.serverKeepAlive),
Jitter: retryutils.NewSeventhJitter(),
})
defer keepAliveInterval.Stop()
// keepAliveInit tracks whether or not we've initialized the server keepalive sub-interval. we do this lazily
// upon receipt of the first heartbeat since not all servers send heartbeats.
var keepAliveInit bool
for {
select {
@ -177,10 +233,20 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
handle.CloseWithError(trace.BadParameter("unexpected upstream hello"))
return
case proto.InventoryHeartbeat:
if err := c.handleHeartbeat(handle, m); err != nil {
if err := c.handleHeartbeatMsg(handle, m); err != nil {
handle.CloseWithError(err)
return
}
if !keepAliveInit {
// this is the first heartbeat, so we need to initialize the keepalive sub-interval
handle.ticker.Push(interval.SubInterval[intervalKey]{
Key: serverKeepAliveKey,
Duration: c.serverKeepAlive,
FirstDuration: halfJitter(c.serverKeepAlive),
Jitter: seventhJitter,
})
keepAliveInit = true
}
case proto.UpstreamInventoryPong:
c.handlePong(handle, m)
default:
@ -188,11 +254,21 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
handle.CloseWithError(trace.BadParameter("unexpected upstream message type %T", m))
return
}
case <-keepAliveInterval.Next():
if err := c.handleKeepAlive(handle); err != nil {
case tick := <-handle.ticker.Next():
switch tick.Key {
case instanceHeartbeatKey:
if err := c.heartbeatInstanceState(handle, tick.Time); err != nil {
handle.CloseWithError(err)
return
}
case serverKeepAliveKey:
if err := c.keepAliveServer(handle, tick.Time); err != nil {
handle.CloseWithError(err)
return
}
default:
log.Warnf("Unexpected sub-interval key '%v' in control stream handler of server %q (this is a bug).", tick.Key, handle.Hello().ServerID)
}
case req := <-handle.pingC:
// pings require multiplexing, so we need to do the sending from this
// goroutine rather than sending directly via the handle.
@ -208,6 +284,117 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
}
}
func (c *Controller) heartbeatInstanceState(handle *upstreamHandle, now time.Time) error {
tracker := &handle.stateTracker
tracker.mu.Lock()
defer tracker.mu.Unlock()
// withoutLock is used to perform backend I/O outside of the tracker lock. use of this
// helper rather than a manual lock/unlock is useful if we ever have a panic in backend logic,
// since it preserves the original normal panic rather than causing the less helpful and unrecoverable
// 'unlock of unlocked mutex' we would otherwise experience.
withoutLock := func(fn func()) {
tracker.mu.Unlock()
defer tracker.mu.Lock()
fn()
}
if tracker.lastHeartbeat == nil {
// tracker stores the most recent heartbeat value, as well as the associated "raw" value to be used
// for CompareAndSwap ops. if we don't have the current heartbeat value, either this is our first
// time heartbeating this instance, or we had to reset due to a concurrent update.
var (
lastHB types.Instance
lastRawHB []byte
err error
)
// perform backend I/O outside of lock
withoutLock(func() {
lastHB, lastRawHB, err = c.auth.GetRawInstance(c.closeContext, handle.Hello().ServerID)
})
if err != nil && !trace.IsNotFound(err) {
log.Warnf("Failed to load resource for instance %q: %v", handle.Hello().ServerID, err)
if !tracker.retryHeartbeat {
// suppress failure and retry exactly once
tracker.retryHeartbeat = true
return nil
}
// we've already retried, problem is less likely to be transient. close the stream.
return trace.Wrap(err)
}
if err == nil {
tracker.lastHeartbeat = lastHB
tracker.lastRawHeartbeat = lastRawHB
// tracker was advertising nil prev state, but prev state was non-nil, so the qualified
// control log is invalidated and must be reset. see comments on InstanceStateTracker.QualifiedPendingControlLog
// for details.
tracker.qualifiedPendingControlLog = nil
}
}
instance, err := tracker.nextHeartbeat(now, handle.Hello(), c.authID)
if err != nil {
log.Warnf("Failed to construct next heartbeat value for instance %q: %v (this is a bug)", handle.Hello().ServerID, err)
return trace.Wrap(err)
}
// update expiry values using serverTTL as default resource/log ttl.
instance.SyncLogAndResourceExpiry(c.serverTTL)
// record the length of the pending control logs at the time our heartbeat was
// built. since I/O is performed outside of the lock, we need to be able to distinguish
// which entries are old when we re-enter the lock.
qn := len(tracker.qualifiedPendingControlLog)
un := len(tracker.unqualifiedPendingControlLog)
var raw []byte
// perform backend I/O outside of lock
withoutLock(func() {
raw, err = c.auth.CompareAndSwapInstance(c.closeContext, instance, tracker.lastRawHeartbeat)
})
if err != nil {
if trace.IsCompareFailed(err) {
// previous hb values are outdated and must be cleared (will be reloaded on next tick).
tracker.lastHeartbeat = nil
tracker.lastRawHeartbeat = nil
// tracker was advertising incorrect prev state, so the qualified control log is invalidated
// and must be reset. see comments on InstanceStateTracker.QualifiedPendingControlLog for details.
tracker.qualifiedPendingControlLog = nil
log.Debugf("Failed to hb instance %q: %v", handle.Hello().ServerID, err)
c.testEvent(instanceCompareFailed)
return nil
}
// all other error variants cause us to drop the control stream, so just log the error and
// don't bother resetting anything.
log.Warnf("Failed to hb instance %q: %v", handle.Hello().ServerID, err)
c.testEvent(instanceHeartbeatErr)
if !tracker.retryHeartbeat {
// suppress failure and retry exactly once
tracker.retryHeartbeat = true
return nil
}
return trace.Wrap(err)
}
// trim pending control log elements that have now been successfully written
tracker.qualifiedPendingControlLog = tracker.qualifiedPendingControlLog[qn:]
tracker.unqualifiedPendingControlLog = tracker.unqualifiedPendingControlLog[un:]
// update 'last heartbeat' values.
tracker.lastHeartbeat = instance
tracker.lastRawHeartbeat = raw
tracker.retryHeartbeat = false
c.testEvent(instanceHeartbeatOk)
return nil
}
func (c *Controller) handlePong(handle *upstreamHandle, msg proto.UpstreamInventoryPong) {
pending, ok := handle.pings[msg.ID]
if !ok {
@ -221,9 +408,8 @@ func (c *Controller) handlePong(handle *upstreamHandle, msg proto.UpstreamInvent
}
func (c *Controller) handlePingRequest(handle *upstreamHandle, req pingRequest) error {
handle.pingCounter++
ping := proto.DownstreamInventoryPing{
ID: handle.pingCounter,
ID: req.id,
}
start := time.Now()
if err := handle.Send(c.closeContext, ping); err != nil {
@ -232,14 +418,18 @@ func (c *Controller) handlePingRequest(handle *upstreamHandle, req pingRequest)
}
return trace.Wrap(err)
}
handle.pings[handle.pingCounter] = pendingPing{
handle.pings[req.id] = pendingPing{
start: start,
rspC: req.rspC,
}
return nil
}
func (c *Controller) handleHeartbeat(handle *upstreamHandle, hb proto.InventoryHeartbeat) error {
func (c *Controller) handleHeartbeatMsg(handle *upstreamHandle, hb proto.InventoryHeartbeat) error {
// XXX: when adding new services to the heartbeat logic, make sure to also update the
// 'icsServiceToMetricName' mapping in auth/grpcserver.go in order to ensure that metrics
// start counting the control stream as a registered keepalive stream for that service.
if hb.SSHServer != nil {
if err := c.handleSSHServerHB(handle, hb.SSHServer); err != nil {
return trace.Wrap(err)
@ -265,7 +455,9 @@ func (c *Controller) handleSSHServerHB(handle *upstreamHandle, sshServer *types.
sshServer.SetAddr(utils.ReplaceLocalhost(sshServer.GetAddr(), handle.PeerAddr()))
}
sshServer.SetExpiry(time.Now().Add(c.serverTTL).UTC())
now := time.Now()
sshServer.SetExpiry(now.Add(c.serverTTL).UTC())
lease, err := c.auth.UpsertNode(c.closeContext, sshServer)
if err == nil {
@ -286,10 +478,10 @@ func (c *Controller) handleSSHServerHB(handle *upstreamHandle, sshServer *types.
return nil
}
func (c *Controller) handleKeepAlive(handle *upstreamHandle) error {
func (c *Controller) keepAliveServer(handle *upstreamHandle, now time.Time) error {
if handle.sshServerLease != nil {
lease := *handle.sshServerLease
lease.Expires = time.Now().Add(c.serverTTL).UTC()
lease.Expires = now.Add(c.serverTTL).UTC()
if err := c.auth.KeepAliveServer(c.closeContext, lease); err != nil {
c.testEvent(sshKeepAliveErr)
handle.sshServerKeepAliveErrs++


@ -17,6 +17,7 @@ limitations under the License.
package inventory
import (
"bytes"
"context"
"sync"
"testing"
@ -29,6 +30,7 @@ import (
"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/utils"
)
type fakeAuth struct {
@ -42,6 +44,12 @@ type fakeAuth struct {
expectAddr string
unexpectedAddrs int
failGetRawInstance int
failCompareAndSwapInstance int
lastInstance types.Instance
lastRawInstance []byte
}
func (a *fakeAuth) UpsertNode(_ context.Context, server types.Server) (*types.KeepAlive, error) {
@ -71,13 +79,49 @@ func (a *fakeAuth) KeepAliveServer(_ context.Context, _ types.KeepAlive) error {
return a.err
}
// TestControllerBasics verifies basic expected behaviors for a single control stream.
func TestControllerBasics(t *testing.T) {
func (a *fakeAuth) GetRawInstance(ctx context.Context, serverID string) (types.Instance, []byte, error) {
a.mu.Lock()
defer a.mu.Unlock()
if a.failGetRawInstance > 0 {
a.failGetRawInstance--
return nil, nil, trace.Errorf("get raw instance failed as test condition")
}
if a.lastRawInstance == nil {
return nil, nil, trace.NotFound("no instance in fake/test auth")
}
return a.lastInstance, a.lastRawInstance, nil
}
func (a *fakeAuth) CompareAndSwapInstance(ctx context.Context, instance types.Instance, expect []byte) ([]byte, error) {
a.mu.Lock()
defer a.mu.Unlock()
if a.failCompareAndSwapInstance > 0 {
a.failCompareAndSwapInstance--
return nil, trace.Errorf("cas instance failed as test condition")
}
if !bytes.Equal(a.lastRawInstance, expect) {
return nil, trace.CompareFailed("expect value does not match")
}
a.lastInstance = instance.Clone()
var err error
a.lastRawInstance, err = utils.FastMarshal(instance)
if err != nil {
panic("fastmarshal of instance should be infallible")
}
return a.lastRawInstance, nil
}
// TestSSHServerBasics verifies basic expected behaviors for a single control stream heartbeating
// an ssh service.
func TestSSHServerBasics(t *testing.T) {
const serverID = "test-server"
const zeroAddr = "0.0.0.0:123"
const peerAddr = "1.2.3.4:456"
const wantAddr = "1.2.3.4:123"
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -174,8 +218,9 @@ func TestControllerBasics(t *testing.T) {
// limit time of ping call
pingCtx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
// execute ping
_, err = handle.Ping(pingCtx)
_, err = handle.Ping(pingCtx, 1)
require.NoError(t, err)
// set up to induce enough consecutive errors to cause stream closure
@ -223,6 +268,243 @@ func TestControllerBasics(t *testing.T) {
require.Zero(t, unexpectedAddrs)
}
// TestInstanceHeartbeat verifies basic expected behaviors for instance heartbeat.
func TestInstanceHeartbeat(t *testing.T) {
const serverID = "test-instance"
const peerAddr = "1.2.3.4:456"
const includeAttempts = 16
t.Parallel()
events := make(chan testEvent, 1024)
auth := &fakeAuth{}
controller := NewController(
auth,
withInstanceHBInterval(time.Millisecond*200),
withTestEventsChannel(events),
)
defer controller.Close()
// set up fake in-memory control stream
upstream, downstream := client.InventoryControlStreamPipe(client.ICSPipePeerAddr(peerAddr))
controller.RegisterControlStream(upstream, proto.UpstreamInventoryHello{
ServerID: serverID,
Version: teleport.Version,
Services: []types.SystemRole{types.RoleNode},
})
// verify that control stream handle is now accessible
handle, ok := controller.GetControlStream(serverID)
require.True(t, ok)
// verify that instance heartbeat succeeds
awaitEvents(t, events,
expect(instanceHeartbeatOk),
deny(instanceHeartbeatErr, instanceCompareFailed, handlerClose),
)
auth.mu.Lock()
auth.lastInstance.AppendControlLog(types.InstanceControlLogEntry{
Type: "concurrent-test-event",
ID: 1,
Time: time.Now(),
})
auth.lastRawInstance, _ = utils.FastMarshal(auth.lastInstance)
auth.mu.Unlock()
// wait for us to hit CompareFailed
awaitEvents(t, events,
expect(instanceCompareFailed),
deny(instanceHeartbeatErr, handlerClose),
)
// expect that we immediately recover on next iteration
awaitEvents(t, events,
expect(instanceHeartbeatOk),
deny(instanceHeartbeatErr, instanceCompareFailed, handlerClose),
)
// attempt qualified event inclusion
var included bool
for i := 0; i < includeAttempts; i++ {
handle.VisitInstanceState(func(ref InstanceStateRef) (update InstanceStateUpdate) {
// check if we've already successfully included the ping entry
if ref.LastHeartbeat != nil {
for _, entry := range ref.LastHeartbeat.GetControlLog() {
if entry.Type == "qualified" && entry.ID == 2 {
included = true
return
}
}
}
// check if the qualified entry is already in the pending log
for _, entry := range ref.QualifiedPendingControlLog {
if entry.Type == "qualified" && entry.ID == 2 {
return
}
}
update.QualifiedPendingControlLog = append(update.QualifiedPendingControlLog, types.InstanceControlLogEntry{
Type: "qualified",
ID: 2,
})
handle.HeartbeatInstance()
return
})
if included {
break
}
awaitEvents(t, events,
expect(instanceHeartbeatOk),
deny(instanceHeartbeatErr, instanceCompareFailed, handlerClose),
)
}
require.True(t, included)
// attempt unqualified event inclusion
handle.VisitInstanceState(func(_ InstanceStateRef) (update InstanceStateUpdate) {
update.UnqualifiedPendingControlLog = append(update.UnqualifiedPendingControlLog, types.InstanceControlLogEntry{
Type: "unqualified",
ID: 3,
})
handle.HeartbeatInstance()
return
})
included = false
for i := 0; i < includeAttempts; i++ {
awaitEvents(t, events,
expect(instanceHeartbeatOk),
deny(instanceHeartbeatErr, instanceCompareFailed, handlerClose),
)
handle.VisitInstanceState(func(ref InstanceStateRef) (_ InstanceStateUpdate) {
if ref.LastHeartbeat != nil {
for _, entry := range ref.LastHeartbeat.GetControlLog() {
if entry.Type == "unqualified" && entry.ID == 3 {
included = true
return
}
}
}
return
})
if included {
break
}
}
require.True(t, included)
// set up single failure of CAS. stream should recover.
auth.mu.Lock()
auth.failCompareAndSwapInstance = 1
auth.mu.Unlock()
// verify that heartbeat error occurs
awaitEvents(t, events,
expect(instanceHeartbeatErr),
deny(instanceCompareFailed, handlerClose),
)
// verify that recovery happens
awaitEvents(t, events,
expect(instanceHeartbeatOk),
deny(instanceHeartbeatErr, instanceCompareFailed, handlerClose),
)
var unqualifiedCount int
// confirm that qualified pending control log is reset on failed CompareAndSwap but
// unqualified pending control log is not.
for i := 0; i < includeAttempts; i++ {
handle.VisitInstanceState(func(ref InstanceStateRef) (update InstanceStateUpdate) {
if i%2 == 0 {
update.QualifiedPendingControlLog = append(update.QualifiedPendingControlLog, types.InstanceControlLogEntry{
Type: "never",
ID: 4,
})
} else {
unqualifiedCount++
update.UnqualifiedPendingControlLog = append(update.UnqualifiedPendingControlLog, types.InstanceControlLogEntry{
Type: "always",
ID: uint64(unqualifiedCount),
})
}
// inject concurrent update to cause CompareAndSwap to fail. we do this while the tracker
// lock is held to prevent concurrent injection of the qualified control log event.
auth.mu.Lock()
auth.lastInstance.AppendControlLog(types.InstanceControlLogEntry{
Type: "concurrent-test-event",
ID: 1,
Time: time.Now(),
})
auth.lastRawInstance, _ = utils.FastMarshal(auth.lastInstance)
auth.mu.Unlock()
handle.HeartbeatInstance()
return
})
// wait to hit CompareFailed.
awaitEvents(t, events,
expect(instanceCompareFailed),
deny(instanceHeartbeatErr, handlerClose),
)
// wait for recovery.
awaitEvents(t, events,
expect(instanceHeartbeatOk),
deny(instanceHeartbeatErr, instanceCompareFailed, handlerClose),
)
}
// verify that none of the qualified events were ever heartbeat because
// a reset always occurred.
var unqualifiedIncludes int
handle.VisitInstanceState(func(ref InstanceStateRef) (_ InstanceStateUpdate) {
require.NotNil(t, ref.LastHeartbeat)
for _, entry := range ref.LastHeartbeat.GetControlLog() {
require.NotEqual(t, entry.Type, "never")
if entry.Type == "always" {
unqualifiedIncludes++
}
}
return
})
require.Equal(t, unqualifiedCount, unqualifiedIncludes)
// set up double failure of CAS. stream should not recover.
auth.mu.Lock()
auth.failCompareAndSwapInstance = 2
auth.mu.Unlock()
// expect failure and handle closure
awaitEvents(t, events,
expect(instanceHeartbeatErr, handlerClose),
)
// verify that closure propagates to server and client side interfaces
closeTimeout := time.After(time.Second * 10)
select {
case <-handle.Done():
case <-closeTimeout:
t.Fatal("timeout waiting for handle closure")
}
select {
case <-downstream.Done():
case <-closeTimeout:
t.Fatal("timeout waiting for handle closure")
}
// verify that control log entries survived the above sequence
auth.mu.Lock()
logSize := len(auth.lastInstance.GetControlLog())
auth.mu.Unlock()
require.Greater(t, logSize, 2)
}
type eventOpts struct {
expect map[testEvent]int
deny map[testEvent]struct{}


@ -29,6 +29,18 @@ import (
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/teleport/lib/utils/interval"
vc "github.com/gravitational/teleport/lib/versioncontrol"
)
// we use shared global jitters for all the intervals/retries in this
// package. we do this because our jitter usage in this package can scale by
// the number of concurrent connections to auth, making dedicated per-use
// jitters a poor choice (high memory usage for all the rngs).
var (
seventhJitter = retryutils.NewShardedSeventhJitter()
halfJitter = retryutils.NewShardedHalfJitter()
fullJitter = retryutils.NewShardedFullJitter()
)
// DownstreamCreateFunc is a function that creates a downstream inventory control stream.
@ -260,22 +272,184 @@ type UpstreamHandle interface {
// Hello gets the cached upstream hello that was used to initialize the stream.
Hello() proto.UpstreamInventoryHello
Ping(ctx context.Context) (d time.Duration, err error)
Ping(ctx context.Context, id uint64) (d time.Duration, err error)
// HasService is a helper for checking if a given service is associated with this
// stream.
HasService(types.SystemRole) bool
// VisitInstanceState runs the provided closure against a representation of the most
// recently observed instance state, plus any pending control log entries. The returned
// value may optionally include additional control log entries to add to the pending
// queues. Inputs and outputs are deep copied to avoid concurrency issues. See the InstanceStateTracker
// for an explanation of how this system works.
VisitInstanceState(func(ref InstanceStateRef) InstanceStateUpdate)
// HeartbeatInstance triggers an early instance heartbeat. This function does not
// wait for the instance heartbeat to actually be completed, so calling this and then
// immediately locking the instanceStateTracker will likely result in observing the
// pre-heartbeat state.
HeartbeatInstance()
}
// instanceStateTracker tracks the state of a connected instance from the point of view of
// the inventory controller. Values in this struct tend to be lazily updated/consumed. For
// example, the LastHeartbeat value is nil until the first attempt to heartbeat the instance
// has been made *for this TCP connection*, meaning that you can't distinguish an instance
// that just joined the cluster from one that just reconnected by looking at this struct. Similarly,
// when using this struct to inject control log entries, said entries won't be included until the next
// normal instance heartbeat (though this may be triggered early via UpstreamHandle.HeartbeatInstance()).
// The primary intended usage pattern is for periodic operations to append entries to the QualifiedPendingControlLog,
// and then observe whether or not said entries end up being included on subsequent iterations. This
// pattern lets us achieve a kind of lazy "locking", whereby complex coordination can occur without
// large spikes in backend load. See the QualifiedPendingControlLog field for an example of this pattern.
type instanceStateTracker struct {
// mu protects all below fields and must be locked during access of any of the
// below fields.
mu sync.Mutex
// qualifiedPendingControlLog encodes sensitive control log entries that should be written to the backend
// on the next heartbeat. Appending a value to this field does not guarantee that it will be written. The
// qualified pending control log is reset if a concurrent create/update occurs. This is a deliberate design
// choice that is intended to force recalculation of the conditions that merited the log entry being applied
// if/when a concurrent update occurs. Take, for example, a log entry indicating that an upgrade will be
// attempted. We don't want to double-attempt an upgrade, so if the underlying resource is concurrently updated,
// we need to reexamine the new state before deciding if an upgrade attempt is still advisable. This loop may
// continue indefinitely until we either observe that our entry was successfully written, or that the condition
// which originally merited the write no longer holds.
//
// NOTE: Since mu is not held *during* an attempt to write to the backend, it is important
// that this slice is only appended to and never reordered. The heartbeat logic relies upon the ability
// to trim the first N elements upon successful write in order to avoid skipping and/or double-writing
// a given entry.
//
qualifiedPendingControlLog []types.InstanceControlLogEntry
// unqualifiedPendingControlLog is functionally equivalent to QualifiedPendingControlLog except that it is not
// reset upon concurrent create/update. Appending items here is effectively "fire and forget", though items may
// still not make it into the control log of the underlying resource if the instance disconnects or the auth server
// restarts before the next successful write. As a general rule, use the QualifiedPendingControlLog to announce your
// intention to perform an action, and use the UnqualifiedPendingControlLog to store the results of an action.
//
// NOTE: Since mu is not held *during* an attempt to write to the backend, it is important
// that this slice is only appended to and never reordered. The heartbeat logic relies upon the ability
// to trim the first N elements upon successful write in order to avoid skipping and/or double-writing
// a given entry.
unqualifiedPendingControlLog []types.InstanceControlLogEntry
// lastHeartbeat is the last observed heartbeat for this instance. This field is filled lazily and
// will be nil if the instance only recently connected or joined. Operations that expect to be able to
// observe the committed state of the instance control log should skip instances for which this field is nil.
lastHeartbeat types.Instance
// lastRawHeartbeat is the raw backend value associated with lastHeartbeat. Used to
// enable CompareAndSwap-based updates.
lastRawHeartbeat []byte
// retryHeartbeat is set to true if an unexpected error is hit. We retry exactly once, closing
// the stream if the retry does not succeed.
retryHeartbeat bool
}
// InstanceStateRef is a helper used to present a copy of the public subset of instanceStateTracker. Used by
// the VisitInstanceState helper to show callers the current state without risking concurrency issues due
// to misuse.
type InstanceStateRef struct {
QualifiedPendingControlLog []types.InstanceControlLogEntry
UnqualifiedPendingControlLog []types.InstanceControlLogEntry
LastHeartbeat types.Instance
}
// InstanceStateUpdate encodes additional pending control log entries that should be included in future heartbeats. Used by
// the VisitInstanceState helper to provide a mechanism of appending to the primary pending queues without risking
// concurrency issues due to misuse.
type InstanceStateUpdate struct {
QualifiedPendingControlLog []types.InstanceControlLogEntry
UnqualifiedPendingControlLog []types.InstanceControlLogEntry
}
// VisitInstanceState provides a mechanism of viewing and potentially updating the instance control log of a
// given instance. The supplied closure is given a view of the most recent successful heartbeat, as well as
// any existing pending entries. It may then return additional pending entries. This method performs
// significant defensive copying, so care should be taken to limit its use.
func (h *upstreamHandle) VisitInstanceState(fn func(InstanceStateRef) InstanceStateUpdate) {
h.stateTracker.mu.Lock()
defer h.stateTracker.mu.Unlock()
var ref InstanceStateRef
// copy over last heartbeat if set
if h.stateTracker.lastHeartbeat != nil {
ref.LastHeartbeat = h.stateTracker.lastHeartbeat.Clone()
}
// copy over control log entries
ref.QualifiedPendingControlLog = cloneAppendLog(ref.QualifiedPendingControlLog, h.stateTracker.qualifiedPendingControlLog...)
ref.UnqualifiedPendingControlLog = cloneAppendLog(ref.UnqualifiedPendingControlLog, h.stateTracker.unqualifiedPendingControlLog...)
// run closure
update := fn(ref)
// copy updates back into state tracker
h.stateTracker.qualifiedPendingControlLog = cloneAppendLog(h.stateTracker.qualifiedPendingControlLog, update.QualifiedPendingControlLog...)
h.stateTracker.unqualifiedPendingControlLog = cloneAppendLog(h.stateTracker.unqualifiedPendingControlLog, update.UnqualifiedPendingControlLog...)
}
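// exampleAnnounceIntent is an illustrative sketch (a hypothetical helper, not part of the
// controller) of the pattern described above: announce an intent via the qualified pending
// control log, skipping the append if the intent is already pending or was already
// committed by a previous heartbeat. The "example-intent" entry type, ID, and TTL are all
// made up for the example.
func exampleAnnounceIntent(handle UpstreamHandle) {
	handle.VisitInstanceState(func(ref InstanceStateRef) (update InstanceStateUpdate) {
		// skip if the intent was already committed to the backend.
		if ref.LastHeartbeat != nil {
			for _, entry := range ref.LastHeartbeat.GetControlLog() {
				if entry.Type == "example-intent" {
					return
				}
			}
		}
		// skip if the intent is already queued for inclusion in the next heartbeat.
		for _, entry := range ref.QualifiedPendingControlLog {
			if entry.Type == "example-intent" {
				return
			}
		}
		update.QualifiedPendingControlLog = append(update.QualifiedPendingControlLog, types.InstanceControlLogEntry{
			Type: "example-intent",
			ID:   1,
			TTL:  time.Hour,
		})
		return
	})
	// optionally trigger an early heartbeat instead of waiting for the next tick; note that
	// the entry is only durable once it is observed in a later LastHeartbeat.
	handle.HeartbeatInstance()
}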
// cloneAppendLog is a helper for performing deep copies of control log entries
func cloneAppendLog(log []types.InstanceControlLogEntry, entries ...types.InstanceControlLogEntry) []types.InstanceControlLogEntry {
for _, entry := range entries {
log = append(log, entry.Clone())
}
return log
}
// WithLock runs the provided closure with the tracker lock held.
func (i *instanceStateTracker) WithLock(fn func()) {
i.mu.Lock()
defer i.mu.Unlock()
fn()
}
// nextHeartbeat calculates the next heartbeat value. *Must* be called only while lock is held.
func (i *instanceStateTracker) nextHeartbeat(now time.Time, hello proto.UpstreamInventoryHello, authID string) (types.Instance, error) {
instance, err := types.NewInstance(hello.ServerID, types.InstanceSpecV1{
Version: vc.Normalize(hello.Version),
Services: hello.Services,
Hostname: hello.Hostname,
AuthID: authID,
LastSeen: now.UTC(),
})
if err != nil {
return nil, trace.Wrap(err)
}
// preserve control log entries from previous instance if present
if i.lastHeartbeat != nil {
instance.AppendControlLog(i.lastHeartbeat.GetControlLog()...)
}
if len(i.qualifiedPendingControlLog) > 0 {
instance.AppendControlLog(i.qualifiedPendingControlLog...)
}
if len(i.unqualifiedPendingControlLog) > 0 {
instance.AppendControlLog(i.unqualifiedPendingControlLog...)
}
return instance, nil
}
type upstreamHandle struct {
client.UpstreamInventoryControlStream
hello proto.UpstreamInventoryHello
ticker *interval.MultiInterval[intervalKey]
pingC chan pingRequest
stateTracker instanceStateTracker
// --- fields below this point only safe for access by handler goroutine
// pingCounter is incremented on pings, and used as the ping multiplexing ID.
pingCounter uint64
// pings are in-flight pings to be multiplexed by ID.
pings map[uint64]pendingPing
@ -283,7 +457,6 @@ type upstreamHandle struct {
sshServer *types.ServerV2
// retrySSHServerUpsert indicates that writing the ssh server lease failed and should be retried.
retrySSHServerUpsert bool
// sshServerLease is used to keep alive an ssh server resource that was previously
// sent over a heartbeat.
sshServerLease *types.KeepAlive
@ -292,13 +465,17 @@ type upstreamHandle struct {
sshServerKeepAliveErrs int
}
func newUpstreamHandle(stream client.UpstreamInventoryControlStream, hello proto.UpstreamInventoryHello) *upstreamHandle {
pings := make(map[uint64]pendingPing)
func (h *upstreamHandle) HeartbeatInstance() {
h.ticker.FireNow(instanceHeartbeatKey)
}
func newUpstreamHandle(stream client.UpstreamInventoryControlStream, hello proto.UpstreamInventoryHello, ticker *interval.MultiInterval[intervalKey]) *upstreamHandle {
return &upstreamHandle{
UpstreamInventoryControlStream: stream,
pingC: make(chan pingRequest),
hello: hello,
pings: pings,
pings: make(map[uint64]pendingPing),
ticker: ticker,
}
}
@ -308,6 +485,7 @@ type pendingPing struct {
}
type pingRequest struct {
id uint64
rspC chan pingResponse
}
@ -316,10 +494,10 @@ type pingResponse struct {
err error
}
func (h *upstreamHandle) Ping(ctx context.Context) (d time.Duration, err error) {
func (h *upstreamHandle) Ping(ctx context.Context, id uint64) (d time.Duration, err error) {
rspC := make(chan pingResponse, 1)
select {
case h.pingC <- pingRequest{rspC}:
case h.pingC <- pingRequest{rspC: rspC, id: id}:
case <-h.Done():
return 0, trace.Errorf("failed to send downstream ping (stream closed)")
case <-ctx.Done():

View file

@ -153,7 +153,7 @@ type Config struct {
Trust services.Trust
// Presence service is a discovery and heartbeat tracker
Presence services.Presence
Presence services.PresenceInternal
// Events is events service
Events types.Events

View file

@ -951,6 +951,7 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) {
ServerID: cfg.HostUUID,
Version: teleport.Version,
Services: process.getInstanceRoles(),
Hostname: cfg.Hostname,
})
process.inventoryHandle.RegisterPingHandler(func(sender inventory.DownstreamSender, ping proto.DownstreamInventoryPing) {

49
lib/services/inventory.go Normal file
View file

@ -0,0 +1,49 @@
/*
Copyright 2021 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package services
import (
"context"
"github.com/gravitational/teleport/api/internalutils/stream"
"github.com/gravitational/teleport/api/types"
)
// Inventory is a subset of Presence dedicated to tracking the status of all
// teleport instances independent of any specific service.
//
// NOTE: the instance resource scales linearly with cluster size and is not cached in a traditional
// manner. As such, it should not be accessed as part of the "hot path" of any normal request.
type Inventory interface {
// GetInstances iterates the full teleport server inventory.
GetInstances(ctx context.Context, req types.InstanceFilter) stream.Stream[types.Instance]
}
// InventoryInternal is a subset of the PresenceInternal interface that extends
// inventory functionality with auth-specific internal methods.
type InventoryInternal interface {
Inventory
// GetRawInstance gets an instance resource as it appears in the backend, along with its
// associated raw key value for use with CompareAndSwapInstance.
GetRawInstance(ctx context.Context, serverID string) (types.Instance, []byte, error)
// CompareAndSwapInstance creates or updates the underlying instance resource based on the currently
// expected value. The first call to this method should use the value returned by GetRawInstance for the
// 'expect' parameter. Subsequent calls should use the value returned by this method.
CompareAndSwapInstance(ctx context.Context, instance types.Instance, expect []byte) ([]byte, error)
}
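// exampleCountInstances is an illustrative sketch (a hypothetical helper, not part of this
// package) of consuming the GetInstances stream incrementally rather than collecting it,
// which matters because the instance set scales linearly with cluster size.
func exampleCountInstances(ctx context.Context, inv Inventory) (int, error) {
	instances := inv.GetInstances(ctx, types.InstanceFilter{})
	var count int
	for instances.Next() {
		_ = instances.Item() // items are yielded lazily; avoid buffering them all unless necessary.
		count++
	}
	// Done reports any error encountered by the stream and releases its resources.
	return count, instances.Done()
}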

View file

@ -0,0 +1,149 @@
/*
Copyright 2022 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"context"
"github.com/gravitational/trace"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/internalutils/stream"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/utils"
)
// GetInstances iterates all teleport instances.
func (s *PresenceService) GetInstances(ctx context.Context, req types.InstanceFilter) stream.Stream[types.Instance] {
const pageSize = 10_000
if req.ServerID != "" {
instance, _, err := s.GetRawInstance(ctx, req.ServerID)
if err != nil {
if trace.IsNotFound(err) {
return stream.Empty[types.Instance]()
}
return stream.Fail[types.Instance](trace.Wrap(err))
}
if !req.Match(instance) {
return stream.Empty[types.Instance]()
}
return stream.Once(instance)
}
startKey := backend.Key(instancePrefix, "")
endKey := backend.RangeEnd(startKey)
items := backend.StreamRange(ctx, s, startKey, endKey, pageSize)
return stream.FilterMap(items, func(item backend.Item) (types.Instance, bool) {
var instance types.InstanceV1
if err := utils.FastUnmarshal(item.Value, &instance); err != nil {
s.log.Warnf("Skipping instance at %s, failed to unmarshal: %v", item.Key, err)
return nil, false
}
if err := instance.CheckAndSetDefaults(); err != nil {
s.log.Warnf("Skipping instance at %s: %v", item.Key, err)
return nil, false
}
if !req.Match(&instance) {
return nil, false
}
return &instance, true
})
}
// GetRawInstance gets an instance resource by server ID.
func (s *PresenceService) GetRawInstance(ctx context.Context, serverID string) (types.Instance, []byte, error) {
item, err := s.Get(ctx, backend.Key(instancePrefix, serverID))
if err != nil {
if trace.IsNotFound(err) {
return nil, nil, trace.NotFound("failed to locate instance %q", serverID)
}
return nil, nil, trace.Wrap(err)
}
var instance types.InstanceV1
if err := utils.FastUnmarshal(item.Value, &instance); err != nil {
return nil, nil, trace.BadParameter("failed to unmarshal instance %q: %v", serverID, err)
}
if err := instance.CheckAndSetDefaults(); err != nil {
return nil, nil, trace.BadParameter("instance %q appears malformed: %v", serverID, err)
}
return &instance, item.Value, nil
}
// CompareAndSwapInstance creates or updates the underlying instance resource based on the currently
// expected value. The first call to this method should use the value returned by GetRawInstance for the
// 'expect' parameter. Subsequent calls should use the value returned by this method.
func (s *PresenceService) CompareAndSwapInstance(ctx context.Context, instance types.Instance, expect []byte) ([]byte, error) {
if err := instance.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
// instance resource expiry is calculated relative to LastSeen and/or the longest living
// control log entry (whichever is further in the future).
if instance.GetLastSeen().IsZero() || instance.Expiry().IsZero() {
instance.SetLastSeen(s.Clock().Now().UTC())
instance.SyncLogAndResourceExpiry(apidefaults.ServerAnnounceTTL)
}
v1, ok := instance.(*types.InstanceV1)
if !ok {
return nil, trace.BadParameter("unexpected type %T, expected %T", instance, v1)
}
value, err := utils.FastMarshal(v1)
if err != nil {
return nil, trace.Errorf("failed to marshal Instance: %v", err)
}
item := backend.Item{
Key: backend.Key(instancePrefix, instance.GetName()),
Value: value,
Expires: instance.Expiry(),
}
if len(expect) == 0 {
// empty 'expect' means we expect nonexistence, so we use Create instead of
// the regular CompareAndSwap.
_, err = s.Backend.Create(ctx, item)
if err != nil {
if trace.IsAlreadyExists(err) {
return nil, trace.CompareFailed("instance concurrently created")
}
return nil, trace.Wrap(err)
}
return value, nil
}
_, err = s.Backend.CompareAndSwap(ctx, backend.Item{
Key: item.Key,
Value: expect,
}, item)
if err != nil {
if trace.IsCompareFailed(err) {
return nil, trace.CompareFailed("instance concurrently updated")
}
return nil, trace.Wrap(err)
}
return value, nil
}
const instancePrefix = "instances"
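// exampleInstanceCASLoop is an illustrative sketch (a hypothetical helper, not part of
// this package) of the protocol described above: seed the 'expect' value from
// GetRawInstance, then feed the value returned by each successful CompareAndSwapInstance
// into the next call, reloading on compare failure.
func exampleInstanceCASLoop(ctx context.Context, s *PresenceService, instance types.Instance) ([]byte, error) {
	// an empty 'expect' turns the CAS into a create if the instance does not exist yet.
	var expect []byte
	if _, raw, err := s.GetRawInstance(ctx, instance.GetName()); err == nil {
		expect = raw
	} else if !trace.IsNotFound(err) {
		return nil, trace.Wrap(err)
	}
	for {
		raw, err := s.CompareAndSwapInstance(ctx, instance, expect)
		if err == nil {
			// success; the caller should pass the returned value as 'expect' on its
			// next update of this instance.
			return raw, nil
		}
		if !trace.IsCompareFailed(err) {
			return nil, trace.Wrap(err)
		}
		// concurrent create/update: reload the latest raw value, re-derive the desired
		// instance state if necessary, and retry against the freshly observed value.
		if _, expect, err = s.GetRawInstance(ctx, instance.GetName()); err != nil && !trace.IsNotFound(err) {
			return nil, trace.Wrap(err)
		}
	}
}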

View file

@ -0,0 +1,246 @@
/*
Copyright 2022 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"context"
"testing"
"time"
"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/internalutils/stream"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/backend/memory"
)
// TestInstanceCAS verifies basic expected behavior of instance creation/update.
func TestInstanceCAS(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
backend, err := memory.New(memory.Config{
Context: ctx,
Clock: clockwork.NewFakeClock(),
})
require.NoError(t, err)
defer backend.Close()
presence := NewPresenceService(backend)
instance1, err := types.NewInstance(uuid.NewString(), types.InstanceSpecV1{})
require.NoError(t, err)
raw1, err := presence.CompareAndSwapInstance(ctx, instance1, nil)
require.NoError(t, err)
// verify that "create" style compare and swaps are now rejected
_, err = presence.CompareAndSwapInstance(ctx, instance1, nil)
require.Error(t, err)
require.True(t, trace.IsCompareFailed(err))
// get the inserted instance
instances, err := stream.Collect(presence.GetInstances(ctx, types.InstanceFilter{}))
require.NoError(t, err)
require.Len(t, instances, 1)
// verify that expiry and last_seen are automatically set to expected values.
exp1 := instances[0].Expiry()
seen1 := instances[0].GetLastSeen()
require.False(t, exp1.IsZero())
require.False(t, seen1.IsZero())
require.Equal(t, presence.Clock().Now().UTC(), seen1)
require.Equal(t, seen1.Add(apidefaults.ServerAnnounceTTL), exp1)
require.True(t, exp1.After(presence.Clock().Now()))
require.False(t, exp1.After(presence.Clock().Now().Add(apidefaults.ServerAnnounceTTL*2)))
// update the instance control log
instance1.AppendControlLog(types.InstanceControlLogEntry{
Type: "testing",
ID: 1,
TTL: time.Hour * 24,
})
instance1.SyncLogAndResourceExpiry(apidefaults.ServerAnnounceTTL)
// verify expected increase in ttl to accommodate custom log entry TTL (sanity check
// to differentiate bugs in SyncLogAndResourceExpiry from bugs in presence/backend).
require.Equal(t, seen1.Add(time.Hour*24), instance1.Expiry())
// perform normal compare and swap using raw value from previous successful call
_, err = presence.CompareAndSwapInstance(ctx, instance1, raw1)
require.NoError(t, err)
// verify that raw value from previous successful CaS no longer works
_, err = presence.CompareAndSwapInstance(ctx, instance1, raw1)
require.Error(t, err)
require.True(t, trace.IsCompareFailed(err))
// load new instance state
instances2, err := stream.Collect(presence.GetInstances(ctx, types.InstanceFilter{}))
require.NoError(t, err)
require.Len(t, instances2, 1)
// ensure that ttl and log were preserved
require.Equal(t, seen1.Add(time.Hour*24), instances2[0].Expiry())
require.Len(t, instances2[0].GetControlLog(), 1)
}
// TestInstanceFiltering tests basic filtering options. A sufficiently large
// instance count is used to ensure that queries span many pages.
func TestInstanceFiltering(t *testing.T) {
const count = 100_000
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// NOTE: backend must be memory, since parallel subtests are used (makes correct cleanup of
// filesystem state tricky).
backend, err := memory.New(memory.Config{
Context: ctx,
Clock: clockwork.NewFakeClock(),
})
require.NoError(t, err)
defer backend.Close()
presence := NewPresenceService(backend)
// store an odd and an even uuid for later use in queries
var evenID, oddID string
evenServices := []types.SystemRole{"even"}
oddServices := []types.SystemRole{"odd"}
evenVersion := "v2.4.6"
oddVersion := "v3.5.7"
allServices := append(evenServices, oddServices...)
// create a bunch of instances with an even mix of odd/even "services".
for i := 0; i < count; i++ {
serverID := uuid.NewString()
var services []types.SystemRole
var version string
if i%2 == 0 {
services = evenServices
version = evenVersion
evenID = serverID
} else {
services = oddServices
version = oddVersion
oddID = serverID
}
instance, err := types.NewInstance(serverID, types.InstanceSpecV1{
Services: services,
Version: version,
})
require.NoError(t, err)
_, err = presence.CompareAndSwapInstance(ctx, instance, nil)
require.NoError(t, err)
}
// check a few simple queries
tts := []struct {
filter types.InstanceFilter
even, odd int
desc string
}{
{
filter: types.InstanceFilter{
Services: evenServices,
},
even: count / 2,
desc: "all even services",
},
{
filter: types.InstanceFilter{
ServerID: oddID,
},
odd: 1,
desc: "single-instance direct",
},
{
filter: types.InstanceFilter{
ServerID: evenID,
Services: oddServices,
},
desc: "non-matching id+service pair",
},
{
filter: types.InstanceFilter{
ServerID: evenID,
Services: evenServices,
},
even: 1,
desc: "matching id+service pair",
},
{
filter: types.InstanceFilter{
Services: allServices,
},
even: count / 2,
odd: count / 2,
desc: "all services",
},
{
filter: types.InstanceFilter{
Version: evenVersion,
},
even: count / 2,
desc: "single version",
},
}
for _, testCase := range tts {
tt := testCase
t.Run(tt.desc, func(t *testing.T) {
t.Parallel()
// load instances with given filter
instances, err := stream.Collect(presence.GetInstances(ctx, tt.filter))
require.NoError(t, err)
// aggregate the number of even/odd services seen across the returned instances
var even, odd int
for _, instance := range instances {
require.Len(t, instance.GetServices(), 1)
switch service := instance.GetServices()[0]; service {
case "even":
even++
case "odd":
odd++
default:
t.Fatalf("Unexpected service: %+v", service)
}
}
require.Equal(t, tt.even, even)
require.Equal(t, tt.odd, odd)
})
}
}

View file

@ -38,6 +38,10 @@ type NodesGetter interface {
// Presence records and reports the presence of all components
// of the cluster - Nodes, Proxies and SSH nodes
type Presence interface {
// Inventory is a subset of Presence dedicated to tracking the status of all
// teleport instances independent of any specific service.
Inventory
// Semaphores is responsible for semaphore handling
types.Semaphores
@ -237,3 +241,9 @@ type Presence interface {
// UpsertDatabaseService updates an existing DatabaseService resource.
UpsertDatabaseService(context.Context, types.DatabaseService) (*types.KeepAlive, error)
}
// PresenceInternal extends the Presence interface with auth-specific internal methods.
type PresenceInternal interface {
Presence
InventoryInternal
}

View file

@ -77,6 +77,7 @@ func NewPresetEditorRole() types.Role {
types.NewRule(types.KindLicense, RO()),
types.NewRule(types.KindDownload, RO()),
types.NewRule(types.KindDatabaseService, RO()),
types.NewRule(types.KindInstance, RO()),
// Please see defaultAllowRules when adding a new rule.
},
},
@ -121,6 +122,7 @@ func NewPresetAccessRole() types.Role {
Verbs: []string{types.VerbRead, types.VerbList},
Where: "contains(session.participants, user.metadata.name)",
},
types.NewRule(types.KindInstance, RO()),
// Please see defaultAllowRules when adding a new rule.
},
},

View file

@ -27,6 +27,8 @@ import (
"github.com/gravitational/trace"
jsoniter "github.com/json-iterator/go"
kyaml "k8s.io/apimachinery/pkg/util/yaml"
"github.com/gravitational/teleport/api/internalutils/stream"
)
// ToJSON converts a single YAML document into a JSON document
@ -79,6 +81,16 @@ var SafeConfig = jsoniter.Config{
SortMapKeys: true,
}.Froze()
// SafeConfigWithIndent is equivalent to SafeConfig except with indentation
// enabled.
var SafeConfigWithIndent = jsoniter.Config{
IndentionStep: 2,
EscapeHTML: false,
MarshalFloatWith6Digits: true, // will lose precision
ObjectFieldMustBeSimpleString: true, // do not unescape object field
SortMapKeys: true,
}.Froze()
// FastMarshal uses the json-iterator library for fast JSON marshaling.
// Note, this function unmarshals floats with 6 digits precision.
func FastMarshal(v interface{}) ([]byte, error) {
@ -109,6 +121,29 @@ func WriteJSON(w io.Writer, values interface{}) error {
return trace.Wrap(err)
}
// StreamJSONArray streams the elements of a stream.Stream as a JSON array
// with optional indentation (used to stream to CLI).
func StreamJSONArray[T any](items stream.Stream[T], out io.Writer, indent bool) error {
cfg := SafeConfig
if indent {
cfg = SafeConfigWithIndent
}
stream := jsoniter.NewStream(cfg, out, 512)
stream.WriteArrayStart()
var prev bool
for items.Next() {
if prev {
// if a previous item was written to the array, we need to
// write a comma first.
stream.WriteMore()
}
stream.WriteVal(items.Item())
prev = true
}
stream.WriteArrayEnd()
return trace.NewAggregate(items.Done(), stream.Flush())
}
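// exampleStreamJSON is an illustrative sketch (a hypothetical helper, not part of this
// package) of typical StreamJSONArray usage: wrap an in-memory slice with stream.Slice
// and write it to the supplied writer as an indented JSON array without building the
// whole document in memory first.
func exampleStreamJSON(out io.Writer) error {
	values := []string{"foo", "bar", "baz"}
	return trace.Wrap(StreamJSONArray(stream.Slice(values), out, true))
}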
const yamlDocDelimiter = "---"
// WriteYAML detects whether value is a list

View file

@ -17,10 +17,12 @@ package utils
import (
"bytes"
"fmt"
"testing"
"github.com/stretchr/testify/require"
"github.com/gravitational/teleport/api/internalutils/stream"
"github.com/gravitational/teleport/api/types"
)
@ -50,3 +52,69 @@ func TestMarshalMapConsistency(t *testing.T) {
require.Truef(t, bytes.Equal(val, compareTo), "maps must serialize consistently (attempt %d)", i)
}
}
type testObj struct {
S string `json:"s"`
N int `json:"n"`
}
func TestStreamJSONArray(t *testing.T) {
objects := []testObj{
{"spam", 1},
{"eggs", 2},
{"zero", 0},
{"five", 5},
{"", 100},
{},
{"neg", -100},
}
var objBuf bytes.Buffer
err := StreamJSONArray(stream.Slice(objects), &objBuf, true)
require.NoError(t, err)
var objOut []testObj
err = FastUnmarshal(objBuf.Bytes(), &objOut)
require.NoError(t, err)
require.Equal(t, objects, objOut)
numbers := []int{
0,
-1,
12,
15,
1000,
0,
1,
}
var numBuf bytes.Buffer
err = StreamJSONArray(stream.Slice(numbers), &numBuf, false)
require.NoError(t, err)
var numOut []int
err = FastUnmarshal(numBuf.Bytes(), &numOut)
require.NoError(t, err)
require.Equal(t, numbers, numOut)
var iterative []string
for i := 0; i < 100; i++ {
var iterBuf bytes.Buffer
err = StreamJSONArray(stream.Slice(iterative), &iterBuf, false)
require.NoError(t, err)
var iterOut []string
err = FastUnmarshal(iterBuf.Bytes(), &iterOut)
require.NoError(t, err)
if len(iterative) == 0 {
require.Len(t, iterOut, 0)
} else {
require.Equal(t, iterative, iterOut)
}
// we add at the end of the loop so that this test case
// covers the empty slice case.
iterative = append(iterative, fmt.Sprintf("%d", i))
}
}

View file

@ -19,15 +19,22 @@ package common
import (
"context"
"fmt"
"os"
"strings"
"time"
"github.com/gravitational/kingpin"
"github.com/gravitational/trace"
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client/proto"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/asciitable"
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/service"
"github.com/gravitational/teleport/lib/utils"
vc "github.com/gravitational/teleport/lib/versioncontrol"
)
// InventoryCommand implements the `tctl inventory` family of commands.
@ -38,7 +45,16 @@ type InventoryCommand struct {
getConnected bool
format string
controlLog bool
version string
services string
inventoryStatus *kingpin.CmdClause
inventoryList *kingpin.CmdClause
inventoryPing *kingpin.CmdClause
}
@ -50,8 +66,14 @@ func (c *InventoryCommand) Initialize(app *kingpin.Application, config *service.
c.inventoryStatus = inventory.Command("status", "Show inventory status summary")
c.inventoryStatus.Flag("connected", "Show locally connected instances summary").BoolVar(&c.getConnected)
c.inventoryList = inventory.Command("list", "List teleport instance inventory").Alias("ls")
c.inventoryList.Flag("version", "Filter output by version").StringVar(&c.version)
c.inventoryList.Flag("services", "Filter output by service (node,kube,proxy,etc)").StringVar(&c.services)
c.inventoryList.Flag("format", "Output format, 'text' or 'json'").Default(teleport.Text).StringVar(&c.format)
c.inventoryPing = inventory.Command("ping", "Ping locally connected instance")
c.inventoryPing.Arg("server-id", "ID of target server").Required().StringVar(&c.serverID)
c.inventoryPing.Flag("control-log", "Use control log for ping").Hidden().BoolVar(&c.controlLog)
}
// TryRun takes the CLI command as an argument (like "inventory status") and executes it.
@ -59,6 +81,8 @@ func (c *InventoryCommand) TryRun(ctx context.Context, cmd string, client auth.C
switch cmd {
case c.inventoryStatus.FullCommand():
err = c.Status(ctx, client)
case c.inventoryList.FullCommand():
err = c.List(ctx, client)
case c.inventoryPing.FullCommand():
err = c.Ping(ctx, client)
default:
@ -91,14 +115,88 @@ func (c *InventoryCommand) Status(ctx context.Context, client auth.ClientI) erro
}
table.AddRow([]string{h.ServerID, strings.Join(services, ","), h.Version})
}
fmt.Println(table.AsBuffer().String())
_, err := table.AsBuffer().WriteTo(os.Stdout)
return trace.Wrap(err)
}
return nil
}
func (c *InventoryCommand) List(ctx context.Context, client auth.ClientI) error {
var services []types.SystemRole
var err error
if c.services != "" {
services, err = types.ParseTeleportRoles(c.services)
if err != nil {
return trace.Wrap(err)
}
}
instances := client.GetInstances(ctx, types.InstanceFilter{
Services: services,
Version: vc.Normalize(c.version),
})
switch c.format {
case teleport.Text:
table := asciitable.MakeTable([]string{"ServerID", "Hostname", "Services", "Version", "Status"})
now := time.Now().UTC()
for instances.Next() {
instance := instances.Item()
services := make([]string, 0, len(instance.GetServices()))
for _, s := range instance.GetServices() {
services = append(services, string(s))
}
table.AddRow([]string{
instance.GetName(),
instance.GetHostname(),
strings.Join(services, ","),
instance.GetTeleportVersion(),
makeInstanceStatus(now, instance),
})
}
if err := instances.Done(); err != nil {
return trace.Wrap(err)
}
_, err := table.AsBuffer().WriteTo(os.Stdout)
return trace.Wrap(err)
case teleport.JSON:
if err := utils.StreamJSONArray(instances, os.Stdout, true); err != nil {
return trace.Wrap(err)
}
fmt.Fprintf(os.Stdout, "\n")
return nil
default:
return trace.BadParameter("unknown format %q, must be one of [%q, %q]", c.format, teleport.Text, teleport.JSON)
}
}
// makeInstanceStatus builds the instance status string. This currently distinguishes online/offline, but the
// plan is to eventually use the status field to give users insight at a glance into the current status of
// ongoing upgrades as well. Ex:
//
// Status
// -----------------------------------------------
// online (1m7s ago)
// installing -> v1.2.3 (17s ago)
// online, upgrade recommended -> v1.2.3 (20s ago)
// churned during install -> v1.2.3 (6m ago)
// online, install soon -> v1.2.3 (46s ago)
func makeInstanceStatus(now time.Time, instance types.Instance) string {
status := "offline"
if instance.GetLastSeen().Add(apidefaults.ServerAnnounceTTL).After(now) {
status = "online"
}
return fmt.Sprintf("%s (%s ago)", status, now.Sub(instance.GetLastSeen()).Round(time.Second))
}
func (c *InventoryCommand) Ping(ctx context.Context, client auth.ClientI) error {
rsp, err := client.PingInventory(ctx, proto.InventoryPingRequest{
ServerID: c.serverID,
ControlLog: c.controlLog,
})
if err != nil {
return trace.Wrap(err)