Remove legacy heartbeat from Kubernetes Service (#18876)

With the arrival of Teleport 12, we stop supporting v10.x.x clients, and we no longer require the `types.KindKubernetesService` legacy heartbeat.

This change removes the legacy heartbeat from Kubernetes Service but keeps the legacy Auth Server CRUD methods and heartbeat support to maintain compatibility with Teleport 11 Kubernetes Service - Teleport 11 still heartbeats the legacy type.

We postponed the related `DELETES` to Teleport 13.
This commit is contained in:
Tiago Silva 2022-12-01 13:16:41 +00:00 committed by GitHub
parent 710b7d18ef
commit da987dfc6b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 49 additions and 152 deletions

View file

@ -977,7 +977,7 @@ func (c *Client) GetKubernetesServers(ctx context.Context) ([]types.KubeServer,
if err != nil {
// Underlying ListResources for kube server was not available, use fallback KubeService.
// ListResources returns NotImplemented if ResourceType is unknown.
// DELETE IN 12.0.0
// DELETE IN 13.0.0
if trace.IsNotImplemented(err) {
return c.getKubeServersFallback(ctx)
}
@ -1001,7 +1001,7 @@ func (c *Client) GetKubernetesServers(ctx context.Context) ([]types.KubeServer,
// getKubeServersFallback previous implementation of `GetKubeServers` function
// using `GetKubeServices` call.
// DELETE IN 12.0.0
// DELETE IN 13.0.0
func (c *Client) getKubeServersFallback(ctx context.Context) ([]types.KubeServer, error) {
resources, err := c.GetKubeServices(ctx)
if err != nil {
@ -1035,7 +1035,7 @@ func (c *Client) DeleteKubernetesServer(ctx context.Context, hostID, name string
// deleteKubeServerFallback deletes a named Kube Service using legacy API call
// `DeleteKubeService`.
//
// DELETE IN 12.0.0
// DELETE IN 13.0.0
func (c *Client) deleteKubeServerFallback(ctx context.Context, name string) error {
err := c.DeleteKubeService(ctx, name)
return trace.Wrap(err)
@ -1056,7 +1056,7 @@ func (c *Client) DeleteAllKubernetesServers(ctx context.Context) error {
// deleteAllKubeServersFallback deletes all kubernetes servers using legacy API call
// `DeleteAllKubeServices`.
//
// DELETE IN 12.0.0
// DELETE IN 13.0.0
func (c *Client) deleteAllKubernetesServersFallback(ctx context.Context) error {
err := c.DeleteAllKubeServices(ctx)
return trace.Wrap(err)
@ -1078,7 +1078,7 @@ func (c *Client) UpsertKubernetesServer(ctx context.Context, s types.KubeServer)
// UpsertKubeService is used by kubernetes services to report their presence
// to other auth servers in form of hearbeat expiring after ttl period.
// DELETE IN 12.0.0
// DELETE IN 13.0.0
func (c *Client) UpsertKubeService(ctx context.Context, s types.Server) error {
server, ok := s.(*types.ServerV2)
if !ok {
@ -1092,7 +1092,7 @@ func (c *Client) UpsertKubeService(ctx context.Context, s types.Server) error {
// UpsertKubeServiceV2 is used by kubernetes services to report their presence
// to other auth servers in form of hearbeat expiring after ttl period.
// DELETE IN 12.0.0
// DELETE IN 13.0.0
func (c *Client) UpsertKubeServiceV2(ctx context.Context, s types.Server) (*types.KeepAlive, error) {
server, ok := s.(*types.ServerV2)
if !ok {
@ -1107,7 +1107,7 @@ func (c *Client) UpsertKubeServiceV2(ctx context.Context, s types.Server) (*type
// GetKubeServices returns the list of kubernetes services registered in the
// cluster.
// DELETE IN 12.0.0
// DELETE IN 13.0.0
func (c *Client) GetKubeServices(ctx context.Context) ([]types.Server, error) {
resources, err := GetResourcesWithFilters(ctx, c, proto.ListResourcesRequest{
Namespace: defaults.Namespace,
@ -1328,7 +1328,7 @@ func (c *Client) GenerateSnowflakeJWT(ctx context.Context, req types.GenerateSno
}
// DeleteKubeService deletes a named kubernetes service.
// DELETE IN 12.0.0
// DELETE IN 13.0.0
func (c *Client) DeleteKubeService(ctx context.Context, name string) error {
_, err := c.grpc.DeleteKubeService(ctx, &proto.DeleteKubeServiceRequest{
Name: name,
@ -1337,7 +1337,7 @@ func (c *Client) DeleteKubeService(ctx context.Context, name string) error {
}
// DeleteAllKubeServices deletes all registered kubernetes services.
// DELETE IN 12.0.0
// DELETE IN 13.0.0
func (c *Client) DeleteAllKubeServices(ctx context.Context) error {
_, err := c.grpc.DeleteAllKubeServices(ctx, &proto.DeleteAllKubeServicesRequest{}, c.callOpts...)
return trace.Wrap(err)

View file

@ -4234,7 +4234,7 @@ func (m *GetWebTokensResponse) GetTokens() []*types.WebTokenV3 {
// UpsertKubeServiceRequest are the parameters used to add or update a
// kubernetes service.
// DELETE IN 12.0.0
// DELETE IN 13.0.0
type UpsertKubeServiceRequest struct {
Server *types.ServerV2 `protobuf:"bytes,1,opt,name=Server,proto3" json:"server"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
@ -4283,7 +4283,7 @@ func (m *UpsertKubeServiceRequest) GetServer() *types.ServerV2 {
}
// DeleteKubeServiceRequest are the parameters used to remove a kubernetes service.
// DELETE IN 12.0.0
// DELETE IN 13.0.0
type DeleteKubeServiceRequest struct {
// Name is the name of the kubernetes service to delete.
Name string `protobuf:"bytes,2,opt,name=Name,proto3" json:"name"`
@ -4333,7 +4333,7 @@ func (m *DeleteKubeServiceRequest) GetName() string {
}
// DeleteAllKubeServicesRequest are the parameters used to remove all kubernetes services.
// DELETE IN 12.0.0
// DELETE IN 13.0.0
type DeleteAllKubeServicesRequest struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
@ -12812,13 +12812,13 @@ type AuthServiceClient interface {
// DELETE IN 11.0. Deprecated, use UpsertKubeServiceV2
UpsertKubeService(ctx context.Context, in *UpsertKubeServiceRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
// UpsertKubeServiceV2 adds or updates a kubernetes service.
// DELETE IN 12.0. Deprecated, use UpsertKubernetesServer
// DELETE IN 13.0. Deprecated, use UpsertKubernetesServer
UpsertKubeServiceV2(ctx context.Context, in *UpsertKubeServiceRequest, opts ...grpc.CallOption) (*types.KeepAlive, error)
// DeleteKubeService removes a kubernetes service.
// DELETE IN 12.0. Deprecated, use DeleteKubernetesServer
// DELETE IN 13.0. Deprecated, use DeleteKubernetesServer
DeleteKubeService(ctx context.Context, in *DeleteKubeServiceRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
// DeleteAllKubeServices removes all kubernetes services.
// DELETE IN 12.0. Deprecated, use DeleteAllKubernetesServers
// DELETE IN 13.0. Deprecated, use DeleteAllKubernetesServers
DeleteAllKubeServices(ctx context.Context, in *DeleteAllKubeServicesRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
// UpsertKubernetesServer adds or updates a kubernetes server.
UpsertKubernetesServer(ctx context.Context, in *UpsertKubernetesServerRequest, opts ...grpc.CallOption) (*types.KeepAlive, error)
@ -15479,13 +15479,13 @@ type AuthServiceServer interface {
// DELETE IN 11.0. Deprecated, use UpsertKubeServiceV2
UpsertKubeService(context.Context, *UpsertKubeServiceRequest) (*emptypb.Empty, error)
// UpsertKubeServiceV2 adds or updates a kubernetes service.
// DELETE IN 12.0. Deprecated, use UpsertKubernetesServer
// DELETE IN 13.0. Deprecated, use UpsertKubernetesServer
UpsertKubeServiceV2(context.Context, *UpsertKubeServiceRequest) (*types.KeepAlive, error)
// DeleteKubeService removes a kubernetes service.
// DELETE IN 12.0. Deprecated, use DeleteKubernetesServer
// DELETE IN 13.0. Deprecated, use DeleteKubernetesServer
DeleteKubeService(context.Context, *DeleteKubeServiceRequest) (*emptypb.Empty, error)
// DeleteAllKubeServices removes all kubernetes services.
// DELETE IN 12.0. Deprecated, use DeleteAllKubernetesServers
// DELETE IN 13.0. Deprecated, use DeleteAllKubernetesServers
DeleteAllKubeServices(context.Context, *DeleteAllKubeServicesRequest) (*emptypb.Empty, error)
// UpsertKubernetesServer adds or updates a kubernetes server.
UpsertKubernetesServer(context.Context, *UpsertKubernetesServerRequest) (*types.KeepAlive, error)

View file

@ -749,20 +749,20 @@ message GetWebTokensResponse {
// UpsertKubeServiceRequest are the parameters used to add or update a
// kubernetes service.
// DELETE IN 12.0.0
// DELETE IN 13.0.0
message UpsertKubeServiceRequest {
types.ServerV2 Server = 1 [(gogoproto.jsontag) = "server"];
}
// DeleteKubeServiceRequest are the parameters used to remove a kubernetes service.
// DELETE IN 12.0.0
// DELETE IN 13.0.0
message DeleteKubeServiceRequest {
// Name is the name of the kubernetes service to delete.
string Name = 2 [(gogoproto.jsontag) = "name"];
}
// DeleteAllKubeServicesRequest are the parameters used to remove all kubernetes services.
// DELETE IN 12.0.0
// DELETE IN 13.0.0
message DeleteAllKubeServicesRequest {}
// UpsertKubernetesServerRequest are the parameters used to add or update a
@ -1550,7 +1550,7 @@ message PaginatedResource {
// Nodes represents a Server resource.
types.ServerV2 Node = 3 [(gogoproto.jsontag) = "node,omitempty"];
// KubeService represents a KubernetesService resource.
// DELETE IN 12.0.0. Deprecated, use KubernetesService field.
// DELETE IN 13.0.0. Deprecated, use KubernetesService field.
types.ServerV2 KubeService = 4 [(gogoproto.jsontag) = "kube_service,omitempty"];
// WindowsDesktop represents a WindowsDesktop resource.
types.WindowsDesktopV3 WindowsDesktop = 5 [(gogoproto.jsontag) = "windows_desktop,omitempty"];
@ -2108,19 +2108,19 @@ service AuthService {
}
// UpsertKubeServiceV2 adds or updates a kubernetes service.
// DELETE IN 12.0. Deprecated, use UpsertKubernetesServer
// DELETE IN 13.0. Deprecated, use UpsertKubernetesServer
rpc UpsertKubeServiceV2(UpsertKubeServiceRequest) returns (types.KeepAlive) {
option deprecated = true;
}
// DeleteKubeService removes a kubernetes service.
// DELETE IN 12.0. Deprecated, use DeleteKubernetesServer
// DELETE IN 13.0. Deprecated, use DeleteKubernetesServer
rpc DeleteKubeService(DeleteKubeServiceRequest) returns (google.protobuf.Empty) {
option deprecated = true;
}
// DeleteAllKubeServices removes all kubernetes services.
// DELETE IN 12.0. Deprecated, use DeleteAllKubernetesServers
// DELETE IN 13.0. Deprecated, use DeleteAllKubernetesServers
rpc DeleteAllKubeServices(DeleteAllKubeServicesRequest) returns (google.protobuf.Empty) {
option deprecated = true;
}

View file

@ -229,7 +229,7 @@ const (
KindState = "state"
// KindKubeService is a kubernetes service resource
// DELETE in 12.0.0
// DELETE in 13.0.0
KindKubeService = "kube_service"
// KindMFADevice is an MFA device for a user.

View file

@ -80,7 +80,7 @@ func NewKubernetesServerV3FromCluster(cluster *KubernetesClusterV3, hostname, ho
// NewLegacyKubeServer creates legacy Kube server object. Used in tests.
//
// DELETE IN 12.0.0
// DELETE IN 13.0.0
func NewLegacyKubeServer(cluster *KubernetesClusterV3, hostname, hostID string) (Server, error) {
return NewServer(hostID, KindKubeService,
ServerSpecV2{
@ -97,7 +97,7 @@ func NewLegacyKubeServer(cluster *KubernetesClusterV3, hostname, hostID string)
// NewKubeServersV3FromServer creates a list of kube servers from a legacy Server resource.
//
// DELETE IN 12.0.0
// DELETE IN 13.0.0
func NewKubeServersV3FromServer(server Server) (result []KubeServer, err error) {
for _, legacyCluster := range server.GetKubernetesClusters() {
kubeCluster, err := NewKubernetesClusterV3FromLegacyCluster(server.GetNamespace(), legacyCluster)

View file

@ -74,10 +74,10 @@ type Server interface {
SetApps([]*App)
// GetKubeClusters returns the kubernetes clusters directly handled by this
// server.
// DELETE IN 12.0.0
// DELETE IN 13.0.0
GetKubernetesClusters() []*KubernetesCluster
// SetKubeClusters sets the kubernetes clusters handled by this server.
// DELETE IN 12.0.0
// DELETE IN 13.0.0
SetKubernetesClusters([]*KubernetesCluster)
// GetPeerAddr returns the peer address of the server.
GetPeerAddr() string

View file

@ -1890,7 +1890,7 @@ type ServerSpecV2 struct {
// Important: jsontag must not be "kubernetes_clusters", because a
// different field with that jsontag existed in 4.4:
// https://github.com/gravitational/teleport/issues/4862
// DELETE IN 12.0.0. Deprecated, moved to KubernetesServerSpecV3.
// DELETE IN 13.0.0. Deprecated, moved to KubernetesServerSpecV3.
KubernetesClusters []*KubernetesCluster `protobuf:"bytes,10,rep,name=KubernetesClusters,proto3" json:"kube_clusters,omitempty"`
// PeerAddr is the address a proxy server is reachable at by its peer proxies.
PeerAddr string `protobuf:"bytes,11,opt,name=PeerAddr,proto3" json:"peer_addr,omitempty"`

View file

@ -398,7 +398,7 @@ func TestEC2Labels(t *testing.T) {
// dedupClusters is required because GetKubernetesServers returns duplicated servers
// because it lists the KindKubeServer and KindKubeService.
// We must remove this once legacy heartbeat is removed.
// DELETE IN 12.0.0
// DELETE IN 13.0.0
var dedupClusters []types.KubeServer
dedup := map[string]struct{}{}
for _, kube := range kubes {

View file

@ -1378,7 +1378,7 @@ func (k *kubeChecker) CanAccess(resource types.Resource) error {
}
// canAccessKubernetesLegacy checks if the user has access to Kubernetes Cluster when represented by `types.ServerV2`.
// DELETE in 12.0.0
// DELETE in 13.0.0
func (k *kubeChecker) canAccessKubernetesLegacy(server types.Server) error {
// Filter out agents that don't have support for moderated sessions access
// checking if the user has any roles that require it.
@ -4283,7 +4283,7 @@ func (a *ServerWithRoles) DeleteAllKubernetesServers(ctx context.Context) error
// GetKubeServices returns all Servers representing teleport kubernetes
// services.
// DELETE in 12.0.0
// DELETE in 13.0.0
func (a *ServerWithRoles) GetKubeServices(ctx context.Context) ([]types.Server, error) {
if err := a.action(apidefaults.Namespace, types.KindKubeService, types.VerbList, types.VerbRead); err != nil {
return nil, trace.Wrap(err)

2
lib/cache/cache.go vendored
View file

@ -1828,7 +1828,7 @@ func (c *Cache) GetAllTunnelConnections(opts ...services.MarshalOption) (conns [
// GetKubeServices is a part of auth.Cache implementation
//
// DELETE IN 12.0.0 Deprecated, use GetKubernetesServers.
// DELETE IN 13.0.0 Deprecated, use GetKubernetesServers.
func (c *Cache) GetKubeServices(ctx context.Context) ([]types.Server, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetKubeServices")
defer span.End()

View file

@ -21,7 +21,6 @@ import (
"crypto/tls"
"net"
"net/http"
"sort"
"sync"
"github.com/gravitational/trace"
@ -69,7 +68,7 @@ type TLSServerConfig struct {
OnReconcile func(types.KubeClusters)
// CloudClients is a set of cloud clients that Teleport supports.
CloudClients cloud.Clients
//StaticLabels is a map of static labels associated with this service.
// StaticLabels is a map of static labels associated with this service.
// Each cluster advertised by this kubernetes_service will include these static labels.
// If the service and a cluster define labels with the same key,
// service labels take precedence over cluster labels.
@ -142,10 +141,6 @@ type TLSServer struct {
// reconcileCh triggers reconciliation of proxied kube_clusters.
reconcileCh chan struct{}
log *logrus.Entry
// legacyHeartbeat is used to heartbeat clusters as KindKubeService in order to support older
// clients that do not support new KindKubeServer
// DELETE IN 12.0.0
legacyHeartbeat *srv.Heartbeat
}
// NewTLSServer returns new unstarted TLS server
@ -256,13 +251,7 @@ func (t *TLSServer) Serve(listener net.Listener) error {
// Close closes the server and cleans up all resources.
func (t *TLSServer) Close() error {
var (
errs []error
)
// Stop the legacy heartbeat resource watcher.
if t.legacyHeartbeat != nil {
errs = append(errs, t.legacyHeartbeat.Close())
}
var errs []error
for _, kubeCluster := range t.fwd.kubeClusters() {
errs = append(errs, t.unregisterKubeCluster(t.closeContext, kubeCluster.GetName()))
}
@ -419,11 +408,6 @@ func (t *TLSServer) startStaticClustersHeartbeat() error {
return trace.Wrap(err)
}
}
// start a legacy heartbeat
// DELETE in 12.0.0
if err := t.startLegacyHeartbeat(); err != nil {
return trace.Wrap(err)
}
} else {
t.log.Debug("No local kube credentials on proxy, will not start kubernetes_service heartbeats")
}
@ -443,91 +427,6 @@ func (t *TLSServer) stopHeartbeat(name string) error {
return trace.Wrap(heartbeat.Close())
}
// startLegacyHeartbeat starts an heartbeat for a legacy KubernetesService
// so older clients can still look into kube clusters.
// DELETE IN 12.0.0
func (t *TLSServer) startLegacyHeartbeat() (err error) {
t.legacyHeartbeat, err = srv.NewHeartbeat(srv.HeartbeatConfig{
Mode: srv.HeartbeatModeKube,
Context: t.Context,
Component: t.Component,
Announcer: t.AuthClient,
GetServerInfo: t.legacyGetServerInfo,
KeepAlivePeriod: apidefaults.ServerKeepAliveTTL(),
AnnouncePeriod: apidefaults.ServerAnnounceTTL/2 + utils.RandomDuration(apidefaults.ServerAnnounceTTL/10),
ServerTTL: apidefaults.ServerAnnounceTTL,
CheckPeriod: defaults.HeartbeatCheckPeriod,
Clock: t.Clock,
OnHeartbeat: t.OnHeartbeat,
})
if err != nil {
return trace.Wrap(err)
}
go t.legacyHeartbeat.Run()
return nil
}
// legacyGetServerInfo is used to heartbeat the clusters monitored by this service
// as old kubeServices so older clients can still look into kube clusters.
// DELETE IN 12.0.0
func (t *TLSServer) legacyGetServerInfo() (types.Resource, error) {
t.mu.Lock()
defer t.mu.Unlock()
var addr string
if t.TLSServerConfig.ForwarderConfig.PublicAddr != "" {
addr = t.TLSServerConfig.ForwarderConfig.PublicAddr
} else if t.listener != nil {
addr = t.listener.Addr().String()
}
// Both proxy and kubernetes services can run in the same instance (same
// HostID). Add a name suffix to make them distinct.
//
// Note: we *don't* want to add suffix for kubernetes_service!
// This breaks reverse tunnel routing, which uses server.Name.
name := t.HostID
if t.KubeServiceType != KubeService {
name += "-proxy_service"
}
kubeClusters := t.fwd.kubeClusters()
sort.Sort(kubeClusters)
legacyKubeClusters := make([]*types.KubernetesCluster, len(kubeClusters))
for i := range kubeClusters {
clusterName := kubeClusters[i].GetName()
heartbeatCluster, err := t.getKubeClusterForHeartbeat(clusterName)
if err != nil {
t.Log.WithError(err).Warnf("Unable to find %q cluster.", clusterName)
continue
}
legacyKubeClusters[i] = &types.KubernetesCluster{
Name: heartbeatCluster.GetName(),
StaticLabels: heartbeatCluster.GetStaticLabels(),
DynamicLabels: types.LabelsToV2(heartbeatCluster.GetDynamicLabels()),
}
}
srv := &types.ServerV2{
Kind: types.KindKubeService,
Version: types.V2,
Metadata: types.Metadata{
Name: name,
Namespace: t.Namespace,
},
Spec: types.ServerSpecV2{
Addr: addr,
Version: teleport.Version,
KubernetesClusters: legacyKubeClusters,
ProxyIDs: t.ConnectedProxyGetter.GetProxyIDs(),
},
}
srv.SetExpiry(t.Clock.Now().UTC().Add(apidefaults.ServerAnnounceTTL))
return srv, nil
}
// getServiceStaticLabels gets the labels that the server should present as static,
// which includes Cloud labels if available.
func (t *TLSServer) getServiceStaticLabels() map[string]string {

View file

@ -263,10 +263,12 @@ func TestHeartbeat(t *testing.T) {
kubeClusterGetter func(auth.ClientI) []string
}
tests := []struct {
name string
args args
name string
args args
wantEmpty bool
}{
{
// DELETE IN 13.0.0
name: "List KubeServices (legacy)",
args: args{
kubeClusterGetter: func(authClient auth.ClientI) []string {
@ -287,6 +289,7 @@ func TestHeartbeat(t *testing.T) {
return clusters
},
},
wantEmpty: true,
},
{
name: "List KubeServers",
@ -311,10 +314,12 @@ func TestHeartbeat(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
kubeClusters := tt.args.kubeClusterGetter(testCtx.authClient)
require.Equal(t, []string{kubeCluster1, kubeCluster2}, kubeClusters)
if tt.wantEmpty {
require.Empty(t, kubeClusters)
} else {
require.Equal(t, []string{kubeCluster1, kubeCluster2}, kubeClusters)
}
})
}
}

View file

@ -219,12 +219,6 @@ func setupTestContext(ctx context.Context, t *testing.T, cfg testConfig) *testCo
// Waits for len(clusters) heartbeats to start
waitForHeartbeats := len(cfg.clusters)
// we must also wait for the legacy heartbeat.
// FIXME (tigrato): his check was added to force
// the person that removes the legacy heartbeat to adapt this code as well
// in order to wait just for len(cfg.clusters).
_ = testCtx.kubeServer.legacyHeartbeat
waitForHeartbeats++
testCtx.startKubeService(t)

View file

@ -162,7 +162,6 @@ func KubeClusterNames(ctx context.Context, p KubeServicesPresence) ([]string, er
}
func extractAndSortKubeClusterNames(kubeServers []types.KubeServer) []string {
kubeClusters := extractAndSortKubeClusters(kubeServers)
kubeClusterNames := make([]string, len(kubeClusters))
for i := range kubeClusters {
@ -193,7 +192,7 @@ func ListKubeClustersWithFilters(ctx context.Context, p client.ListResourcesClie
resources, err := client.GetResourcesWithFilters(ctx, p, req)
if trace.IsNotImplemented(err) {
// DELETE IN 12.0.0
// DELETE IN 13.0.0
resources, err = listKubeClustersWithFiltersFallback(ctx, p, req)
if err != nil {
return nil, trace.Wrap(err)
@ -201,7 +200,7 @@ func ListKubeClustersWithFilters(ctx context.Context, p client.ListResourcesClie
} else if err != nil {
return nil, trace.Wrap(err)
} else {
// DELETE IN 12.0.0
// DELETE IN 13.0.0
resourceKubeService, err := listKubeClustersWithFiltersFallback(ctx, p, req)
if err != nil {
return nil, trace.Wrap(err)