Ensure Cache types.WatchKinds and proto.WatchEvents are in sync (#11692)

This adds a test, `TestCacheWatchKindExistsInEvents`, that ensures
all `types.WatchKind` registered for all Cache components have a
corresponding `proto.Event` defined. By doing this we can ensure
that for a release there will be no `resource type X is not
suppported` errors causing the Cache to be unhealthy. Unfortunately
this does not prevent issues across releases which may have different
resources being emitted.
This commit is contained in:
rosstimothy 2022-04-09 10:34:32 -04:00 committed by GitHub
parent aba0b0cd93
commit f5d254259b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 373 additions and 269 deletions

286
api/client/events.go Normal file
View file

@ -0,0 +1,286 @@
// Copyright 2022 Gravitational, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/trace"
)
// EventToGRPC converts types.Event to proto.Event
func EventToGRPC(in types.Event) (*proto.Event, error) {
eventType, err := EventTypeToGRPC(in.Type)
if err != nil {
return nil, trace.Wrap(err)
}
out := proto.Event{
Type: eventType,
}
if in.Type == types.OpInit {
return &out, nil
}
switch r := in.Resource.(type) {
case *types.ResourceHeader:
out.Resource = &proto.Event_ResourceHeader{
ResourceHeader: r,
}
case *types.CertAuthorityV2:
out.Resource = &proto.Event_CertAuthority{
CertAuthority: r,
}
case *types.StaticTokensV2:
out.Resource = &proto.Event_StaticTokens{
StaticTokens: r,
}
case *types.ProvisionTokenV2:
out.Resource = &proto.Event_ProvisionToken{
ProvisionToken: r,
}
case *types.ClusterNameV2:
out.Resource = &proto.Event_ClusterName{
ClusterName: r,
}
case *types.UserV2:
out.Resource = &proto.Event_User{
User: r,
}
case *types.RoleV5:
out.Resource = &proto.Event_Role{
Role: r,
}
case *types.Namespace:
out.Resource = &proto.Event_Namespace{
Namespace: r,
}
case *types.ServerV2:
out.Resource = &proto.Event_Server{
Server: r,
}
case *types.ReverseTunnelV2:
out.Resource = &proto.Event_ReverseTunnel{
ReverseTunnel: r,
}
case *types.TunnelConnectionV2:
out.Resource = &proto.Event_TunnelConnection{
TunnelConnection: r,
}
case *types.AccessRequestV3:
out.Resource = &proto.Event_AccessRequest{
AccessRequest: r,
}
case *types.WebSessionV2:
switch r.GetSubKind() {
case types.KindAppSession:
out.Resource = &proto.Event_AppSession{
AppSession: r,
}
case types.KindWebSession:
out.Resource = &proto.Event_WebSession{
WebSession: r,
}
default:
return nil, trace.BadParameter("only %q supported", types.WebSessionSubKinds)
}
case *types.WebTokenV3:
out.Resource = &proto.Event_WebToken{
WebToken: r,
}
case *types.RemoteClusterV3:
out.Resource = &proto.Event_RemoteCluster{
RemoteCluster: r,
}
case *types.AppServerV3:
out.Resource = &proto.Event_AppServer{
AppServer: r,
}
case *types.DatabaseServerV3:
out.Resource = &proto.Event_DatabaseServer{
DatabaseServer: r,
}
case *types.DatabaseV3:
out.Resource = &proto.Event_Database{
Database: r,
}
case *types.AppV3:
out.Resource = &proto.Event_App{
App: r,
}
case *types.ClusterAuditConfigV2:
out.Resource = &proto.Event_ClusterAuditConfig{
ClusterAuditConfig: r,
}
case *types.ClusterNetworkingConfigV2:
out.Resource = &proto.Event_ClusterNetworkingConfig{
ClusterNetworkingConfig: r,
}
case *types.SessionRecordingConfigV2:
out.Resource = &proto.Event_SessionRecordingConfig{
SessionRecordingConfig: r,
}
case *types.AuthPreferenceV2:
out.Resource = &proto.Event_AuthPreference{
AuthPreference: r,
}
case *types.LockV2:
out.Resource = &proto.Event_Lock{
Lock: r,
}
case *types.NetworkRestrictionsV4:
out.Resource = &proto.Event_NetworkRestrictions{
NetworkRestrictions: r,
}
case *types.WindowsDesktopServiceV3:
out.Resource = &proto.Event_WindowsDesktopService{
WindowsDesktopService: r,
}
case *types.WindowsDesktopV3:
out.Resource = &proto.Event_WindowsDesktop{
WindowsDesktop: r,
}
default:
return nil, trace.BadParameter("resource type %T is not supported", in.Resource)
}
return &out, nil
}
// EventTypeToGRPC converts types.OpType to proto.Operation
func EventTypeToGRPC(in types.OpType) (proto.Operation, error) {
switch in {
case types.OpInit:
return proto.Operation_INIT, nil
case types.OpPut:
return proto.Operation_PUT, nil
case types.OpDelete:
return proto.Operation_DELETE, nil
default:
return -1, trace.BadParameter("event type %v is not supported", in)
}
}
// EventFromGRPC converts proto.Event to types.Event
func EventFromGRPC(in proto.Event) (*types.Event, error) {
eventType, err := EventTypeFromGRPC(in.Type)
if err != nil {
return nil, trace.Wrap(err)
}
out := types.Event{
Type: eventType,
}
if eventType == types.OpInit {
return &out, nil
}
if r := in.GetResourceHeader(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetCertAuthority(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetStaticTokens(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetProvisionToken(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetClusterName(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetUser(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetRole(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetNamespace(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetServer(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetReverseTunnel(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetTunnelConnection(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetAccessRequest(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetAppSession(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetWebSession(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetWebToken(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetRemoteCluster(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetAppServer(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetDatabaseServer(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetApp(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetDatabase(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetClusterAuditConfig(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetClusterNetworkingConfig(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetSessionRecordingConfig(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetAuthPreference(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetLock(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetNetworkRestrictions(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetWindowsDesktopService(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetWindowsDesktop(); r != nil {
out.Resource = r
return &out, nil
} else {
return nil, trace.BadParameter("received unsupported resource %T", in.Resource)
}
}
// EventTypeFromGRPC converts proto.Operation to types.OpType
func EventTypeFromGRPC(in proto.Operation) (types.OpType, error) {
switch in {
case proto.Operation_INIT:
return types.OpInit, nil
case proto.Operation_PUT:
return types.OpPut, nil
case proto.Operation_DELETE:
return types.OpDelete, nil
default:
return types.OpInvalid, trace.BadParameter("unsupported operation type: %v", in)
}
}

View file

@ -87,7 +87,7 @@ func (w *streamWatcher) receiveEvents() {
w.closeWithError(trail.FromGRPC(err))
return
}
out, err := eventFromGRPC(*event)
out, err := EventFromGRPC(*event)
if err != nil {
w.closeWithError(trail.FromGRPC(err))
return
@ -100,120 +100,6 @@ func (w *streamWatcher) receiveEvents() {
}
}
// eventFromGRPC converts an proto.Event to a types.Event
func eventFromGRPC(in proto.Event) (*types.Event, error) {
eventType, err := eventTypeFromGRPC(in.Type)
if err != nil {
return nil, trace.Wrap(err)
}
out := types.Event{
Type: eventType,
}
if eventType == types.OpInit {
return &out, nil
}
if r := in.GetResourceHeader(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetCertAuthority(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetStaticTokens(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetProvisionToken(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetClusterName(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetUser(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetRole(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetNamespace(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetServer(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetReverseTunnel(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetTunnelConnection(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetAccessRequest(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetAppSession(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetWebSession(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetWebToken(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetRemoteCluster(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetAppServer(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetDatabaseServer(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetApp(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetDatabase(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetClusterAuditConfig(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetClusterNetworkingConfig(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetSessionRecordingConfig(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetAuthPreference(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetLock(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetNetworkRestrictions(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetWindowsDesktopService(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetWindowsDesktop(); r != nil {
out.Resource = r
return &out, nil
} else {
return nil, trace.BadParameter("received unsupported resource %T", in.Resource)
}
}
func eventTypeFromGRPC(in proto.Operation) (types.OpType, error) {
switch in {
case proto.Operation_INIT:
return types.OpInit, nil
case proto.Operation_PUT:
return types.OpPut, nil
case proto.Operation_DELETE:
return types.OpDelete, nil
default:
return types.OpInvalid, trace.BadParameter("unsupported operation type: %v", in)
}
}
// Done returns a channel that closes once the streamWatcher is Closed
func (w *streamWatcher) Done() <-chan struct{} {
return w.ctx.Done()

View file

@ -27,6 +27,7 @@ import (
"github.com/coreos/go-semver/semver"
"github.com/golang/protobuf/ptypes/empty"
"github.com/google/uuid"
"github.com/gravitational/teleport/api/client"
"github.com/gravitational/trace"
"github.com/gravitational/trace/trail"
"github.com/prometheus/client_golang/prometheus"
@ -335,7 +336,16 @@ func (g *GRPCServer) WatchEvents(watch *proto.Watch, stream proto.AuthService_Wa
case <-watcher.Done():
return watcher.Error()
case event := <-watcher.Events():
out, err := eventToGRPC(stream.Context(), event)
switch r := event.Resource.(type) {
case *types.RoleV5:
downgraded, err := downgradeRole(stream.Context(), r)
if err != nil {
return trace.Wrap(err)
}
event.Resource = downgraded
}
out, err := client.EventToGRPC(event)
if err != nil {
return trace.Wrap(err)
}
@ -414,159 +424,6 @@ func resourceLabel(event types.Event) string {
return fmt.Sprintf("/%s/%s", event.Resource.GetKind(), sub)
}
// eventToGRPC converts a types.Event to an proto.Event
func eventToGRPC(ctx context.Context, in types.Event) (*proto.Event, error) {
eventType, err := eventTypeToGRPC(in.Type)
if err != nil {
return nil, trace.Wrap(err)
}
out := proto.Event{
Type: eventType,
}
if in.Type == types.OpInit {
return &out, nil
}
switch r := in.Resource.(type) {
case *types.ResourceHeader:
out.Resource = &proto.Event_ResourceHeader{
ResourceHeader: r,
}
case *types.CertAuthorityV2:
out.Resource = &proto.Event_CertAuthority{
CertAuthority: r,
}
case *types.StaticTokensV2:
out.Resource = &proto.Event_StaticTokens{
StaticTokens: r,
}
case *types.ProvisionTokenV2:
out.Resource = &proto.Event_ProvisionToken{
ProvisionToken: r,
}
case *types.ClusterNameV2:
out.Resource = &proto.Event_ClusterName{
ClusterName: r,
}
case *types.UserV2:
out.Resource = &proto.Event_User{
User: r,
}
case *types.RoleV5:
downgraded, err := downgradeRole(ctx, r)
if err != nil {
return nil, trace.Wrap(err)
}
out.Resource = &proto.Event_Role{
Role: downgraded,
}
case *types.Namespace:
out.Resource = &proto.Event_Namespace{
Namespace: r,
}
case *types.ServerV2:
out.Resource = &proto.Event_Server{
Server: r,
}
case *types.ReverseTunnelV2:
out.Resource = &proto.Event_ReverseTunnel{
ReverseTunnel: r,
}
case *types.TunnelConnectionV2:
out.Resource = &proto.Event_TunnelConnection{
TunnelConnection: r,
}
case *types.AccessRequestV3:
out.Resource = &proto.Event_AccessRequest{
AccessRequest: r,
}
case *types.WebSessionV2:
switch r.GetSubKind() {
case types.KindAppSession:
out.Resource = &proto.Event_AppSession{
AppSession: r,
}
case types.KindWebSession:
out.Resource = &proto.Event_WebSession{
WebSession: r,
}
default:
return nil, trace.BadParameter("only %q supported", types.WebSessionSubKinds)
}
case *types.WebTokenV3:
out.Resource = &proto.Event_WebToken{
WebToken: r,
}
case *types.RemoteClusterV3:
out.Resource = &proto.Event_RemoteCluster{
RemoteCluster: r,
}
case *types.AppServerV3:
out.Resource = &proto.Event_AppServer{
AppServer: r,
}
case *types.DatabaseServerV3:
out.Resource = &proto.Event_DatabaseServer{
DatabaseServer: r,
}
case *types.DatabaseV3:
out.Resource = &proto.Event_Database{
Database: r,
}
case *types.AppV3:
out.Resource = &proto.Event_App{
App: r,
}
case *types.ClusterAuditConfigV2:
out.Resource = &proto.Event_ClusterAuditConfig{
ClusterAuditConfig: r,
}
case *types.ClusterNetworkingConfigV2:
out.Resource = &proto.Event_ClusterNetworkingConfig{
ClusterNetworkingConfig: r,
}
case *types.SessionRecordingConfigV2:
out.Resource = &proto.Event_SessionRecordingConfig{
SessionRecordingConfig: r,
}
case *types.AuthPreferenceV2:
out.Resource = &proto.Event_AuthPreference{
AuthPreference: r,
}
case *types.LockV2:
out.Resource = &proto.Event_Lock{
Lock: r,
}
case *types.NetworkRestrictionsV4:
out.Resource = &proto.Event_NetworkRestrictions{
NetworkRestrictions: r,
}
case *types.WindowsDesktopServiceV3:
out.Resource = &proto.Event_WindowsDesktopService{
WindowsDesktopService: r,
}
case *types.WindowsDesktopV3:
out.Resource = &proto.Event_WindowsDesktop{
WindowsDesktop: r,
}
default:
return nil, trace.BadParameter("resource type %T is not supported", in.Resource)
}
return &out, nil
}
func eventTypeToGRPC(in types.OpType) (proto.Operation, error) {
switch in {
case types.OpInit:
return proto.Operation_INIT, nil
case types.OpPut:
return proto.Operation_PUT, nil
case types.OpDelete:
return proto.Operation_DELETE, nil
default:
return -1, trace.BadParameter("event type %v is not supported", in)
}
}
func (g *GRPCServer) GenerateUserCerts(ctx context.Context, req *proto.UserCertsRequest) (*proto.Certs, error) {
auth, err := g.authenticate(ctx)
if err != nil {

View file

@ -24,6 +24,7 @@ import (
"testing"
"time"
"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/client/proto"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types"
@ -2404,3 +2405,77 @@ func (p *proxyEvents) NewWatcher(ctx context.Context, watch types.Watch) (types.
p.watchers = append(p.watchers, w)
return w, nil
}
// TestCacheWatchKindExistsInEvents ensures that the watch kinds for each cache component are in sync
// with proto Events delivered via WatchEvents. If a watch kind is added to a cache component, but it
// doesn't exist in the proto Events definition, an error will cause the WatchEvents stream to be closed.
// This causes the cache to reinitialize every time that an unknown message is received and can lead to
// a permanently unhealthy cache.
//
// While this test will ensure that there are no issues for the current release, it does not guarantee
// that this issue won't arise across releases.
func TestCacheWatchKindExistsInEvents(t *testing.T) {
cases := map[string]Config{
"ForAuth": ForAuth(Config{}),
"ForProxy": ForProxy(Config{}),
"ForRemoteProxy": ForRemoteProxy(Config{}),
"ForOldRemoteProxy": ForOldRemoteProxy(Config{}),
"ForNode": ForNode(Config{}),
"ForKubernetes": ForKubernetes(Config{}),
"ForApps": ForApps(Config{}),
"ForDatabases": ForDatabases(Config{}),
}
events := map[string]types.Resource{
types.KindCertAuthority: &types.CertAuthorityV2{},
types.KindClusterName: &types.ClusterNameV2{},
types.KindClusterAuditConfig: types.DefaultClusterAuditConfig(),
types.KindClusterNetworkingConfig: types.DefaultClusterNetworkingConfig(),
types.KindClusterAuthPreference: types.DefaultAuthPreference(),
types.KindSessionRecordingConfig: types.DefaultSessionRecordingConfig(),
types.KindStaticTokens: &types.StaticTokensV2{},
types.KindToken: &types.ProvisionTokenV2{},
types.KindUser: &types.UserV2{},
types.KindRole: &types.RoleV5{Version: types.V4},
types.KindNamespace: &types.Namespace{},
types.KindNode: &types.ServerV2{},
types.KindProxy: &types.ServerV2{},
types.KindAuthServer: &types.ServerV2{},
types.KindReverseTunnel: &types.ReverseTunnelV2{},
types.KindTunnelConnection: &types.TunnelConnectionV2{},
types.KindAccessRequest: &types.AccessRequestV3{},
types.KindAppServer: &types.AppServerV3{},
types.KindApp: &types.AppV3{},
types.KindWebSession: &types.WebSessionV2{SubKind: types.KindWebSession},
types.KindAppSession: &types.WebSessionV2{SubKind: types.KindAppSession},
types.KindWebToken: &types.WebTokenV3{},
types.KindRemoteCluster: &types.RemoteClusterV3{},
types.KindKubeService: &types.ServerV2{},
types.KindDatabaseServer: &types.DatabaseServerV3{},
types.KindDatabase: &types.DatabaseV3{},
types.KindNetworkRestrictions: &types.NetworkRestrictionsV4{},
types.KindLock: &types.LockV2{},
types.KindWindowsDesktopService: &types.WindowsDesktopServiceV3{},
types.KindWindowsDesktop: &types.WindowsDesktopV3{},
}
for name, cfg := range cases {
t.Run(name, func(t *testing.T) {
for _, watch := range cfg.Watches {
resource, ok := events[watch.Kind]
require.True(t, ok, "missing event for kind %q", watch.Kind)
protoEvent, err := client.EventToGRPC(types.Event{
Type: types.OpPut,
Resource: resource,
})
require.NoError(t, err)
event, err := client.EventFromGRPC(*protoEvent)
require.NoError(t, err)
require.Empty(t, cmp.Diff(resource, event.Resource))
}
})
}
}