diff --git a/lib/kube/proxy/cluster_details.go b/lib/kube/proxy/cluster_details.go
index fad0e385215..0ac7a2c0149 100644
--- a/lib/kube/proxy/cluster_details.go
+++ b/lib/kube/proxy/cluster_details.go
@@ -37,6 +37,7 @@ import (
     "k8s.io/client-go/tools/clientcmd"
 
     "github.com/gravitational/teleport/api/types"
+    "github.com/gravitational/teleport/api/utils/retryutils"
     "github.com/gravitational/teleport/lib/cloud"
     "github.com/gravitational/teleport/lib/cloud/azure"
     "github.com/gravitational/teleport/lib/cloud/gcp"
@@ -102,6 +103,9 @@ type clusterDetailsConfig struct {
     component KubeServiceType
 }
 
+const defaultRefreshPeriod = 5 * time.Minute
+const backoffRefreshStep = 10 * time.Second
+
 // newClusterDetails creates a proxied kubeDetails structure given a dynamic cluster.
 func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDetails, err error) {
     creds := cfg.kubeCreds
@@ -159,19 +163,43 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
         gvkSupportedResources: gvkSupportedRes,
     }
 
+    // If the cluster is online and there are no errors, we refresh the details infrequently (every 5 minutes),
+    // but if the cluster is offline, we retry more often to catch it coming back online sooner.
+    firstPeriod := defaultRefreshPeriod
+    if isClusterOffline {
+        firstPeriod = backoffRefreshStep
+    }
+    refreshDelay, err := retryutils.NewLinear(retryutils.LinearConfig{
+        First:  firstPeriod,
+        Step:   backoffRefreshStep,
+        Max:    defaultRefreshPeriod,
+        Jitter: retryutils.NewSeventhJitter(),
+        Clock:  cfg.clock,
+    })
+    if err != nil {
+        k.Close()
+        return nil, trace.Wrap(err)
+    }
+
     k.wg.Add(1)
     // Start the periodic update of the codec factory and the list of supported types for RBAC.
     go func() {
         defer k.wg.Done()
-        ticker := cfg.clock.NewTicker(5 * time.Minute)
-        defer ticker.Stop()
+
         for {
             select {
             case <-ctx.Done():
                 return
-            case <-ticker.Chan():
+            case <-refreshDelay.After():
                 codecFactory, rbacSupportedTypes, gvkSupportedResources, err := newClusterSchemaBuilder(cfg.log, creds.getKubeClient())
                 if err != nil {
+                    // If this is the first error we see, reset the retry mechanism so it starts refreshing the details sooner, with linear backoff.
+                    if refreshDelay.First == defaultRefreshPeriod {
+                        refreshDelay.First = backoffRefreshStep
+                        refreshDelay.Reset()
+                    } else {
+                        refreshDelay.Inc()
+                    }
                     cfg.log.WithError(err).Error("Failed to update cluster schema")
                     continue
                 }
@@ -181,6 +209,9 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
                     cfg.log.WithError(err).Warn("Failed to get Kubernetes cluster version. Possibly the cluster is offline.")
                 }
 
+                // Restore the details refresh delay to the default value, in case the cluster was previously offline.
+                refreshDelay.First = defaultRefreshPeriod
+
                 k.rwMu.Lock()
                 k.kubeCodecs = codecFactory
                 k.rbacSupportedTypes = rbacSupportedTypes
diff --git a/lib/kube/proxy/cluster_details_test.go b/lib/kube/proxy/cluster_details_test.go
new file mode 100644
index 00000000000..116a575bc41
--- /dev/null
+++ b/lib/kube/proxy/cluster_details_test.go
@@ -0,0 +1,148 @@
+/*
+ * Teleport
+ * Copyright (C) 2024 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package proxy
+
+import (
+    "context"
+    "errors"
+    "sync/atomic"
+    "testing"
+    "time"
+
+    "github.com/jonboulle/clockwork"
+    "github.com/sirupsen/logrus"
+    "github.com/stretchr/testify/require"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/version"
+    "k8s.io/client-go/discovery"
+    "k8s.io/client-go/kubernetes"
+
+    "github.com/gravitational/teleport/api/types"
+)
+
+func TestNewClusterDetails(t *testing.T) {
+    t.Parallel()
+    ctx := context.Background()
+    log := logrus.New().WithContext(ctx)
+
+    getClusterDetailsConfig := func(c clockwork.FakeClock) (clusterDetailsConfig, *clusterDetailsClientSet) {
+        client := &clusterDetailsClientSet{}
+        return clusterDetailsConfig{
+            kubeCreds: &staticKubeCreds{
+                kubeClient: client,
+            },
+            cluster: &types.KubernetesClusterV3{},
+            log:     log,
+            clock:   c,
+        }, client
+    }
+
+    t.Run("normal operation", func(t *testing.T) {
+        clock := clockwork.NewFakeClock()
+        config, client := getClusterDetailsConfig(clock)
+        details, err := newClusterDetails(ctx, config)
+
+        require.NoError(t, err)
+        require.NotNil(t, details)
+        require.Equal(t, 1, client.GetCalledTimes())
+
+        clock.BlockUntil(1)
+
+        // Advancing by a short period doesn't cause another details refresh, since in the normal state the
+        // refresh interval is long.
+        clock.Advance(backoffRefreshStep + time.Second)
+        clock.BlockUntil(1)
+        require.Equal(t, 1, client.GetCalledTimes())
+
+        // Advancing by the default refresh period causes another details refresh.
+        clock.Advance(defaultRefreshPeriod + time.Second)
+        clock.BlockUntil(1)
+        require.Equal(t, 2, client.GetCalledTimes())
+    })
+
+    t.Run("first refresh fails, second succeeds", func(t *testing.T) {
+        clock := clockwork.NewFakeClock()
+        config, client := getClusterDetailsConfig(clock)
+        client.discoveryErr = errors.New("error")
+        details, err := newClusterDetails(ctx, config)
+
+        require.NoError(t, err)
+        require.NotNil(t, details)
+        require.True(t, details.isClusterOffline)
+        require.Equal(t, 1, client.GetCalledTimes())
+
+        clock.BlockUntil(1)
+
+        client.discoveryErr = nil
+
+        // Advancing by a short interval causes a details refresh, because we're in a bad state and trying to
+        // refresh the details more often.
+        clock.Advance(backoffRefreshStep + time.Second)
+        clock.BlockUntil(1)
+
+        require.Equal(t, 2, client.GetCalledTimes())
+        require.False(t, details.isClusterOffline)
+
+        // After the normal state is restored, advancing by a short interval doesn't cause a details refresh.
+        clock.Advance(backoffRefreshStep + time.Second)
+        clock.BlockUntil(1)
+        require.Equal(t, 2, client.GetCalledTimes())
+
+        // Advancing by the default refresh period causes another details refresh.
+        clock.Advance(defaultRefreshPeriod + time.Second)
+        clock.BlockUntil(1)
+        require.Equal(t, 3, client.GetCalledTimes())
+    })
+
+}
+
+type clusterDetailsClientSet struct {
+    kubernetes.Interface
+    discovery.DiscoveryInterface
+
+    discoveryErr error
+    calledTimes  atomic.Int32
+}
+
+func (c *clusterDetailsClientSet) Discovery() discovery.DiscoveryInterface {
+    return c
+}
+
+func (c *clusterDetailsClientSet) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
+    c.calledTimes.Add(1)
+    if c.discoveryErr != nil {
+        return nil, nil, c.discoveryErr
+    }
+
+    return nil, []*metav1.APIResourceList{
+        &fakeAPIResource,
+    }, nil
+}
+
+func (c *clusterDetailsClientSet) ServerVersion() (*version.Info, error) {
+    return &version.Info{
+        Major:      "1",
+        Minor:      "29",
+        GitVersion: "v1.29.0",
+    }, nil
+}
+
+func (c *clusterDetailsClientSet) GetCalledTimes() int {
+    return int(c.calledTimes.Load())
+}
diff --git a/lib/kube/proxy/kube_creds.go b/lib/kube/proxy/kube_creds.go
index 55c393b4a46..19fd6edb2bd 100644
--- a/lib/kube/proxy/kube_creds.go
+++ b/lib/kube/proxy/kube_creds.go
@@ -42,7 +42,7 @@ type kubeCreds interface {
     getTransportConfig() *transport.Config
     getTargetAddr() string
     getKubeRestConfig() *rest.Config
-    getKubeClient() *kubernetes.Clientset
+    getKubeClient() kubernetes.Interface
     getTransport() http.RoundTripper
     wrapTransport(http.RoundTripper) (http.RoundTripper, error)
     close() error
@@ -65,7 +65,7 @@ type staticKubeCreds struct {
     transportConfig *transport.Config
     // targetAddr is a kubernetes API address.
     targetAddr string
-    kubeClient *kubernetes.Clientset
+    kubeClient kubernetes.Interface
     // clientRestCfg is the Kubernetes Rest config for the cluster.
     clientRestCfg *rest.Config
     transport     http.RoundTripper
@@ -87,7 +87,7 @@ func (s *staticKubeCreds) getTargetAddr() string {
     return s.targetAddr
 }
 
-func (s *staticKubeCreds) getKubeClient() *kubernetes.Clientset {
+func (s *staticKubeCreds) getKubeClient() kubernetes.Interface {
     return s.kubeClient
 }
 
@@ -257,7 +257,7 @@ func (d *dynamicKubeCreds) getTargetAddr() string {
     return d.staticCreds.targetAddr
 }
 
-func (d *dynamicKubeCreds) getKubeClient() *kubernetes.Clientset {
+func (d *dynamicKubeCreds) getKubeClient() kubernetes.Interface {
     d.RLock()
     defer d.RUnlock()
     return d.staticCreds.kubeClient
diff --git a/lib/kube/proxy/scheme_test.go b/lib/kube/proxy/scheme_test.go
index b0348456765..d13dbbc94df 100644
--- a/lib/kube/proxy/scheme_test.go
+++ b/lib/kube/proxy/scheme_test.go
@@ -44,17 +44,19 @@ func (c *clientSet) Discovery() discovery.DiscoveryInterface {
     return c
 }
 
+var fakeAPIResource = metav1.APIResourceList{
+    GroupVersion: "extensions/v1beta1",
+    APIResources: []metav1.APIResource{
+        {
+            Name:       "ingresses",
+            Kind:       "Ingress",
+            Namespaced: true,
+        },
+    },
+}
+
 func (c *clientSet) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
     return nil, []*metav1.APIResourceList{
-        {
-            GroupVersion: "extensions/v1beta1",
-            APIResources: []metav1.APIResource{
-                {
-                    Name:       "ingresses",
-                    Kind:       "Ingress",
-                    Namespaced: true,
-                },
-            },
-        },
+        &fakeAPIResource,
     }, nil
 }
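
For reference, the refresh cadence introduced in cluster_details.go above works out to: refresh every defaultRefreshPeriod (5 minutes) while the cluster is healthy, drop to backoffRefreshStep (10 seconds) on the first failure, back off linearly by one step per consecutive failure up to the 5-minute cap, and return to the 5-minute cadence once a refresh succeeds. The standalone sketch below is not part of the change; the nextDelay helper is purely illustrative and ignores the one-seventh jitter that retryutils applies to each delay.

package main

import (
    "fmt"
    "time"
)

const (
    defaultRefreshPeriod = 5 * time.Minute
    backoffRefreshStep   = 10 * time.Second
)

// nextDelay approximates the linear-backoff logic from cluster_details.go,
// without jitter: a success restores the slow cadence, the first failure
// drops to the short step, and consecutive failures grow by one step up to
// the default period.
func nextDelay(current time.Duration, refreshFailed bool) time.Duration {
    if !refreshFailed {
        return defaultRefreshPeriod
    }
    if current == defaultRefreshPeriod {
        return backoffRefreshStep
    }
    next := current + backoffRefreshStep
    if next > defaultRefreshPeriod {
        next = defaultRefreshPeriod
    }
    return next
}

func main() {
    delay := backoffRefreshStep // the cluster is offline at startup
    for i := 0; i < 3; i++ {
        fmt.Println("delay while offline:", delay) // 10s, 20s, 30s
        delay = nextDelay(delay, true)
    }
    fmt.Println("delay after recovery:", nextDelay(delay, false)) // 5m0s
}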