Add quicker retry to get kube cluster details if the cluster was offline (#43903)

* Add quicker retry to get kube cluster details if the cluster goes offline.

* Add kubedetails close call on error.

Although this error will never actually happen in practice.
This commit is contained in:
Anton Miniailo 2024-07-09 12:41:53 -04:00 committed by GitHub
parent c97533ed22
commit 96e7cc3bb5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 198 additions and 17 deletions

View file

@ -37,6 +37,7 @@ import (
"k8s.io/client-go/tools/clientcmd"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/lib/cloud"
"github.com/gravitational/teleport/lib/cloud/azure"
"github.com/gravitational/teleport/lib/cloud/gcp"
@ -102,6 +103,9 @@ type clusterDetailsConfig struct {
component KubeServiceType
}
const defaultRefreshPeriod = 5 * time.Minute
const backoffRefreshStep = 10 * time.Second
// newClusterDetails creates a proxied kubeDetails structure given a dynamic cluster.
func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDetails, err error) {
creds := cfg.kubeCreds
@ -159,19 +163,43 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
gvkSupportedResources: gvkSupportedRes,
}
// If cluster is online and there's no errors, we refresh details seldom (every 5 minutes),
// but if the cluster is offline, we try to refresh details more often to catch it getting back online earlier.
firstPeriod := defaultRefreshPeriod
if isClusterOffline {
firstPeriod = backoffRefreshStep
}
refreshDelay, err := retryutils.NewLinear(retryutils.LinearConfig{
First: firstPeriod,
Step: backoffRefreshStep,
Max: defaultRefreshPeriod,
Jitter: retryutils.NewSeventhJitter(),
Clock: cfg.clock,
})
if err != nil {
k.Close()
return nil, trace.Wrap(err)
}
k.wg.Add(1)
// Start the periodic update of the codec factory and the list of supported types for RBAC.
go func() {
defer k.wg.Done()
ticker := cfg.clock.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.Chan():
case <-refreshDelay.After():
codecFactory, rbacSupportedTypes, gvkSupportedResources, err := newClusterSchemaBuilder(cfg.log, creds.getKubeClient())
if err != nil {
// If this is first time we get an error, we reset retry mechanism so it will start trying to refresh details quicker, with linear backoff.
if refreshDelay.First == defaultRefreshPeriod {
refreshDelay.First = backoffRefreshStep
refreshDelay.Reset()
} else {
refreshDelay.Inc()
}
cfg.log.WithError(err).Error("Failed to update cluster schema")
continue
}
@ -181,6 +209,9 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
cfg.log.WithError(err).Warn("Failed to get Kubernetes cluster version. Possibly the cluster is offline.")
}
// Restore details refresh delay to the default value, in case previously cluster was offline.
refreshDelay.First = defaultRefreshPeriod
k.rwMu.Lock()
k.kubeCodecs = codecFactory
k.rbacSupportedTypes = rbacSupportedTypes

View file

@ -0,0 +1,148 @@
/*
* Teleport
* Copyright (C) 2024 Gravitational, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package proxy
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
"github.com/gravitational/teleport/api/types"
)
func TestNewClusterDetails(t *testing.T) {
t.Parallel()
ctx := context.Background()
log := logrus.New().WithContext(ctx)
getClusterDetailsConfig := func(c clockwork.FakeClock) (clusterDetailsConfig, *clusterDetailsClientSet) {
client := &clusterDetailsClientSet{}
return clusterDetailsConfig{
kubeCreds: &staticKubeCreds{
kubeClient: client,
},
cluster: &types.KubernetesClusterV3{},
log: log,
clock: c,
}, client
}
t.Run("normal operation", func(t *testing.T) {
clock := clockwork.NewFakeClock()
config, client := getClusterDetailsConfig(clock)
details, err := newClusterDetails(ctx, config)
require.NoError(t, err)
require.NotNil(t, details)
require.Equal(t, 1, client.GetCalledTimes())
clock.BlockUntil(1)
// Advancing by short period doesn't cause another details refresh, since in normal state refresh interval
// is long.
clock.Advance(backoffRefreshStep + time.Second)
clock.BlockUntil(1)
require.Equal(t, 1, client.GetCalledTimes())
// Advancing by the default interval period causes another details refresh.
clock.Advance(defaultRefreshPeriod + time.Second)
clock.BlockUntil(1)
require.Equal(t, 2, client.GetCalledTimes())
})
t.Run("first time has failed, second time it's restored", func(t *testing.T) {
clock := clockwork.NewFakeClock()
config, client := getClusterDetailsConfig(clock)
client.discoveryErr = errors.New("error")
details, err := newClusterDetails(ctx, config)
require.NoError(t, err)
require.NotNil(t, details)
require.True(t, details.isClusterOffline)
require.Equal(t, 1, client.GetCalledTimes())
clock.BlockUntil(1)
client.discoveryErr = nil
// Advancing by short interval causes details refresh because we're in a bad state, and trying to
// refresh details more often.
clock.Advance(backoffRefreshStep + time.Second)
clock.BlockUntil(1)
require.Equal(t, 2, client.GetCalledTimes())
require.False(t, details.isClusterOffline)
// After we've restored normal state advancing by short interval doesn't cause details refresh.
clock.Advance(backoffRefreshStep + time.Second)
clock.BlockUntil(1)
require.Equal(t, 2, client.GetCalledTimes())
// Advancing by the default interval period causes another details refresh.
clock.Advance(defaultRefreshPeriod + time.Second)
clock.BlockUntil(1)
require.Equal(t, 3, client.GetCalledTimes())
})
}
type clusterDetailsClientSet struct {
kubernetes.Interface
discovery.DiscoveryInterface
discoveryErr error
calledTimes atomic.Int32
}
func (c *clusterDetailsClientSet) Discovery() discovery.DiscoveryInterface {
return c
}
func (c *clusterDetailsClientSet) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
c.calledTimes.Add(1)
if c.discoveryErr != nil {
return nil, nil, c.discoveryErr
}
return nil, []*metav1.APIResourceList{
&fakeAPIResource,
}, nil
}
func (c *clusterDetailsClientSet) ServerVersion() (*version.Info, error) {
return &version.Info{
Major: "1",
Minor: "29",
GitVersion: "v1.29.0",
}, nil
}
func (c *clusterDetailsClientSet) GetCalledTimes() int {
return int(c.calledTimes.Load())
}

View file

@ -42,7 +42,7 @@ type kubeCreds interface {
getTransportConfig() *transport.Config
getTargetAddr() string
getKubeRestConfig() *rest.Config
getKubeClient() *kubernetes.Clientset
getKubeClient() kubernetes.Interface
getTransport() http.RoundTripper
wrapTransport(http.RoundTripper) (http.RoundTripper, error)
close() error
@ -65,7 +65,7 @@ type staticKubeCreds struct {
transportConfig *transport.Config
// targetAddr is a kubernetes API address.
targetAddr string
kubeClient *kubernetes.Clientset
kubeClient kubernetes.Interface
// clientRestCfg is the Kubernetes Rest config for the cluster.
clientRestCfg *rest.Config
transport http.RoundTripper
@ -87,7 +87,7 @@ func (s *staticKubeCreds) getTargetAddr() string {
return s.targetAddr
}
func (s *staticKubeCreds) getKubeClient() *kubernetes.Clientset {
func (s *staticKubeCreds) getKubeClient() kubernetes.Interface {
return s.kubeClient
}
@ -257,7 +257,7 @@ func (d *dynamicKubeCreds) getTargetAddr() string {
return d.staticCreds.targetAddr
}
func (d *dynamicKubeCreds) getKubeClient() *kubernetes.Clientset {
func (d *dynamicKubeCreds) getKubeClient() kubernetes.Interface {
d.RLock()
defer d.RUnlock()
return d.staticCreds.kubeClient

View file

@ -44,17 +44,19 @@ func (c *clientSet) Discovery() discovery.DiscoveryInterface {
return c
}
var fakeAPIResource = metav1.APIResourceList{
GroupVersion: "extensions/v1beta1",
APIResources: []metav1.APIResource{
{
Name: "ingresses",
Kind: "Ingress",
Namespaced: true,
},
},
}
func (c *clientSet) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
return nil, []*metav1.APIResourceList{
{
GroupVersion: "extensions/v1beta1",
APIResources: []metav1.APIResource{
{
Name: "ingresses",
Kind: "Ingress",
Namespaced: true,
},
},
},
&fakeAPIResource,
}, nil
}