Partial cache healthiness (#24080)

* Partial cache healthiness

* IsSupersetOf -> Contains

* NewWatchStatus fixes

* make map[kindSubKind]types.WatchKind with capacity

* fixed TestRootNetwork

* post-rebase fix

* fixes regarding docs/comments

* const noSubKind = ""

* preallocate slices for validKinds

* DisablePartialHealth

* test fixes

* fixed struct logging field name

* use custom identity in TestEventsPermissionsPartialSuccess

* fixed TestNodeCAFiltering

* don't look at versions in events APIs

* RFD state: implemented
This commit is contained in:
Andrey Bulgakov 2023-04-15 13:07:43 -07:00 committed by GitHub
parent 4c0113df9e
commit ad80fa3e17
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 1017 additions and 216 deletions

View file

@ -31,6 +31,13 @@ func EventToGRPC(in types.Event) (*proto.Event, error) {
Type: eventType,
}
if in.Type == types.OpInit {
watchStatus, ok := in.Resource.(*types.WatchStatusV1)
if !ok {
return nil, trace.BadParameter("unexpected resource type %T for Init event", in.Resource)
}
out.Resource = &proto.Event_WatchStatus{
WatchStatus: watchStatus,
}
return &out, nil
}
switch r := in.Resource.(type) {
@ -229,6 +236,9 @@ func EventFromGRPC(in proto.Event) (*types.Event, error) {
Type: eventType,
}
if eventType == types.OpInit {
if r := in.GetWatchStatus(); r != nil {
out.Resource = r
}
return &out, nil
}
if r := in.GetResourceHeader(); r != nil {

View file

@ -30,7 +30,11 @@ import (
// NewWatcher returns a new streamWatcher
func (c *Client) NewWatcher(ctx context.Context, watch types.Watch) (types.Watcher, error) {
cancelCtx, cancel := context.WithCancel(ctx)
stream, err := c.grpc.WatchEvents(cancelCtx, &proto.Watch{Kinds: watch.Kinds})
protoWatch := proto.Watch{
Kinds: watch.Kinds,
AllowPartialSuccess: watch.AllowPartialSuccess,
}
stream, err := c.grpc.WatchEvents(cancelCtx, &protoWatch)
if err != nil {
cancel()
return nil, trail.FromGRPC(err)

View file

@ -5421,6 +5421,7 @@ enum HeadlessAuthenticationState {
}
// WatchKind specifies resource kind to watch
// When adding fields to this struct, make sure to review/update WatchKind.Contains method.
message WatchKind {
// Kind is a resource kind to watch
string Kind = 1 [(gogoproto.jsontag) = "kind"];
@ -5436,6 +5437,7 @@ message WatchKind {
// SubKind is a resource subkind to watch
string SubKind = 5 [(gogoproto.jsontag) = "sub_kind,omitempty"];
// Version optionally specifies the resource version to watch.
// Currently this field is ignored.
string Version = 6 [(gogoproto.jsontag) = "version,omitempty"];
}

View file

@ -225,6 +225,9 @@ const (
// MetaNameClusterName is the name of a configuration resource for cluster name.
MetaNameClusterName = "cluster-name"
// MetaNameWatchStatus is the name of a watch status resource.
MetaNameWatchStatus = "watch-status"
// KindStaticTokens is a type of configuration resource that contains static tokens.
KindStaticTokens = "static_tokens"
@ -348,6 +351,9 @@ const (
// window singleton resource.
MetaNameClusterMaintenanceConfig = "cluster-maintenance-config"
// KindWatchStatus is a kind for WatchStatus resource which contains information about a successful Watch request.
KindWatchStatus = "watch_status"
// V6 is the sixth version of resources.
V6 = "v6"
@ -369,7 +375,7 @@ const (
)
// WebSessionSubKinds lists subkinds of web session resources
var WebSessionSubKinds = []string{KindAppSession, KindWebSession, KindSnowflakeSession}
var WebSessionSubKinds = []string{KindAppSession, KindWebSession, KindSnowflakeSession, KindSAMLIdPSession}
const (
// VerbList is used to list all objects. Does not imply the ability to read a single object.

View file

@ -97,6 +97,14 @@ type Watch struct {
// MetricComponent is used for reporting
MetricComponent string
// AllowPartialSuccess enables a mode in which a watch will succeed if some of the requested kinds aren't available.
// When this is set, the client must inspect the WatchStatus resource attached to the first OpInit event emitted
// by the watcher for a list of kinds confirmed by the event source. Kinds requested but omitted from the confirmation
// will not be included in the event stream.
// If AllowPartialSuccess was set, but OpInit doesn't have a resource attached, it means that the event source
// doesn't support partial success and all requested resource kinds should be considered confirmed.
AllowPartialSuccess bool
}
// Matches attempts to determine if the supplied event matches
@ -147,6 +155,32 @@ func (kind WatchKind) IsTrivial() bool {
return kind.SubKind == "" && kind.Name == "" && kind.Version == "" && !kind.LoadSecrets && len(kind.Filter) == 0
}
// Contains reports whether the receiver watch kind covers at least as wide a
// scope of events as the given subset kind. In practice this means kind and
// subkind must match exactly, any name restriction must match, the subset may
// not load secrets unless the receiver does, and the subset's filter must be
// equal to or narrower than the receiver's.
// Resource versions are currently not taken into account.
func (kind WatchKind) Contains(subset WatchKind) bool {
	switch {
	case kind.Kind != subset.Kind, kind.SubKind != subset.SubKind:
		// Kind and SubKind must always be equal.
		return false
	case kind.Name != "" && kind.Name != subset.Name:
		// A name-restricted watch only covers the exact same name.
		return false
	case subset.LoadSecrets && !kind.LoadSecrets:
		// The subset may not request secrets if the receiver doesn't load them.
		return false
	}
	// Every filter entry of the receiver must be present in the subset's
	// filter, i.e. the subset must filter at least as narrowly.
	for key, val := range kind.Filter {
		if subset.Filter[key] != val {
			return false
		}
	}
	return true
}
// Events returns new events interface
type Events interface {
// NewWatcher returns a new event watcher

160
api/types/events_test.go Normal file
View file

@ -0,0 +1,160 @@
/*
*
* Copyright 2023 Gravitational, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package types
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestWatchKindContains verifies that WatchKind.Contains correctly reports
// whether its receiver covers its argument, across kind/subkind, name, and
// filter combinations.
func TestWatchKindContains(t *testing.T) {
	for _, tt := range []struct {
		name     string
		superset WatchKind
		subset   WatchKind
		want     require.BoolAssertionFunc
	}{
		{
			name:     "yes: kind and subkind match",
			superset: WatchKind{Kind: "a", SubKind: "b"},
			subset:   WatchKind{Kind: "a", SubKind: "b"},
			want:     require.True,
		},
		{
			name:     "no: kind and subkind don't match",
			superset: WatchKind{Kind: "a", SubKind: "b"},
			subset:   WatchKind{Kind: "a", SubKind: "c"},
			want:     require.False,
		},
		{
			name:     "yes: only subset specifies name",
			superset: WatchKind{Kind: "a", SubKind: "b"},
			subset:   WatchKind{Kind: "a", SubKind: "b", Name: "c"},
			want:     require.True,
		},
		{
			name:     "no: subset is missing name when superset has one",
			superset: WatchKind{Kind: "a", SubKind: "b", Name: "c"},
			subset:   WatchKind{Kind: "a", SubKind: "b"},
			want:     require.False,
		},
		{
			name:     "no: different names",
			superset: WatchKind{Kind: "a", SubKind: "b", Name: "c"},
			subset:   WatchKind{Kind: "a", SubKind: "b", Name: "d"},
			want:     require.False,
		},
		{
			name:     "yes: subset has narrower filter",
			superset: WatchKind{Kind: "a", SubKind: "b", Filter: map[string]string{"c": "d"}},
			subset:   WatchKind{Kind: "a", SubKind: "b", Filter: map[string]string{"c": "d", "e": "f"}},
			want:     require.True,
		},
		{
			name:     "no: subset has no filter",
			superset: WatchKind{Kind: "a", SubKind: "b", Filter: map[string]string{"c": "d"}},
			subset:   WatchKind{Kind: "a", SubKind: "b"},
			want:     require.False,
		},
		{
			name:     "no: subset has wider filter",
			superset: WatchKind{Kind: "a", SubKind: "b", Filter: map[string]string{"c": "d", "e": "f"}},
			subset:   WatchKind{Kind: "a", SubKind: "b", Filter: map[string]string{"e": "f"}},
			want:     require.False,
		},
	} {
		t.Run(tt.name, func(t *testing.T) {
			tt.want(t, tt.superset.Contains(tt.subset))
		})
	}
}

View file

@ -14406,6 +14406,7 @@ func (m *HeadlessAuthentication) XXX_DiscardUnknown() {
var xxx_messageInfo_HeadlessAuthentication proto.InternalMessageInfo
// WatchKind specifies resource kind to watch
// When adding fields to this struct, make sure to review/update WatchKind.Contains method.
type WatchKind struct {
// Kind is a resource kind to watch
Kind string `protobuf:"bytes,1,opt,name=Kind,proto3" json:"kind"`
@ -14421,6 +14422,7 @@ type WatchKind struct {
// SubKind is a resource subkind to watch
SubKind string `protobuf:"bytes,5,opt,name=SubKind,proto3" json:"sub_kind,omitempty"`
// Version optionally specifies the resource version to watch.
// Currently this field is ignored.
Version string `protobuf:"bytes,6,opt,name=Version,proto3" json:"version,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`

119
api/types/watch_status.go Normal file
View file

@ -0,0 +1,119 @@
/*
*
* Copyright 2023 Gravitational, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package types
import (
"time"
"github.com/gravitational/teleport/api/defaults"
)
// WatchStatus contains information about a successful Watch request.
// It is attached to the OpInit event in partial-success mode so the
// client can learn which of its requested kinds were confirmed.
type WatchStatus interface {
// Resource provides the common resource accessors (kind, name, metadata, expiry).
Resource
// GetKinds returns the list of kinds confirmed by the Watch request.
GetKinds() []WatchKind
// SetKinds sets the list of kinds confirmed by the Watch request.
SetKinds([]WatchKind)
}
// GetKind returns the watch status resource kind (KindWatchStatus).
func (w *WatchStatusV1) GetKind() string {
return w.Kind
}
// GetSubKind returns the watch status resource subkind.
func (w *WatchStatusV1) GetSubKind() string {
return w.SubKind
}
// SetSubKind sets the watch status resource subkind.
func (w *WatchStatusV1) SetSubKind(k string) {
w.SubKind = k
}
// GetVersion returns the watch status resource version (V1 when built via NewWatchStatus).
func (w *WatchStatusV1) GetVersion() string {
return w.Version
}
// GetName returns the watch status resource name from its metadata.
func (w *WatchStatusV1) GetName() string {
return w.Metadata.Name
}
// SetName sets the watch status resource name in its metadata.
func (w *WatchStatusV1) SetName(name string) {
w.Metadata.Name = name
}
// Expiry returns the watch status resource expiration time from its metadata.
func (w *WatchStatusV1) Expiry() time.Time {
return w.Metadata.Expiry()
}
// SetExpiry sets the watch status resource expiration time in its metadata.
// The parameter is named expiry rather than time so it does not shadow the
// imported time package inside the method body.
func (w *WatchStatusV1) SetExpiry(expiry time.Time) {
	w.Metadata.SetExpiry(expiry)
}
// GetMetadata returns the watch status resource metadata.
func (w *WatchStatusV1) GetMetadata() Metadata {
return w.Metadata
}
// GetResourceID returns the watch status resource ID stored in its metadata.
func (w *WatchStatusV1) GetResourceID() int64 {
return w.Metadata.ID
}
// SetResourceID sets the watch status resource ID in its metadata.
func (w *WatchStatusV1) SetResourceID(id int64) {
w.Metadata.ID = id
}
// CheckAndSetDefaults checks and sets default values for any missing fields.
// WatchStatusV1 has no required fields or defaults to apply, so this is a
// no-op that always returns nil; it exists to satisfy the Resource contract.
func (w *WatchStatusV1) CheckAndSetDefaults() error {
return nil
}
// GetKinds returns the list of kinds confirmed by the Watch request.
func (w *WatchStatusV1) GetKinds() []WatchKind {
return w.Spec.Kinds
}
// SetKinds sets the list of kinds confirmed by the Watch request.
// Note: the slice is stored as-is, not copied.
func (w *WatchStatusV1) SetKinds(kinds []WatchKind) {
w.Spec.Kinds = kinds
}
// NewWatchStatus returns a new WatchStatus resource populated with the
// given list of confirmed watch kinds.
func NewWatchStatus(kinds []WatchKind) *WatchStatusV1 {
	status := WatchStatusV1{
		Kind:    KindWatchStatus,
		Version: V1,
		Metadata: Metadata{
			Name:      MetaNameWatchStatus,
			Namespace: defaults.Namespace,
		},
	}
	status.Spec.Kinds = kinds
	return &status
}

View file

@ -1332,6 +1332,8 @@ func (a *ServerWithRoles) NewWatcher(ctx context.Context, watch types.Watch) (ty
if len(watch.Kinds) == 0 {
return nil, trace.AccessDenied("can't setup global watch")
}
validKinds := make([]types.WatchKind, 0, len(watch.Kinds))
for _, kind := range watch.Kinds {
// Check the permissions for data of each kind. For watching, most
// kinds of data just need a Read permission, but some have more
@ -1343,25 +1345,33 @@ func (a *ServerWithRoles) NewWatcher(ctx context.Context, watch types.Watch) (ty
verb = types.VerbRead
}
if err := a.action(apidefaults.Namespace, types.KindCertAuthority, verb); err != nil {
if watch.AllowPartialSuccess {
continue
}
return nil, trace.Wrap(err)
}
case types.KindAccessRequest:
var filter types.AccessRequestFilter
if err := filter.FromMap(kind.Filter); err != nil {
if watch.AllowPartialSuccess {
continue
}
return nil, trace.Wrap(err)
}
if filter.User == "" || a.currentUserAction(filter.User) != nil {
if err := a.action(apidefaults.Namespace, types.KindAccessRequest, types.VerbRead); err != nil {
if watch.AllowPartialSuccess {
continue
}
return nil, trace.Wrap(err)
}
}
case types.KindAppServer:
if err := a.action(apidefaults.Namespace, types.KindAppServer, types.VerbRead); err != nil {
return nil, trace.Wrap(err)
}
case types.KindWebSession:
var filter types.WebSessionFilter
if err := filter.FromMap(kind.Filter); err != nil {
if watch.AllowPartialSuccess {
continue
}
return nil, trace.Wrap(err)
}
resource := types.KindWebSession
@ -1371,35 +1381,29 @@ func (a *ServerWithRoles) NewWatcher(ctx context.Context, watch types.Watch) (ty
}
if filter.User == "" || a.currentUserAction(filter.User) != nil {
if err := a.action(apidefaults.Namespace, resource, types.VerbRead); err != nil {
if watch.AllowPartialSuccess {
continue
}
return nil, trace.Wrap(err)
}
}
case types.KindWebToken:
if err := a.action(apidefaults.Namespace, types.KindWebToken, types.VerbRead); err != nil {
return nil, trace.Wrap(err)
}
case types.KindRemoteCluster:
if err := a.action(apidefaults.Namespace, types.KindRemoteCluster, types.VerbRead); err != nil {
return nil, trace.Wrap(err)
}
case types.KindDatabaseServer:
if err := a.action(apidefaults.Namespace, types.KindDatabaseServer, types.VerbRead); err != nil {
return nil, trace.Wrap(err)
}
case types.KindKubeServer:
if err := a.action(apidefaults.Namespace, types.KindKubeServer, types.VerbRead); err != nil {
return nil, trace.Wrap(err)
}
case types.KindWindowsDesktopService:
if err := a.action(apidefaults.Namespace, types.KindWindowsDesktopService, types.VerbRead); err != nil {
return nil, trace.Wrap(err)
}
default:
if err := a.action(apidefaults.Namespace, kind.Kind, types.VerbRead); err != nil {
if watch.AllowPartialSuccess {
continue
}
return nil, trace.Wrap(err)
}
}
validKinds = append(validKinds, kind)
}
if len(validKinds) == 0 {
return nil, trace.BadParameter("none of the requested kinds can be watched")
}
watch.Kinds = validKinds
switch {
case a.hasBuiltinRole(types.RoleProxy):
watch.QueueSize = defaults.ProxyQueueSize

View file

@ -358,8 +358,9 @@ func (g *GRPCServer) WatchEvents(watch *proto.Watch, stream proto.AuthService_Wa
return trace.Wrap(err)
}
servicesWatch := types.Watch{
Name: auth.User.GetName(),
Kinds: watch.Kinds,
Name: auth.User.GetName(),
Kinds: watch.Kinds,
AllowPartialSuccess: watch.AllowPartialSuccess,
}
if clusterName, err := auth.GetClusterName(); err == nil {

View file

@ -3382,6 +3382,93 @@ func TestEventsPermissions(t *testing.T) {
}
}
// TestEventsPermissionsPartialSuccess verifies that in partial success mode NewWatcher can still succeed
// if caller lacks permission to watch only some of the requested resource kinds.
func TestEventsPermissionsPartialSuccess(t *testing.T) {
t.Parallel()
testCases := []struct {
name string
watch types.Watch
expectedConfirmedKinds []types.WatchKind
}{
{
name: "no permission for any of the requested kinds",
watch: types.Watch{
Kinds: []types.WatchKind{
{Kind: types.KindUser},
{Kind: types.KindRole},
},
AllowPartialSuccess: true,
},
},
{
name: "has permission only for some of the requested kinds",
watch: types.Watch{
Kinds: []types.WatchKind{
{Kind: types.KindUser},
{Kind: types.KindRole},
{Kind: types.KindStaticTokens},
},
AllowPartialSuccess: true,
},
expectedConfirmedKinds: []types.WatchKind{
{Kind: types.KindStaticTokens},
},
},
{
name: "has permission only for some kinds but partial success is not enabled",
watch: types.Watch{
Kinds: []types.WatchKind{
{Kind: types.KindUser},
{Kind: types.KindRole},
{Kind: types.KindStaticTokens},
},
},
},
}
ctx := context.Background()
tt := setupAuthContext(ctx, t)
testUser, testRole, err := CreateUserAndRole(tt.server.Auth(), "test", nil, []types.Rule{
types.NewRule(types.KindStaticTokens, services.RO()),
})
require.NoError(t, err)
require.NoError(t, tt.server.Auth().UpsertRole(ctx, testRole))
testIdentity := TestUser(testUser.GetName())
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
client, err := tt.server.NewClient(testIdentity)
require.NoError(t, err)
defer client.Close()
w, err := client.NewWatcher(ctx, tc.watch)
require.NoError(t, err)
defer w.Close()
select {
case event := <-w.Events():
if len(tc.expectedConfirmedKinds) > 0 {
require.Equal(t, event.Type, types.OpInit)
watchStatus, ok := event.Resource.(types.WatchStatus)
require.True(t, ok)
require.Equal(t, tc.expectedConfirmedKinds, watchStatus.GetKinds())
} else {
t.Fatal("unexpected event from watcher that is supposed to fail")
}
case <-w.Done():
if len(tc.expectedConfirmedKinds) > 0 {
t.Fatalf("Watcher exited with error %v", w.Error())
}
require.Error(t, w.Error())
case <-time.After(2 * time.Second):
t.Fatal("Timeout waiting for watcher")
}
})
}
}
// TestEvents tests events suite
func TestEvents(t *testing.T) {
t.Parallel()

269
lib/cache/cache.go vendored
View file

@ -92,7 +92,6 @@ func ForAuth(cfg Config) Config {
{Kind: types.KindAccessRequest},
{Kind: types.KindAppServer},
{Kind: types.KindApp},
{Kind: types.KindAppServer, Version: types.V2},
{Kind: types.KindWebSession, SubKind: types.KindSAMLIdPSession},
{Kind: types.KindWebSession, SubKind: types.KindSnowflakeSession},
{Kind: types.KindWebSession, SubKind: types.KindAppSession},
@ -116,6 +115,9 @@ func ForAuth(cfg Config) Config {
{Kind: types.KindIntegration},
}
cfg.QueueSize = defaults.AuthQueueSize
// We don't want to enable partial health for auth cache because auth uses an event stream
// from the local backend which must support all resource kinds. We want to catch it early if it doesn't.
cfg.DisablePartialHealth = true
return cfg
}
@ -139,7 +141,6 @@ func ForProxy(cfg Config) Config {
{Kind: types.KindReverseTunnel},
{Kind: types.KindTunnelConnection},
{Kind: types.KindAppServer},
{Kind: types.KindAppServer, Version: types.V2},
{Kind: types.KindApp},
{Kind: types.KindWebSession, SubKind: types.KindSAMLIdPSession},
{Kind: types.KindWebSession, SubKind: types.KindSnowflakeSession},
@ -193,7 +194,6 @@ func ForRemoteProxy(cfg Config) Config {
{Kind: types.KindReverseTunnel},
{Kind: types.KindTunnelConnection},
{Kind: types.KindAppServer},
{Kind: types.KindAppServer, Version: types.V2},
{Kind: types.KindRemoteCluster},
{Kind: types.KindDatabaseServer},
{Kind: types.KindDatabaseService},
@ -226,7 +226,6 @@ func ForOldRemoteProxy(cfg Config) Config {
{Kind: types.KindReverseTunnel},
{Kind: types.KindTunnelConnection},
{Kind: types.KindAppServer},
{Kind: types.KindAppServer, Version: types.V2},
{Kind: types.KindRemoteCluster},
{Kind: types.KindDatabaseServer},
{Kind: types.KindDatabaseService},
@ -402,12 +401,12 @@ type Cache struct {
Logger *log.Entry
// rw is used to prevent reads of invalid cache states. From a
// memory-safety perspective, this RWMutex is just used to protect
// the `ok` field. *However*, cache reads must hold the read lock
// for the duration of the read, not just when checking the `ok`
// field. Since the write lock must be held in order to modify
// the `ok` field, this serves to ensure that all in-progress reads
// complete *before* a reset can begin.
// memory-safety perspective, this RWMutex is used to protect
// the `ok` and `confirmedKinds` fields. *However*, cache reads
// must hold the read lock for the duration of the read, not just
// when checking the `ok` or `confirmedKinds` fields. Since the write
// lock must be held in order to modify the `ok` field, this serves
// to ensure that all in-progress reads complete *before* a reset can begin.
rw sync.RWMutex
// ok indicates whether the cache is in a valid state for reads.
// If `ok` is `false`, reads are forwarded directly to the backend.
@ -438,6 +437,10 @@ type Cache struct {
// collections is a map of registered collections by resource Kind/SubKind
collections map[resourceKind]collection
// confirmedKinds is a map of kinds confirmed by the server to be included in the current generation
// by resource Kind/SubKind
confirmedKinds map[resourceKind]types.WatchKind
// fnCache is used to perform short ttl-based caching of the results of
// regularly called methods.
fnCache *utils.FnCache
@ -477,37 +480,36 @@ func (c *Cache) setInitError(err error) {
})
}
// setReadOK updates Cache.ok, which determines whether the
// cache is accessible for reads.
func (c *Cache) setReadOK(ok bool) {
// setReadStatus updates Cache.ok, which determines whether the
// cache is overall accessible for reads, and confirmedKinds
// which stores resource kinds accessible in current generation.
func (c *Cache) setReadStatus(ok bool, confirmedKinds map[resourceKind]types.WatchKind) {
if c.neverOK {
// we are running inside of a test where the cache
// needs to pretend that it never becomes healthy.
return
}
if ok == c.getReadOK() {
return
}
c.rw.Lock()
defer c.rw.Unlock()
c.ok = ok
}
func (c *Cache) getReadOK() (ok bool) {
c.rw.RLock()
defer c.rw.RUnlock()
return c.ok
c.confirmedKinds = confirmedKinds
}
// read acquires the cache read lock and selects the appropriate
// target for read operations. The returned guard *must* be
// released to prevent deadlocks.
func (c *Cache) read() (readGuard, error) {
// Currently, the caller is trusted to only use the returned guard
// to access methods related to the specified kind and subKind.
// Failure to do that might cause incorrect behavior.
// TODO(andrey): follow up with the type safe approach from the RFD 114 in a separate PR.
func (c *Cache) read(kind string, subkind string) (readGuard, error) {
if c.closed.Load() {
return readGuard{}, trace.Errorf("cache is closed")
}
c.rw.RLock()
if c.ok {
_, kindOK := c.confirmedKinds[resourceKind{kind: kind, subkind: subkind}]
if c.ok && kindOK {
return readGuard{
trust: c.trustCache,
clusterConfig: c.clusterConfigCache,
@ -702,6 +704,10 @@ type Config struct {
// Unstarted indicates that the cache should not be started during New. The
// cache is usable before it's started, but it will always hit the backend.
Unstarted bool
// DisablePartialHealth disables the default mode in which cache can become
// healthy even if some of the requested resource kinds aren't
// supported by the event source.
DisablePartialHealth bool
}
// CheckAndSetDefaults checks parameters and sets default values
@ -777,6 +783,8 @@ const (
// RelativeExpiry notifies that relative expiry operations have
// been run.
RelativeExpiry = "relative_expiry"
// noSubKind is supposed to clarify the meaning of an empty string being passed as an argument to Cache.read.
noSubKind = ""
)
// New creates a new instance of Cache
@ -922,15 +930,47 @@ func (c *Cache) Start() error {
func (c *Cache) NewWatcher(ctx context.Context, watch types.Watch) (types.Watcher, error) {
ctx, span := c.Tracer.Start(ctx, "cache/NewWatcher")
defer span.End()
c.rw.RLock()
cacheOK := c.ok
confirmedKinds := c.confirmedKinds
c.rw.RUnlock()
validKinds := make([]types.WatchKind, 0, len(watch.Kinds))
Outer:
for _, requested := range watch.Kinds {
for _, configured := range c.Config.Watches {
if requested.Kind == configured.Kind {
continue Outer
if cacheOK {
// if cache has been initialized, we already know which kinds are confirmed by the event source
// and can validate the kinds requested for fanout against that.
key := resourceKind{kind: requested.Kind, subkind: requested.SubKind}
if confirmed, ok := confirmedKinds[key]; !ok || !confirmed.Contains(requested) {
if watch.AllowPartialSuccess {
continue
}
return nil, trace.BadParameter("cache %q does not support watching resource %q", c.Config.target, requested.Kind)
}
validKinds = append(validKinds, requested)
} else {
// otherwise, we can only perform preliminary validation against the kinds that cache has been configured for,
// and the returned fanout watcher might fail later when cache receives and propagates its OpInit event.
for _, configured := range c.Config.Watches {
if requested.Kind == configured.Kind && requested.SubKind == configured.SubKind && configured.Contains(requested) {
validKinds = append(validKinds, requested)
continue Outer
}
}
if watch.AllowPartialSuccess {
continue
}
return nil, trace.BadParameter("cache %q does not support watching resource %q", c.Config.target, requested.Kind)
}
return nil, trace.BadParameter("cache %q does not support watching resource %q", c.Config.target, requested.Kind)
}
if len(validKinds) == 0 {
return nil, trace.BadParameter("cache %q does not support any of the requested resources", c.Config.target)
}
watch.Kinds = validKinds
return c.eventsFanout.NewWatcher(ctx, watch)
}
@ -1019,11 +1059,13 @@ func (c *Cache) notify(ctx context.Context, event Event) {
// we assume that this cache will eventually end up in a correct state
// potentially lagging behind the state of the database.
func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer *time.Timer) error {
requestKinds := c.watchKinds()
watcher, err := c.Events.NewWatcher(c.ctx, types.Watch{
QueueSize: c.QueueSize,
Name: c.Component,
Kinds: c.watchKinds(),
MetricComponent: c.MetricComponent,
Name: c.Component,
Kinds: requestKinds,
QueueSize: c.QueueSize,
MetricComponent: c.MetricComponent,
AllowPartialSuccess: !c.DisablePartialHealth,
})
if err != nil {
c.notify(c.ctx, Event{Type: WatcherFailed})
@ -1040,6 +1082,8 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer
// set timer to watcher init timeout
timer.Reset(c.Config.WatcherInitTimeout)
var confirmedKinds []types.WatchKind
// before fetch, make sure watcher is synced by receiving init event,
// to avoid the scenario:
// 1. Cache process: w = NewWatcher()
@ -1063,17 +1107,40 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer
if event.Type != types.OpInit {
return trace.BadParameter("expected init event, got %v instead", event.Type)
}
if watchStatus, ok := event.Resource.(types.WatchStatus); ok {
confirmedKinds = watchStatus.GetKinds()
} else {
// this event was generated by an old Auth service that doesn't support partial success mode,
// which means that we can assume all requested kinds to be confirmed.
confirmedKinds = requestKinds
}
case <-timer.C:
return trace.ConnectionProblem(nil, "timeout waiting for watcher init")
}
apply, err := c.fetch(ctx)
confirmedKindsMap := make(map[resourceKind]types.WatchKind, len(confirmedKinds))
for _, kind := range confirmedKinds {
confirmedKindsMap[resourceKind{kind: kind.Kind, subkind: kind.SubKind}] = kind
}
if len(confirmedKinds) < len(requestKinds) {
rejectedKinds := make([]string, 0, len(requestKinds)-len(confirmedKinds))
for _, kind := range requestKinds {
key := resourceKind{kind: kind.Kind, subkind: kind.SubKind}
if _, ok := confirmedKindsMap[key]; !ok {
rejectedKinds = append(rejectedKinds, key.String())
}
}
c.Logger.WithField("rejected", rejectedKinds).Warn("Some resource kinds unsupported by the server cannot be cached")
}
apply, err := c.fetch(ctx, confirmedKindsMap)
if err != nil {
return trace.Wrap(err)
}
// apply will mutate cache, and possibly leave it in an invalid state
// if an error occurs, so ensure that cache is not read.
c.setReadOK(false)
c.setReadStatus(false, nil)
err = apply(ctx)
if err != nil {
return trace.Wrap(err)
@ -1081,7 +1148,7 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer
// apply was successful; cache is now readable.
c.generation.Add(1)
c.setReadOK(true)
c.setReadStatus(true, confirmedKindsMap)
c.setInitError(nil)
// watchers have been queuing up since the last time
@ -1090,7 +1157,7 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer
// after we've placed the cache into a readable state. This ensures
// that any derivative caches do not perform their fetch operations
// until this cache has finished its apply operations.
c.eventsFanout.SetInit()
c.eventsFanout.SetInit(confirmedKinds)
defer c.eventsFanout.Reset()
retry.Reset()
@ -1318,9 +1385,6 @@ func tracedApplyFn(parent oteltrace.Span, tracer oteltrace.Tracer, kind resource
ctx, span := tracer.Start(
oteltrace.ContextWithSpan(ctx, parent),
fmt.Sprintf("cache/apply/%s", kind.String()),
oteltrace.WithAttributes(
attribute.String("version", kind.version),
),
)
defer func() {
@ -1359,7 +1423,7 @@ func isControlPlane(target string) bool {
return false
}
func (c *Cache) fetch(ctx context.Context) (fn applyFn, err error) {
func (c *Cache) fetch(ctx context.Context, confirmedKinds map[resourceKind]types.WatchKind) (fn applyFn, err error) {
ctx, fetchSpan := c.Tracer.Start(ctx, "cache/fetch", oteltrace.WithAttributes(attribute.String("target", c.target)))
defer func() {
if err != nil {
@ -1394,7 +1458,8 @@ func (c *Cache) fetch(ctx context.Context) (fn applyFn, err error) {
span.End()
}()
applyfn, err := collection.fetch(ctx)
_, cacheOK := confirmedKinds[resourceKind{kind: kind.kind, subkind: kind.subkind}]
applyfn, err := collection.fetch(ctx, cacheOK)
if err != nil {
return trace.Wrap(err, "failed to fetch resource: %q", kind)
}
@ -1449,7 +1514,7 @@ func (c *Cache) GetCertAuthority(ctx context.Context, id types.CertAuthID, loadS
ctx, span := c.Tracer.Start(ctx, "cache/GetCertAuthority")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindCertAuthority, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1491,7 +1556,7 @@ func (c *Cache) GetCertAuthorities(ctx context.Context, caType types.CertAuthTyp
ctx, span := c.Tracer.Start(ctx, "cache/GetCertAuthorities")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindCertAuthority, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1518,7 +1583,7 @@ func (c *Cache) GetStaticTokens() (types.StaticTokens, error) {
_, span := c.Tracer.Start(context.TODO(), "cache/GetStaticTokens")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindStaticTokens, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1531,7 +1596,7 @@ func (c *Cache) GetTokens(ctx context.Context) ([]types.ProvisionToken, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetTokens")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindToken, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1544,7 +1609,7 @@ func (c *Cache) GetToken(ctx context.Context, name string) (types.ProvisionToken
ctx, span := c.Tracer.Start(ctx, "cache/GetToken")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindToken, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1574,7 +1639,7 @@ func (c *Cache) GetClusterAuditConfig(ctx context.Context, opts ...services.Mars
ctx, span := c.Tracer.Start(ctx, "cache/GetClusterAuditConfig")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindClusterAuditConfig, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1597,7 +1662,7 @@ func (c *Cache) GetClusterNetworkingConfig(ctx context.Context, opts ...services
ctx, span := c.Tracer.Start(ctx, "cache/GetClusterNetworkingConfig")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindClusterNetworkingConfig, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1620,7 +1685,7 @@ func (c *Cache) GetClusterName(opts ...services.MarshalOption) (types.ClusterNam
ctx, span := c.Tracer.Start(context.TODO(), "cache/GetClusterName")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindClusterName, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1642,7 +1707,7 @@ func (c *Cache) GetUIConfig(ctx context.Context) (types.UIConfig, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetUIConfig")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindUIConfig, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1657,7 +1722,7 @@ func (c *Cache) GetInstaller(ctx context.Context, name string) (types.Installer,
ctx, span := c.Tracer.Start(ctx, "cache/GetInstaller")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindInstaller, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1672,7 +1737,7 @@ func (c *Cache) GetInstallers(ctx context.Context) ([]types.Installer, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetInstallers")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindInstaller, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1687,7 +1752,7 @@ func (c *Cache) GetRoles(ctx context.Context) ([]types.Role, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetRoles")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindRole, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1700,7 +1765,7 @@ func (c *Cache) GetRole(ctx context.Context, name string) (types.Role, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetRole")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindRole, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1723,7 +1788,7 @@ func (c *Cache) GetNamespace(name string) (*types.Namespace, error) {
_, span := c.Tracer.Start(context.TODO(), "cache/GetNamespace")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindNamespace, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1736,7 +1801,7 @@ func (c *Cache) GetNamespaces() ([]types.Namespace, error) {
_, span := c.Tracer.Start(context.TODO(), "cache/GetNamespaces")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindNamespace, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1749,7 +1814,7 @@ func (c *Cache) GetNode(ctx context.Context, namespace, name string) (types.Serv
ctx, span := c.Tracer.Start(ctx, "cache/GetNode")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindNode, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1768,7 +1833,7 @@ func (c *Cache) GetNodes(ctx context.Context, namespace string) ([]types.Server,
ctx, span := c.Tracer.Start(ctx, "cache/GetNodes")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindNode, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1805,7 +1870,7 @@ func (c *Cache) GetAuthServers() ([]types.Server, error) {
_, span := c.Tracer.Start(context.TODO(), "cache/GetAuthServers")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindAuthServer, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1818,7 +1883,7 @@ func (c *Cache) GetReverseTunnels(ctx context.Context, opts ...services.MarshalO
ctx, span := c.Tracer.Start(ctx, "cache/GetReverseTunnels")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindReverseTunnel, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1831,7 +1896,7 @@ func (c *Cache) GetProxies() ([]types.Server, error) {
_, span := c.Tracer.Start(context.TODO(), "cache/GetProxies")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindProxy, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1850,7 +1915,7 @@ func (c *Cache) GetRemoteClusters(opts ...services.MarshalOption) ([]types.Remot
ctx, span := c.Tracer.Start(context.TODO(), "cache/GetRemoteClusters")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindRemoteCluster, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1878,7 +1943,7 @@ func (c *Cache) GetRemoteCluster(clusterName string) (types.RemoteCluster, error
ctx, span := c.Tracer.Start(context.TODO(), "cache/GetRemoteCluster")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindRemoteCluster, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1915,7 +1980,7 @@ func (c *Cache) GetUser(name string, withSecrets bool) (user types.User, err err
if withSecrets { // cache never tracks user secrets
return c.Config.Users.GetUser(name, withSecrets)
}
rg, err := c.read()
rg, err := c.read(types.KindUser, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1942,7 +2007,7 @@ func (c *Cache) GetUsers(withSecrets bool) (users []types.User, err error) {
if withSecrets { // cache never tracks user secrets
return c.Users.GetUsers(withSecrets)
}
rg, err := c.read()
rg, err := c.read(types.KindUser, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1955,7 +2020,7 @@ func (c *Cache) GetTunnelConnections(clusterName string, opts ...services.Marsha
_, span := c.Tracer.Start(context.TODO(), "cache/GetTunnelConnections")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindTunnelConnection, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1968,7 +2033,7 @@ func (c *Cache) GetAllTunnelConnections(opts ...services.MarshalOption) (conns [
_, span := c.Tracer.Start(context.TODO(), "cache/GetAllTunnelConnections")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindTunnelConnection, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1981,7 +2046,7 @@ func (c *Cache) GetKubernetesServers(ctx context.Context) ([]types.KubeServer, e
ctx, span := c.Tracer.Start(ctx, "cache/GetKubernetesServers")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindKubeServer, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -1994,7 +2059,7 @@ func (c *Cache) GetApplicationServers(ctx context.Context, namespace string) ([]
ctx, span := c.Tracer.Start(ctx, "cache/GetApplicationServers")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindAppServer, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2007,7 +2072,7 @@ func (c *Cache) GetKubernetesClusters(ctx context.Context) ([]types.KubeCluster,
ctx, span := c.Tracer.Start(ctx, "cache/GetKubernetesClusters")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindKubernetesCluster, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2020,7 +2085,7 @@ func (c *Cache) GetKubernetesCluster(ctx context.Context, name string) (types.Ku
ctx, span := c.Tracer.Start(ctx, "cache/GetKubernetesCluster")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindKubernetesCluster, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2033,7 +2098,7 @@ func (c *Cache) GetApps(ctx context.Context) ([]types.Application, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetApps")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindApp, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2046,7 +2111,7 @@ func (c *Cache) GetApp(ctx context.Context, name string) (types.Application, err
ctx, span := c.Tracer.Start(ctx, "cache/GetApp")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindApp, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2059,7 +2124,7 @@ func (c *Cache) GetAppSession(ctx context.Context, req types.GetAppSessionReques
ctx, span := c.Tracer.Start(ctx, "cache/GetAppSession")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindWebSession, types.KindAppSession)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2072,7 +2137,7 @@ func (c *Cache) GetSnowflakeSession(ctx context.Context, req types.GetSnowflakeS
ctx, span := c.Tracer.Start(ctx, "cache/GetSnowflakeSession")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindWebSession, types.KindSnowflakeSession)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2085,7 +2150,7 @@ func (c *Cache) GetSAMLIdPSession(ctx context.Context, req types.GetSAMLIdPSessi
ctx, span := c.Tracer.Start(ctx, "cache/GetSAMLIdPSession")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindWebSession, types.KindSAMLIdPSession)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2098,7 +2163,7 @@ func (c *Cache) GetDatabaseServers(ctx context.Context, namespace string, opts .
ctx, span := c.Tracer.Start(ctx, "cache/GetDatabaseServers")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindDatabaseServer, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2111,7 +2176,7 @@ func (c *Cache) GetDatabases(ctx context.Context) ([]types.Database, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetDatabases")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindDatabase, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2124,7 +2189,7 @@ func (c *Cache) GetDatabase(ctx context.Context, name string) (types.Database, e
ctx, span := c.Tracer.Start(ctx, "cache/GetDatabase")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindDatabase, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2137,7 +2202,7 @@ func (c *Cache) GetWebSession(ctx context.Context, req types.GetWebSessionReques
ctx, span := c.Tracer.Start(ctx, "cache/GetWebSession")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindWebSession, types.KindWebSession)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2150,7 +2215,7 @@ func (c *Cache) GetWebToken(ctx context.Context, req types.GetWebTokenRequest) (
ctx, span := c.Tracer.Start(ctx, "cache/GetWebToken")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindWebToken, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2163,7 +2228,7 @@ func (c *Cache) GetAuthPreference(ctx context.Context) (types.AuthPreference, er
ctx, span := c.Tracer.Start(ctx, "cache/GetAuthPreference")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindClusterAuthPreference, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2176,7 +2241,7 @@ func (c *Cache) GetSessionRecordingConfig(ctx context.Context, opts ...services.
ctx, span := c.Tracer.Start(ctx, "cache/GetSessionRecordingConfig")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindSessionRecordingConfig, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2189,7 +2254,7 @@ func (c *Cache) GetNetworkRestrictions(ctx context.Context) (types.NetworkRestri
ctx, span := c.Tracer.Start(ctx, "cache/GetNetworkRestrictions")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindNetworkRestrictions, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2203,7 +2268,7 @@ func (c *Cache) GetLock(ctx context.Context, name string) (types.Lock, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetLock")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindLock, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2228,7 +2293,7 @@ func (c *Cache) GetLocks(ctx context.Context, inForceOnly bool, targets ...types
ctx, span := c.Tracer.Start(ctx, "cache/GetLocks")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindLock, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2241,7 +2306,7 @@ func (c *Cache) GetWindowsDesktopServices(ctx context.Context) ([]types.WindowsD
ctx, span := c.Tracer.Start(ctx, "cache/GetWindowsDesktopServices")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindWindowsDesktopService, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2254,7 +2319,7 @@ func (c *Cache) GetWindowsDesktopService(ctx context.Context, name string) (type
ctx, span := c.Tracer.Start(ctx, "cache/GetWindowsDesktopService")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindWindowsDesktopService, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2267,7 +2332,7 @@ func (c *Cache) GetWindowsDesktops(ctx context.Context, filter types.WindowsDesk
ctx, span := c.Tracer.Start(ctx, "cache/GetWindowsDesktops")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindWindowsDesktop, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2280,7 +2345,7 @@ func (c *Cache) ListWindowsDesktops(ctx context.Context, req types.ListWindowsDe
ctx, span := c.Tracer.Start(ctx, "cache/ListWindowsDesktops")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindWindowsDesktop, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2293,7 +2358,7 @@ func (c *Cache) ListWindowsDesktopServices(ctx context.Context, req types.ListWi
ctx, span := c.Tracer.Start(ctx, "cache/ListWindowsDesktopServices")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindWindowsDesktopService, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2306,7 +2371,7 @@ func (c *Cache) ListSAMLIdPServiceProviders(ctx context.Context, pageSize int, n
ctx, span := c.Tracer.Start(ctx, "cache/ListSAMLIdPServiceProviders")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindSAMLIdPServiceProvider, noSubKind)
if err != nil {
return nil, "", trace.Wrap(err)
}
@ -2319,7 +2384,7 @@ func (c *Cache) GetSAMLIdPServiceProvider(ctx context.Context, name string) (typ
ctx, span := c.Tracer.Start(ctx, "cache/GetSAMLIdPServiceProvider")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindSAMLIdPServiceProvider, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2332,7 +2397,7 @@ func (c *Cache) ListUserGroups(ctx context.Context, pageSize int, nextKey string
ctx, span := c.Tracer.Start(ctx, "cache/ListUserGroups")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindUserGroup, noSubKind)
if err != nil {
return nil, "", trace.Wrap(err)
}
@ -2345,7 +2410,7 @@ func (c *Cache) GetUserGroup(ctx context.Context, name string) (types.UserGroup,
ctx, span := c.Tracer.Start(ctx, "cache/GetUserGroup")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindUserGroup, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2358,7 +2423,7 @@ func (c *Cache) ListOktaImportRules(ctx context.Context, pageSize int, nextKey s
ctx, span := c.Tracer.Start(ctx, "cache/ListOktaImportRules")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindOktaImportRule, noSubKind)
if err != nil {
return nil, "", trace.Wrap(err)
}
@ -2371,7 +2436,7 @@ func (c *Cache) GetOktaImportRule(ctx context.Context, name string) (types.OktaI
ctx, span := c.Tracer.Start(ctx, "cache/GetOktaImportRule")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindOktaImportRule, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2384,7 +2449,7 @@ func (c *Cache) ListOktaAssignments(ctx context.Context, pageSize int, nextKey s
ctx, span := c.Tracer.Start(ctx, "cache/ListOktaAssignments")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindOktaAssignment, noSubKind)
if err != nil {
return nil, "", trace.Wrap(err)
}
@ -2397,7 +2462,7 @@ func (c *Cache) GetOktaAssignment(ctx context.Context, name string) (types.OktaA
ctx, span := c.Tracer.Start(ctx, "cache/GetOktaAssignment")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindOktaAssignment, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2410,7 +2475,7 @@ func (c *Cache) ListIntegrations(ctx context.Context, pageSize int, nextKey stri
ctx, span := c.Tracer.Start(ctx, "cache/ListIntegrations")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindIntegration, noSubKind)
if err != nil {
return nil, "", trace.Wrap(err)
}
@ -2423,7 +2488,7 @@ func (c *Cache) GetIntegration(ctx context.Context, name string) (types.Integrat
ctx, span := c.Tracer.Start(ctx, "cache/GetIntegration")
defer span.End()
rg, err := c.read()
rg, err := c.read(types.KindIntegration, noSubKind)
if err != nil {
return nil, trace.Wrap(err)
}
@ -2436,7 +2501,7 @@ func (c *Cache) ListResources(ctx context.Context, req proto.ListResourcesReques
ctx, span := c.Tracer.Start(ctx, "cache/ListResources")
defer span.End()
rg, err := c.read()
rg, err := c.read(req.ResourceType, "")
if err != nil {
return nil, trace.Wrap(err)
}

View file

@ -135,6 +135,7 @@ func newTestPackWithoutCache(t *testing.T) *testPack {
type packCfg struct {
memoryBackend bool
ignoreKinds []types.WatchKind
}
type packOption func(cfg *packCfg)
@ -145,6 +146,14 @@ func memoryBackend(bool) packOption {
}
}
// ignoreKinds returns a packOption that records the watch kinds the test
// eventsProxy should strip from incoming watch requests, simulating an event
// source that rejects those resource types due to version incompatibility.
func ignoreKinds(kinds []types.WatchKind) packOption {
	return func(cfg *packCfg) { cfg.ignoreKinds = kinds }
}
// newPackWithoutCache returns a new test pack without creating cache
func newPackWithoutCache(dir string, opts ...packOption) (*testPack, error) {
ctx := context.Background()
@ -193,7 +202,7 @@ func newPackWithoutCache(dir string, opts ...packOption) (*testPack, error) {
p.trustS = local.NewCAService(p.backend)
p.clusterConfigS = clusterConfig
p.provisionerS = local.NewProvisioningService(p.backend)
p.eventsS = &proxyEvents{events: local.NewEventsService(p.backend)}
p.eventsS = newProxyEvents(local.NewEventsService(p.backend), cfg.ignoreKinds)
p.presenceS = local.NewPresenceService(p.backend)
p.usersS = local.NewIdentityService(p.backend)
p.accessS = local.NewAccessService(p.backend)
@ -482,7 +491,12 @@ func TestNodeCAFiltering(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, nodeCache.Close()) })
cacheWatcher, err := nodeCache.NewWatcher(ctx, types.Watch{Kinds: []types.WatchKind{{Kind: types.KindCertAuthority}}})
cacheWatcher, err := nodeCache.NewWatcher(ctx, types.Watch{Kinds: []types.WatchKind{
{
Kind: types.KindCertAuthority,
Filter: map[string]string{"host": "example.com", "user": "*"},
},
}})
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, cacheWatcher.Close()) })
@ -2350,10 +2364,43 @@ func TestCache_Backoff(t *testing.T) {
}
}
// TestSetupConfigFns verifies that every WatchKind requested by the individual
// setup config functions is also present in (and contained by) ForAuth(), since
// the auth cache must track a superset of what downstream caches watch.
func TestSetupConfigFns(t *testing.T) {
	configFns := map[string]SetupConfigFn{
		"ForProxy":          ForProxy,
		"ForRemoteProxy":    ForRemoteProxy,
		"ForOldRemoteProxy": ForOldRemoteProxy,
		"ForNode":           ForNode,
		"ForKubernetes":     ForKubernetes,
		"ForApps":           ForApps,
		"ForDatabases":      ForDatabases,
		"ForWindowsDesktop": ForWindowsDesktop,
		"ForDiscovery":      ForDiscovery,
		"ForOkta":           ForOkta,
	}

	// Index the kinds watched by ForAuth by their kind/subkind pair so each
	// downstream kind can be looked up in O(1).
	forAuthKinds := make(map[resourceKind]types.WatchKind)
	for _, kind := range ForAuth(Config{}).Watches {
		forAuthKinds[resourceKind{kind: kind.Kind, subkind: kind.SubKind}] = kind
	}

	for name, fn := range configFns {
		t.Run(name, func(t *testing.T) {
			for _, kind := range fn(Config{}).Watches {
				authKind, ok := forAuthKinds[resourceKind{kind: kind.Kind, subkind: kind.SubKind}]
				if ok && authKind.Contains(kind) {
					continue
				}
				t.Errorf("%s includes WatchKind %s that is missing from ForAuth", name, kind.String())
			}
		})
	}
}
type proxyEvents struct {
sync.Mutex
watchers []types.Watcher
events types.Events
watchers []types.Watcher
events types.Events
ignoreKinds map[resourceKind]struct{}
}
func (p *proxyEvents) getWatchers() []types.Watcher {
@ -2374,6 +2421,19 @@ func (p *proxyEvents) closeWatchers() {
}
func (p *proxyEvents) NewWatcher(ctx context.Context, watch types.Watch) (types.Watcher, error) {
var effectiveKinds []types.WatchKind
for _, requested := range watch.Kinds {
if _, ok := p.ignoreKinds[resourceKind{kind: requested.Kind, subkind: requested.SubKind}]; ok {
continue
}
effectiveKinds = append(effectiveKinds, requested)
}
if len(effectiveKinds) == 0 {
return nil, trace.BadParameter("all of the requested kinds were ignored")
}
watch.Kinds = effectiveKinds
w, err := p.events.NewWatcher(ctx, watch)
if err != nil {
return nil, trace.Wrap(err)
@ -2384,6 +2444,17 @@ func (p *proxyEvents) NewWatcher(ctx context.Context, watch types.Watch) (types.
return w, nil
}
// newProxyEvents wraps the given event service in a proxyEvents that drops any
// watch kinds listed in ignoreKinds from watch requests before forwarding them.
func newProxyEvents(events types.Events, ignoreKinds []types.WatchKind) *proxyEvents {
	p := &proxyEvents{
		events:      events,
		ignoreKinds: make(map[resourceKind]struct{}, len(ignoreKinds)),
	}
	for _, k := range ignoreKinds {
		p.ignoreKinds[resourceKind{kind: k.Kind, subkind: k.SubKind}] = struct{}{}
	}
	return p
}
// TestCacheWatchKindExistsInEvents ensures that the watch kinds for each cache component are in sync
// with proto Events delivered via WatchEvents. If a watch kind is added to a cache component, but it
// doesn't exist in the proto Events definition, an error will cause the WatchEvents stream to be closed.
@ -2472,6 +2543,85 @@ func TestCacheWatchKindExistsInEvents(t *testing.T) {
}
}
// TestPartialHealth ensures that when an event source confirms only some resource kinds specified on the watch request,
// Cache operates in partially healthy mode in which it serves reads of the confirmed kinds from the cache and
// lets everything else pass through.
func TestPartialHealth(t *testing.T) {
	ctx := context.Background()

	// setup cache such that role resources wouldn't be recognized by the event source and wouldn't be cached.
	p, err := newPack(t.TempDir(), ForApps, ignoreKinds([]types.WatchKind{{Kind: types.KindRole}}))
	require.NoError(t, err)
	t.Cleanup(p.Close)

	// Upsert a role in the backend; with KindRole ignored by the event source,
	// it should never be replicated into the cache.
	role, err := types.NewRole("editor", types.RoleSpecV6{})
	require.NoError(t, err)
	require.NoError(t, p.accessS.UpsertRole(ctx, role))

	// Upsert a user and wait for the cache to process the corresponding event,
	// proving the user kind is still being replicated normally.
	user, err := types.NewUser("bob")
	require.NoError(t, err)
	require.NoError(t, p.usersS.UpsertUser(user))
	select {
	case event := <-p.eventsC:
		require.Equal(t, EventProcessed, event.Type)
		require.Equal(t, types.KindUser, event.Event.Resource.GetKind())
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for event")
	}

	// make sure that the user resource works as normal and gets replicated to cache
	replicatedUsers, err := p.cache.GetUsers(false)
	require.NoError(t, err)
	require.Len(t, replicatedUsers, 1)

	// now add a label to the user directly in the cache (bypassing the backend),
	// so reads served from the cache are distinguishable from pass-through reads
	meta := user.GetMetadata()
	meta.Labels = map[string]string{"origin": "cache"}
	user.SetMetadata(meta)
	require.NoError(t, p.cache.usersCache.UpsertUser(user))

	// the label on the returned user proves that it came from the cache
	resultUser, err := p.cache.GetUser("bob", false)
	require.NoError(t, err)
	require.Equal(t, "cache", resultUser.GetMetadata().Labels["origin"])

	// query cache storage directly to ensure roles haven't been replicated
	rolesStoredInCache, err := p.cache.accessCache.GetRoles(ctx)
	require.NoError(t, err)
	require.Empty(t, rolesStoredInCache)

	// non-empty result here proves that it was not served from cache
	resultRoles, err := p.cache.GetRoles(ctx)
	require.NoError(t, err)
	require.Len(t, resultRoles, 1)

	// ensure that cache cannot be watched for resources that weren't confirmed in regular mode
	testWatch := types.Watch{
		Kinds: []types.WatchKind{
			{Kind: types.KindUser},
			{Kind: types.KindRole},
		},
	}

	_, err = p.cache.NewWatcher(ctx, testWatch)
	require.Error(t, err)

	// same request should work in partial success mode, but WatchStatus on the OpInit event should indicate
	// that only user resources will be watched.
	testWatch.AllowPartialSuccess = true
	w, err := p.cache.NewWatcher(ctx, testWatch)
	require.NoError(t, err)

	select {
	case e := <-w.Events():
		require.Equal(t, types.OpInit, e.Type)
		watchStatus, ok := e.Resource.(types.WatchStatus)
		require.True(t, ok)
		require.Equal(t, []types.WatchKind{{Kind: types.KindUser}}, watchStatus.GetKinds())
	case <-time.After(time.Second):
		t.Fatal("Timeout waiting for event.")
	}
}
func withKeepalive[T any](fn func(context.Context, T) (*types.KeepAlive, error)) func(context.Context, T) error {
return func(ctx context.Context, resource T) error {
_, err := fn(ctx, resource)

View file

@ -32,10 +32,12 @@ import (
// collection is responsible for managing collection
// of resources updates
type collection interface {
// fetch fetches resources and returns a function which
// will apply said resources to the cache. fetch *must*
// not mutate cache state outside of the apply function.
fetch(ctx context.Context) (apply func(ctx context.Context) error, err error)
// fetch fetches resources and returns a function which will apply said resources to the cache.
// fetch *must* not mutate cache state outside of the apply function.
// The provided cacheOK flag indicates whether this collection will be included in the cache generation that is
// being prepared. If cacheOK is false, fetch doesn't need to fetch any resources, but the apply function that it
// returns must still delete resources from the backend.
fetch(ctx context.Context, cacheOK bool) (apply func(ctx context.Context) error, err error)
// processEvent processes event
processEvent(ctx context.Context, e types.Event) error
// watchKind returns a watch
@ -71,20 +73,26 @@ type genericCollection[R types.Resource, E executor[R]] struct {
}
// fetch implements collection
func (g *genericCollection[_, _]) fetch(ctx context.Context) (apply func(ctx context.Context) error, err error) {
func (g *genericCollection[R, _]) fetch(ctx context.Context, cacheOK bool) (apply func(ctx context.Context) error, err error) {
// Singleton objects will only get deleted or updated, not both
deleteSingleton := false
resources, err := g.exec.getAll(ctx, g.cache, g.watch.LoadSecrets)
if err != nil {
if !trace.IsNotFound(err) {
return nil, trace.Wrap(err)
var resources []R
if cacheOK {
resources, err = g.exec.getAll(ctx, g.cache, g.watch.LoadSecrets)
if err != nil {
if !trace.IsNotFound(err) {
return nil, trace.Wrap(err)
}
deleteSingleton = true
}
deleteSingleton = true
}
return func(ctx context.Context) error {
// Always perform the delete if this is not a singleton, otherwise
// only perform the delete if the singleton wasn't found.
if !g.exec.isSingleton() || deleteSingleton {
// only perform the delete if the singleton wasn't found
// or the resource kind isn't cached in the current generation.
if !g.exec.isSingleton() || deleteSingleton || !cacheOK {
if err := g.exec.deleteAll(ctx, g.cache); err != nil {
if !trace.IsNotFound(err) {
return trace.Wrap(err)
@ -93,7 +101,8 @@ func (g *genericCollection[_, _]) fetch(ctx context.Context) (apply func(ctx con
}
// If this is a singleton and we performed a deletion, return here
// because we only want to update or delete a singleton, not both.
if g.exec.isSingleton() && deleteSingleton {
// Also don't continue if the resource kind isn't cached in the current generation.
if g.exec.isSingleton() && deleteSingleton || !cacheOK {
return nil
}
for _, resource := range resources {
@ -374,12 +383,10 @@ func resourceKindFromWatchKind(wk types.WatchKind) resourceKind {
return resourceKind{
kind: wk.Kind,
subkind: wk.SubKind,
version: wk.Version,
}
}
return resourceKind{
kind: wk.Kind,
version: wk.Version,
kind: wk.Kind,
}
}
@ -392,15 +399,6 @@ func resourceKindFromResource(res types.Resource) resourceKind {
kind: res.GetKind(),
subkind: res.GetSubKind(),
}
case types.KindAppServer:
// DELETE IN 9.0.
switch res.GetVersion() {
case types.V2:
return resourceKind{
kind: res.GetKind(),
version: res.GetVersion(),
}
}
}
return resourceKind{
kind: res.GetKind(),
@ -410,7 +408,6 @@ func resourceKindFromResource(res types.Resource) resourceKind {
// resourceKind identifies a resource type tracked by the cache.
type resourceKind struct {
	kind    string
	subkind string
	// NOTE(review): version no longer appears to be set by the constructors
	// above — confirm whether this field is still needed.
	version string
}
func (r resourceKind) String() string {

View file

@ -220,7 +220,7 @@ func setupBPFContext(t *testing.T) *bpfContext {
bpfCtx.restrictedMgr, err = New(config, client)
require.NoError(t, err)
client.Fanout.SetInit()
client.Fanout.SetInit([]api.WatchKind{{Kind: api.KindNetworkRestrictions}})
time.Sleep(100 * time.Millisecond)

View file

@ -33,12 +33,18 @@ type fanoutEntry struct {
watcher *fanoutWatcher
}
// resourceKind is a map key pairing a resource kind with its subkind; it is
// used to look up the watch kinds confirmed by the upstream event source.
type resourceKind struct {
	kind    string
	subKind string
}
// Fanout is a helper which allows a stream of events to be fanned-out to many
// watchers. Used by the cache layer to forward events.
type Fanout struct {
mu sync.Mutex
init, closed bool
watchers map[string][]fanoutEntry
mu sync.Mutex
init, closed bool
confirmedKinds map[resourceKind]types.WatchKind
watchers map[string][]fanoutEntry
// eventsCh is used in tests
eventsCh chan FanoutEvent
}
@ -81,9 +87,9 @@ func (f *Fanout) NewWatcher(ctx context.Context, watch types.Watch) (types.Watch
}
if f.init {
// fanout is already initialized; emit init event immediately.
if !w.init() {
if err := w.init(f.confirmedKinds); err != nil {
w.cancel()
return nil, trace.BadParameter("failed to send init event")
return nil, trace.Wrap(err)
}
}
f.addWatcher(w)
@ -92,16 +98,27 @@ func (f *Fanout) NewWatcher(ctx context.Context, watch types.Watch) (types.Watch
// SetInit sets Fanout into an initialized state, sending OpInit events
// to any watchers which were added prior to initialization.
func (f *Fanout) SetInit() {
// Caller must pass a list of resource kinds confirmed by the upstream event source.
// As a result of this call, each member watcher will also receive a confirmation
// based on the provided kinds. Some of the watchers might be closed with an error if resource kinds
// requested by them weren't confirmed by the upstream event source and they didn't enable
// partial success mode.
func (f *Fanout) SetInit(confirmedKinds []types.WatchKind) {
f.mu.Lock()
defer f.mu.Unlock()
if f.init {
return
}
f.confirmedKinds = make(map[resourceKind]types.WatchKind, len(confirmedKinds))
for _, kind := range confirmedKinds {
f.confirmedKinds[resourceKind{kind: kind.Kind, subKind: kind.SubKind}] = kind
}
for _, entries := range f.watchers {
var remove []*fanoutWatcher
for _, entry := range entries {
if !entry.watcher.init() {
if err := entry.watcher.init(f.confirmedKinds); err != nil {
remove = append(remove, entry.watcher)
}
}
@ -325,16 +342,38 @@ type fanoutWatcher struct {
}
// init transmits the OpInit event. safe to double-call.
func (w *fanoutWatcher) init() (ok bool) {
// Validates requested resource kinds against the list confirmed by the upstream event source.
// See Fanout.SetInit() for more details.
func (w *fanoutWatcher) init(confirmedKinds map[resourceKind]types.WatchKind) (err error) {
w.initOnce.Do(func() {
w.initOk = false
validKinds := make([]types.WatchKind, 0, len(w.watch.Kinds))
for _, requested := range w.watch.Kinds {
k := resourceKind{kind: requested.Kind, subKind: requested.SubKind}
if configured, ok := confirmedKinds[k]; !ok || !configured.Contains(requested) {
if w.watch.AllowPartialSuccess {
continue
}
err = trace.BadParameter("resource type %s is not supported by this fanoutWatcher", requested.Kind)
return
}
validKinds = append(validKinds, requested)
}
if len(validKinds) == 0 {
err = trace.BadParameter("none of the requested resources are supported by this fanoutWatcher")
return
}
select {
case w.eventC <- types.Event{Type: types.OpInit}:
case w.eventC <- types.Event{Type: types.OpInit, Resource: types.NewWatchStatus(validKinds)}:
w.initOk = true
default:
w.initOk = false
err = trace.BadParameter("failed to send init event")
}
})
return w.initOk
return err
}
func (w *fanoutWatcher) emit(event types.Event) error {
@ -428,11 +467,13 @@ func (s *FanoutSet) NewWatcher(ctx context.Context, watch types.Watch) (types.Wa
// SetInit sets the Fanout instances into an initialized state, sending OpInit
// events to any watchers which were added prior to initialization.
func (s *FanoutSet) SetInit() {
// Takes a list of resource kinds confirmed by an upstream event source.
// See Fanout.SetInit() for more details.
func (s *FanoutSet) SetInit(confirmedKinds []types.WatchKind) {
s.rw.RLock() // see field-level docks for locking model
defer s.rw.RUnlock()
for _, f := range s.members {
f.SetInit()
f.SetInit(confirmedKinds)
}
}

View file

@ -54,15 +54,16 @@ func TestFanoutInit(t *testing.T) {
w, err := f.NewWatcher(context.TODO(), types.Watch{
Name: "test",
Kinds: []types.WatchKind{{Name: "spam"}, {Name: "eggs"}},
Kinds: []types.WatchKind{{Kind: "spam"}, {Kind: "eggs"}},
})
require.NoError(t, err)
f.SetInit()
f.SetInit([]types.WatchKind{{Kind: "spam"}, {Kind: "eggs"}})
select {
case e := <-w.Events():
require.Equal(t, types.OpInit, e.Type)
require.Equal(t, types.NewWatchStatus([]types.WatchKind{{Kind: "spam"}, {Kind: "eggs"}}), e.Resource)
default:
t.Fatalf("Expected init event")
}
@ -74,6 +75,81 @@ func TestFanoutInit(t *testing.T) {
}
}
// TestUnsupportedKindInitialized verifies that an already-initialized Fanout
// rejects a new watcher requesting a resource kind that wasn't confirmed by
// the event source in regular mode, but succeeds in partial success mode and
// delivers an OpInit event confirming only the supported kinds.
func TestUnsupportedKindInitialized(t *testing.T) {
	ctx := context.Background()

	fanout := NewFanout()
	// Only "spam" is confirmed by the upstream event source; "eggs" is not.
	fanout.SetInit([]types.WatchKind{{Kind: "spam"}})

	watch := types.Watch{
		Name:  "test",
		Kinds: []types.WatchKind{{Kind: "spam"}, {Kind: "eggs"}},
	}

	// In regular mode the unsupported "eggs" kind makes the watcher fail immediately.
	_, err := fanout.NewWatcher(ctx, watch)
	require.Error(t, err)

	// With partial success enabled the same request produces a working watcher.
	watch.AllowPartialSuccess = true
	watcher, err := fanout.NewWatcher(ctx, watch)
	require.NoError(t, err)

	select {
	case event := <-watcher.Events():
		// The OpInit event must carry a WatchStatus listing only the confirmed kinds.
		require.Equal(t, types.OpInit, event.Type)
		status, ok := event.Resource.(types.WatchStatus)
		require.True(t, ok)
		require.Equal(t, []types.WatchKind{{Kind: "spam"}}, status.GetKinds())
	case <-time.After(time.Second):
		t.Fatal("Timeout waiting for event.")
	}
}
// TestUnsupportedKindDelayed verifies that when a Fanout is initialized after
// watchers were registered, pending watchers that requested unconfirmed
// resource kinds are closed with an error unless they opted into partial
// success mode, in which case they receive an OpInit event with the partial
// kind confirmation.
func TestUnsupportedKindDelayed(t *testing.T) {
	ctx := context.Background()

	fanout := NewFanout()

	requestedKinds := []types.WatchKind{{Kind: "spam"}, {Kind: "eggs"}}

	// Register one strict watcher and one partial-success watcher before init.
	strictWatcher, err := fanout.NewWatcher(ctx, types.Watch{
		Name:  "test",
		Kinds: requestedKinds,
	})
	require.NoError(t, err)

	partialWatcher, err := fanout.NewWatcher(ctx, types.Watch{
		Name:                "test",
		Kinds:               requestedKinds,
		AllowPartialSuccess: true,
	})
	require.NoError(t, err)

	// Only "spam" ends up confirmed by the event source.
	fanout.SetInit([]types.WatchKind{{Kind: "spam"}})

	// The strict watcher must be closed with an error upon initialization.
	select {
	case <-strictWatcher.Events():
		t.Fatal("unexpected event from watcher that's supposed to fail")
	case <-strictWatcher.Done():
		require.Error(t, strictWatcher.Error())
	case <-time.After(time.Second):
		t.Fatal("Timeout waiting for close event.")
	}

	// The partial-success watcher receives OpInit confirming only "spam".
	select {
	case event := <-partialWatcher.Events():
		require.Equal(t, types.OpInit, event.Type)
		status, ok := event.Resource.(types.WatchStatus)
		require.True(t, ok)
		require.Equal(t, []types.WatchKind{{Kind: "spam"}}, status.GetKinds())
	case <-time.After(time.Second):
		t.Fatal("Timeout waiting for event.")
	}
}
/*
goos: linux
goarch: amd64
@ -89,7 +165,7 @@ func BenchmarkFanoutRegistration(b *testing.B) {
for n := 0; n < b.N; n++ {
f := NewFanout()
f.SetInit()
f.SetInit([]types.WatchKind{{Kind: "spam"}, {Kind: "eggs"}})
var wg sync.WaitGroup
@ -99,7 +175,7 @@ func BenchmarkFanoutRegistration(b *testing.B) {
defer wg.Done()
w, err := f.NewWatcher(ctx, types.Watch{
Name: "test",
Kinds: []types.WatchKind{{Name: "spam"}, {Name: "eggs"}},
Kinds: []types.WatchKind{{Kind: "spam"}, {Kind: "eggs"}},
})
require.NoError(b, err)
w.Close()
@ -123,7 +199,7 @@ func BenchmarkFanoutSetRegistration(b *testing.B) {
for n := 0; n < b.N; n++ {
f := NewFanoutSet()
f.SetInit()
f.SetInit([]types.WatchKind{{Kind: "spam"}, {Kind: "eggs"}})
var wg sync.WaitGroup
@ -133,7 +209,7 @@ func BenchmarkFanoutSetRegistration(b *testing.B) {
defer wg.Done()
w, err := f.NewWatcher(ctx, types.Watch{
Name: "test",
Kinds: []types.WatchKind{{Name: "spam"}, {Name: "eggs"}},
Kinds: []types.WatchKind{{Kind: "spam"}, {Kind: "eggs"}},
})
require.NoError(b, err)
w.Close()

View file

@ -50,10 +50,15 @@ func (e *EventsService) NewWatcher(ctx context.Context, watch types.Watch) (type
if len(watch.Kinds) == 0 {
return nil, trace.BadParameter("global watches are not supported yet")
}
validKinds := make([]types.WatchKind, 0, len(watch.Kinds))
var parsers []resourceParser
var prefixes [][]byte
for _, kind := range watch.Kinds {
if kind.Name != "" && kind.Kind != types.KindNamespace {
if watch.AllowPartialSuccess {
continue
}
return nil, trace.BadParameter("watch with Name is only supported for Namespace resource")
}
var parser resourceParser
@ -95,16 +100,14 @@ func (e *EventsService) NewWatcher(ctx context.Context, watch types.Watch) (type
case types.KindAccessRequest:
p, err := newAccessRequestParser(kind.Filter)
if err != nil {
if watch.AllowPartialSuccess {
continue
}
return nil, trace.Wrap(err)
}
parser = p
case types.KindAppServer:
switch kind.Version {
case types.V2: // DELETE IN 9.0.
parser = newAppServerV2Parser()
default:
parser = newAppServerV3Parser()
}
parser = newAppServerV3Parser()
case types.KindWebSession:
switch kind.SubKind {
case types.KindSAMLIdPSession:
@ -116,6 +119,9 @@ func (e *EventsService) NewWatcher(ctx context.Context, watch types.Watch) (type
case types.KindWebSession:
parser = newWebSessionParser()
default:
if watch.AllowPartialSuccess {
continue
}
return nil, trace.BadParameter("watcher on object subkind %q is not supported", kind.SubKind)
}
case types.KindWebToken:
@ -157,11 +163,20 @@ func (e *EventsService) NewWatcher(ctx context.Context, watch types.Watch) (type
case types.KindIntegration:
parser = newIntegrationParser()
default:
if watch.AllowPartialSuccess {
continue
}
return nil, trace.BadParameter("watcher on object kind %q is not supported", kind.Kind)
}
prefixes = append(prefixes, parser.prefixes()...)
parsers = append(parsers, parser)
validKinds = append(validKinds, kind)
}
if len(validKinds) == 0 {
return nil, trace.BadParameter("none of the requested kinds can be watched")
}
w, err := e.backend.NewWatcher(ctx, backend.Watch{
Name: watch.Name,
Prefixes: prefixes,
@ -171,15 +186,16 @@ func (e *EventsService) NewWatcher(ctx context.Context, watch types.Watch) (type
if err != nil {
return nil, trace.Wrap(err)
}
return newWatcher(w, e.Entry, parsers), nil
return newWatcher(w, e.Entry, parsers, validKinds), nil
}
func newWatcher(backendWatcher backend.Watcher, l *logrus.Entry, parsers []resourceParser) *watcher {
func newWatcher(backendWatcher backend.Watcher, l *logrus.Entry, parsers []resourceParser, kinds []types.WatchKind) *watcher {
w := &watcher{
backendWatcher: backendWatcher,
Entry: l,
parsers: parsers,
eventsC: make(chan types.Event),
kinds: kinds,
}
go w.forwardEvents()
return w
@ -190,6 +206,7 @@ type watcher struct {
parsers []resourceParser
backendWatcher backend.Watcher
eventsC chan types.Event
kinds []types.WatchKind
}
func (w *watcher) Error() error {
@ -198,7 +215,7 @@ func (w *watcher) Error() error {
func (w *watcher) parseEvent(e backend.Event) ([]types.Event, []error) {
if e.Type == types.OpInit {
return []types.Event{{Type: e.Type}}, nil
return []types.Event{{Type: e.Type, Resource: types.NewWatchStatus(w.kinds)}}, nil
}
events := []types.Event{}
errs := []error{}
@ -904,21 +921,6 @@ func (p *appServerV3Parser) parse(event backend.Event) (types.Resource, error) {
}
}
func newAppServerV2Parser() *appServerV2Parser {
return &appServerV2Parser{
baseParser: newBaseParser(backend.Key(appsPrefix, serversPrefix, apidefaults.Namespace)),
}
}
// DELETE IN 9.0. Deprecated, replaced by applicationServerParser.
type appServerV2Parser struct {
baseParser
}
func (p *appServerV2Parser) parse(event backend.Event) (types.Resource, error) {
return parseServer(event, types.KindAppServer)
}
func newSAMLIdPSessionParser() *webSessionParser {
return &webSessionParser{
baseParser: newBaseParser(backend.Key(samlIdPPrefix, sessionsPrefix)),

View file

@ -1336,7 +1336,7 @@ func (s *ServicesTestSuite) Events(t *testing.T) {
},
},
}
s.runEventsTests(t, testCases)
s.runEventsTests(t, testCases, types.Watch{Kinds: eventsTestKinds(testCases)})
testCases = []eventTest{
{
@ -1357,7 +1357,7 @@ func (s *ServicesTestSuite) Events(t *testing.T) {
},
},
}
s.runEventsTests(t, testCases)
s.runEventsTests(t, testCases, types.Watch{Kinds: eventsTestKinds(testCases)})
testCases = []eventTest{
{
@ -1586,7 +1586,11 @@ func (s *ServicesTestSuite) Events(t *testing.T) {
},
},
}
s.runEventsTests(t, testCases)
// this also tests the partial success mode by requesting an unknown kind
s.runEventsTests(t, testCases, types.Watch{
Kinds: append(eventsTestKinds(testCases), types.WatchKind{Kind: "unknown"}),
AllowPartialSuccess: true,
})
// Namespace with a name
testCases = []eventTest{
@ -1618,7 +1622,22 @@ func (s *ServicesTestSuite) Events(t *testing.T) {
},
},
}
s.runEventsTests(t, testCases)
s.runEventsTests(t, testCases, types.Watch{Kinds: eventsTestKinds(testCases)})
// tests that a watch fails given an unknown kind when the partial success mode is not enabled
s.runUnknownEventsTest(t, types.Watch{Kinds: []types.WatchKind{
{Kind: types.KindNamespace},
{Kind: "unknown"},
}})
// tests that a watch fails if all given kinds are unknown even if the success mode is enabled
s.runUnknownEventsTest(t, types.Watch{
Kinds: []types.WatchKind{
{Kind: "unrecognized"},
{Kind: "unidentified"},
},
AllowPartialSuccess: true,
})
}
// EventsClusterConfig tests cluster config resource events
@ -1716,7 +1735,7 @@ func (s *ServicesTestSuite) EventsClusterConfig(t *testing.T) {
},
},
}
s.runEventsTests(t, testCases)
s.runEventsTests(t, testCases, types.Watch{Kinds: eventsTestKinds(testCases)})
}
// NetworkRestrictions tests network restrictions.
@ -1761,17 +1780,19 @@ func (s *ServicesTestSuite) NetworkRestrictions(t *testing.T, opts ...Option) {
require.True(t, trace.IsNotFound(err))
}
func (s *ServicesTestSuite) runEventsTests(t *testing.T, testCases []eventTest) {
func (s *ServicesTestSuite) runEventsTests(t *testing.T, testCases []eventTest, watch types.Watch) {
ctx := context.Background()
w, err := s.EventsS.NewWatcher(ctx, types.Watch{
Kinds: eventsTestKinds(testCases),
})
w, err := s.EventsS.NewWatcher(ctx, watch)
require.NoError(t, err)
defer w.Close()
select {
case event := <-w.Events():
require.Equal(t, event.Type, types.OpInit)
watchStatus, ok := event.Resource.(types.WatchStatus)
require.True(t, ok)
expectedKinds := eventsTestKinds(testCases)
require.Equal(t, expectedKinds, watchStatus.GetKinds())
case <-w.Done():
t.Fatalf("Watcher exited with error %v", w.Error())
case <-time.After(2 * time.Second):
@ -1815,6 +1836,26 @@ skiploop:
}
}
// runUnknownEventsTest asserts that a watch containing unknown resource kinds
// fails: either NewWatcher returns an error right away, or the watcher is
// closed with an error before delivering any event.
func (s *ServicesTestSuite) runUnknownEventsTest(t *testing.T, watch types.Watch) {
	watcher, err := s.EventsS.NewWatcher(context.Background(), watch)
	if err != nil {
		// depending on the implementation of EventsS, it might fail here immediately
		// or later before returning the first event from the watcher.
		return
	}
	defer watcher.Close()

	select {
	case <-watcher.Events():
		t.Fatal("unexpected event from watcher that is supposed to fail")
	case <-watcher.Done():
		// The watcher closed before emitting anything; it must report an error.
		require.Error(t, watcher.Error())
	case <-time.After(2 * time.Second):
		t.Fatal("Timeout waiting for error from watcher")
	}
}
type eventTest struct {
name string
kind types.WatchKind

View file

@ -521,7 +521,7 @@ func NewLockWatcher(ctx context.Context, cfg LockWatcherConfig) (*LockWatcher, e
}
// Resource watcher require the fanout to be initialized before passing in.
// Otherwise, Emit() may fail due to a race condition mentioned in https://github.com/gravitational/teleport/issues/19289
collector.fanout.SetInit()
collector.fanout.SetInit([]types.WatchKind{{Kind: collector.resourceKind()}})
watcher, err := newResourceWatcher(ctx, collector, cfg.ResourceWatcherConfig)
if err != nil {
return nil, trace.Wrap(err)
@ -1198,7 +1198,7 @@ func NewCertAuthorityWatcher(ctx context.Context, cfg CertAuthorityWatcherConfig
}
// Resource watcher require the fanout to be initialized before passing in.
// Otherwise, Emit() may fail due to a race condition mentioned in https://github.com/gravitational/teleport/issues/19289
collector.fanout.SetInit()
collector.fanout.SetInit([]types.WatchKind{{Kind: collector.resourceKind()}})
watcher, err := newResourceWatcher(ctx, collector, cfg.ResourceWatcherConfig)
if err != nil {
return nil, trace.Wrap(err)

View file

@ -1,6 +1,6 @@
---
authors: Andrey Bulgakov (andrey.bulgakov@goteleport.com)
state: draft
state: implemented
---
# RFD 0114 - Partial cache healthiness