Kubernetes License check for Kubernetes Proxies (#24543)

* Kubernetes License check for root clusters

This PR ensures that a given Teleport root cluster is licensed for
Kubernetes access when forwarding credentials to leaf clusters.

Previously, the root Kube proxy would call auth server to generate a new
cert-key pair with the user identity and during the call Auth ensured
that it was licensed for Kubernetes usage. Given the recent developments
for #22533, the call mentioned was removed and it's now possible for a
root enterprise cluster to forward requests to a OSS leaf cluster
without validating its license.

Auth server already enforces Kubernetes license for Kubernetes service
when it tries to heartbeat the cluster. If the cluster is not properly
licensed, it's not possible to register kubernetes clusters.

Part of #22533

* remove watcher and use in process.getClusterFeatures callback

* add test to validate kube api license enforcement

* move err code to forbidden
This commit is contained in:
Tiago Silva 2023-04-18 10:37:14 +01:00 committed by GitHub
parent 689dce425f
commit 3ad31fb222
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 127 additions and 6 deletions

View file

@ -64,6 +64,7 @@ import (
kubeexec "k8s.io/client-go/util/exec"
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/constants"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/observability/tracing"
@ -171,8 +172,14 @@ type ForwarderConfig struct {
// the upstream Teleport proxy or Kubernetes service when forwarding requests
// using the forward identity (i.e. proxy impersonating a user) method.
ConnTLSConfig *tls.Config
// ClusterFeaturesGetter is a function that returns the Teleport cluster licensed features.
// It is used to determine if the cluster is licensed for Kubernetes usage.
ClusterFeatures ClusterFeaturesGetter
}
// ClusterFeaturesGetter is a function that returns the Teleport cluster licensed features.
type ClusterFeaturesGetter func() proto.Features
// CheckAndSetDefaults checks and sets default values
func (f *ForwarderConfig) CheckAndSetDefaults() error {
if f.AuthClient == nil {
@ -199,6 +206,9 @@ func (f *ForwarderConfig) CheckAndSetDefaults() error {
if f.HostID == "" {
return trace.BadParameter("missing parameter ServerID")
}
if f.ClusterFeatures == nil {
return trace.BadParameter("missing parameter ClusterFeatures")
}
if f.KubeServiceType != KubeService && f.PROXYSigner == nil {
return trace.BadParameter("missing parameter PROXYSigner")
}
@ -274,7 +284,6 @@ func NewForwarder(cfg ForwarderConfig) (*Forwarder, error) {
}
closeCtx, close := context.WithCancel(cfg.Context)
fwd := &Forwarder{
log: cfg.log,
cfg: cfg,
@ -290,6 +299,7 @@ func NewForwarder(cfg ForwarderConfig) (*Forwarder, error) {
clusterDetails: make(map[string]*kubeDetails),
cachedTransport: transportClients,
}
router := httprouter.New()
router.UseRawPath = true
@ -339,6 +349,7 @@ func NewForwarder(cfg ForwarderConfig) (*Forwarder, error) {
return nil, trace.Wrap(err)
}
}
return fwd, nil
}
@ -498,6 +509,11 @@ const accessDeniedMsg = "[00] access denied"
// authenticate function authenticates request
func (f *Forwarder) authenticate(req *http.Request) (*authContext, error) {
// If the cluster is not licensed for Kubernetes, return an error to the client.
if !f.cfg.ClusterFeatures().Kubernetes {
// If the cluster is not licensed for Kubernetes, return an error to the client.
return nil, trace.AccessDenied("Teleport cluster is not licensed for Kubernetes")
}
ctx, span := f.cfg.tracer.Start(
req.Context(),
"kube.Forwarder/authenticate",
@ -711,14 +727,15 @@ func (f *Forwarder) writeResponseErrorToBody(rw http.ResponseWriter, respErr err
// formatStatusResponseError formats the error response into a kube Status object.
func (f *Forwarder) formatStatusResponseError(rw http.ResponseWriter, respErr error) {
code := trace.ErrorToCode(respErr)
status := &metav1.Status{
Status: metav1.StatusFailure,
// Don't trace.Unwrap the error, in case it was wrapped with a
// user-friendly message. The underlying root error is likely too
// low-level to be useful.
Message: respErr.Error(),
Code: int32(trace.ErrorToCode(respErr)),
Reason: errorToKubeStatusReason(respErr),
Code: int32(code),
Reason: errorToKubeStatusReason(respErr, code),
}
data, err := runtime.Encode(kubeCodecs.LegacyCodec(), status)
if err != nil {
@ -2991,7 +3008,7 @@ func getRequestVerb(method string) string {
// errorToKubeStatusReason returns an appropriate StatusReason based on the
// provided error type.
func errorToKubeStatusReason(err error) metav1.StatusReason {
func errorToKubeStatusReason(err error, code int) metav1.StatusReason {
switch {
case trace.IsAggregate(err):
return metav1.StatusReasonTimeout
@ -3011,6 +3028,8 @@ func errorToKubeStatusReason(err error) metav1.StatusReason {
return metav1.StatusReasonTooManyRequests
case trace.IsConnectionProblem(err):
return metav1.StatusReasonTimeout
case code == http.StatusInternalServerError:
return metav1.StatusReasonInternalError
default:
return metav1.StatusReasonUnknown
}

View file

@ -43,9 +43,12 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/transport"
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/auth/testauthority"
@ -53,6 +56,7 @@ import (
"github.com/gravitational/teleport/lib/backend/memory"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/fixtures"
testingkubemock "github.com/gravitational/teleport/lib/kube/proxy/testing/kube_server"
"github.com/gravitational/teleport/lib/reversetunnel"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/services/local"
@ -124,6 +128,12 @@ func TestRequestCertificate(t *testing.T) {
require.Empty(t, cmp.Diff(*idFromCSR, ctx.UnmappedIdentity.GetIdentity()))
}
func fakeClusterFeatures() proto.Features {
return proto.Features{
Kubernetes: true,
}
}
func TestAuthenticate(t *testing.T) {
t.Parallel()
@ -163,6 +173,7 @@ func TestAuthenticate(t *testing.T) {
CachingAuthClient: ap,
TracerProvider: otel.GetTracerProvider(),
tracer: otel.Tracer(teleport.ComponentKube),
ClusterFeatures: fakeClusterFeatures,
},
getKubernetesServersForKubeCluster: func(ctx context.Context, name string) ([]types.KubeServer, error) {
servers, err := ap.GetKubernetesServers(ctx)
@ -1266,6 +1277,7 @@ func newMockForwader(ctx context.Context, t *testing.T) *Forwarder {
Context: ctx,
TracerProvider: otel.GetTracerProvider(),
tracer: otel.Tracer(teleport.ComponentKube),
ClusterFeatures: fakeClusterFeatures,
},
clientCredentials: clientCreds,
activeRequests: make(map[string]context.Context),
@ -1821,3 +1833,78 @@ func Test_copyImpersonationHeaders(t *testing.T) {
})
}
}
func TestKubernetesLicenseEnforcement(t *testing.T) {
t.Parallel()
// kubeMock is a Kubernetes API mock for the session tests.
kubeMock, err := testingkubemock.NewKubeAPIMock()
require.NoError(t, err)
t.Cleanup(func() { kubeMock.Close() })
tests := []struct {
name string
features proto.Features
assertErrFunc require.ErrorAssertionFunc
}{
{
name: "kubernetes agent is licensed",
features: proto.Features{
Kubernetes: true,
},
assertErrFunc: require.NoError,
},
{
name: "kubernetes isn't licensed",
features: proto.Features{
Kubernetes: false,
},
assertErrFunc: func(tt require.TestingT, err error, i ...interface{}) {
require.Error(tt, err)
var kubeErr *kubeerrors.StatusError
require.ErrorAs(tt, err, &kubeErr)
require.Equal(tt, kubeErr.ErrStatus.Code, int32(http.StatusForbidden))
require.Equal(tt, kubeErr.ErrStatus.Reason, metav1.StatusReasonForbidden)
require.Equal(tt, kubeErr.ErrStatus.Message, "Teleport cluster is not licensed for Kubernetes")
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
// creates a Kubernetes service with a configured cluster pointing to mock api server
testCtx := SetupTestContext(
context.Background(),
t,
TestConfig{
Clusters: []KubeClusterConfig{{Name: kubeCluster, APIEndpoint: kubeMock.URL}},
ClusterFeatures: func() proto.Features {
return tt.features
},
},
)
// close tests
t.Cleanup(func() { require.NoError(t, testCtx.Close()) })
_, _ = testCtx.CreateUserAndRole(
testCtx.Context,
t,
username,
RoleSpec{
Name: roleName,
KubeUsers: roleKubeUsers,
KubeGroups: roleKubeGroups,
})
// generate a kube client with user certs for auth
client, _ := testCtx.GenTestKubeClientTLSCert(
t,
username,
kubeCluster,
)
_, err = client.CoreV1().Pods(metav1.NamespaceDefault).List(context.Background(), metav1.ListOptions{})
tt.assertErrFunc(t, err)
})
}
}

View file

@ -38,6 +38,7 @@ import (
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client/proto"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
@ -82,6 +83,7 @@ type TestConfig struct {
ResourceMatchers []services.ResourceMatcher
OnReconcile func(types.KubeClusters)
OnEvent func(apievents.AuditEvent)
ClusterFeatures func() proto.Features
}
// SetupTestContext creates a kube service with clusters configured.
@ -173,6 +175,11 @@ func SetupTestContext(ctx context.Context, t *testing.T, cfg TestConfig) *TestCo
// heartbeatsWaitChannel waits for clusters heartbeats to start.
heartbeatsWaitChannel := make(chan struct{}, len(cfg.Clusters)+1)
client := newAuthClientWithStreamer(testCtx)
features := func() proto.Features { return proto.Features{Kubernetes: true} }
if cfg.ClusterFeatures != nil {
features = cfg.ClusterFeatures
}
// Create kubernetes service server.
testCtx.KubeServer, err = NewTLSServer(TLSServerConfig{
ForwarderConfig: ForwarderConfig{
@ -202,7 +209,8 @@ func SetupTestContext(ctx context.Context, t *testing.T, cfg TestConfig) *TestCo
CheckImpersonationPermissions: func(ctx context.Context, clusterName string, sarClient authztypes.SelfSubjectAccessReviewInterface) error {
return nil
},
Clock: clockwork.NewRealClock(),
Clock: clockwork.NewRealClock(),
ClusterFeatures: features,
},
DynamicLabels: nil,
TLS: tlsConfig,

View file

@ -231,6 +231,7 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C
LockWatcher: lockWatcher,
CheckImpersonationPermissions: cfg.Kube.CheckImpersonationPermissions,
PublicAddr: publicAddr,
ClusterFeatures: process.getClusterFeatures,
},
TLS: tlsConfig,
AccessPoint: accessPoint,

View file

@ -4070,7 +4070,8 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
// using Impersonation headers. The upstream service will validate if
// the provided connection certificate is from a proxy server and
// will impersonate the identity of the user that is making the request.
ConnTLSConfig: tlsConfig.Clone(),
ConnTLSConfig: tlsConfig.Clone(),
ClusterFeatures: process.getClusterFeatures,
},
TLS: tlsConfig.Clone(),
LimiterConfig: cfg.Proxy.Limiter,

View file

@ -7943,6 +7943,11 @@ func startKubeWithoutCleanup(ctx context.Context, t *testing.T, cfg startKubeOpt
},
ConnTLSConfig: tlsConfig,
Clock: clockwork.NewRealClock(),
ClusterFeatures: func() clientproto.Features {
return clientproto.Features{
Kubernetes: true,
}
},
},
TLS: tlsConfig,
AccessPoint: client,