[kube] support SPDY over websocket protocol (#46663)

SPDY has been deprecated for several years, and most proxies are expected to lose support in the coming months. To address this issue, Kubernetes introduced SPDY over WebSocket connections. This protocol leverages a WebSocket upgrade, but once established, it functions as a simple connection with SPDY-based framing.

This pull request (PR) introduces initial support for customer-facing upgrades. Future PRs will add support for teleport-to-teleport communication using the `SPDY/3.1+portforward.k8s.io` protocol.
This commit is contained in:
Tiago Silva 2024-09-20 17:16:18 +01:00 committed by GitHub
parent d28977fae4
commit ab2f7fd4bd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 176 additions and 42 deletions

View file

@ -47,7 +47,7 @@ rules:
resourceNames: ["test-pod"]
- apiGroups: [""]
resources: ["pods/portforward"]
verbs: ["create"]
verbs: ["create", "get"]
resourceNames: ["test-pod"]
- apiGroups: [""]
resources: ["pods/ephemeralcontainers"]

View file

@ -516,34 +516,6 @@ func testKubePortForward(t *testing.T, suite *KubeSuite) {
})
require.NoError(t, err)
// forward local port to target port 80 of the nginx container
localPort := newPortValue()
forwarder, err := newPortForwarder(proxyClientConfig, kubePortForwardArgs{
ports: []string{fmt.Sprintf("%v:80", localPort)},
podName: testPod,
podNamespace: testNamespace,
})
require.NoError(t, err)
forwarderCh := make(chan error)
go func() { forwarderCh <- forwarder.ForwardPorts() }()
defer func() {
assert.NoError(t, <-forwarderCh, "Forward ports exited with error")
}()
select {
case <-time.After(5 * time.Second):
t.Fatalf("Timeout waiting for port forwarding.")
case <-forwarder.readyC:
}
defer close(forwarder.stopC)
resp, err := http.Get(fmt.Sprintf("http://localhost:%v", localPort))
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
require.NoError(t, resp.Body.Close())
// impersonating client requests will bse denied
_, impersonatingProxyClientConfig, err := kube.ProxyClient(kube.ProxyConfig{
T: teleport,
@ -553,18 +525,72 @@ func testKubePortForward(t *testing.T, suite *KubeSuite) {
})
require.NoError(t, err)
localPort = newPortValue()
impersonatingForwarder, err := newPortForwarder(impersonatingProxyClientConfig, kubePortForwardArgs{
ports: []string{fmt.Sprintf("%v:80", localPort)},
podName: testPod,
podNamespace: testNamespace,
})
require.NoError(t, err)
tests := []struct {
name string
builder func(*rest.Config, kubePortForwardArgs) (*kubePortForwarder, error)
}{
{
name: "SPDY portForwarder",
builder: newPortForwarder,
},
{
name: "SPDY over Websocket portForwarder",
builder: newPortForwarderSPDYOverWebsocket,
},
}
for _, tt := range tests {
t.Run(tt.name,
func(t *testing.T) {
// forward local port to target port 80 of the nginx container
listener, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, listener.Close())
})
localPort := listener.Addr().(*net.TCPAddr).Port
forwarder, err := tt.builder(proxyClientConfig, kubePortForwardArgs{
ports: []string{fmt.Sprintf("%v:80", localPort)},
podName: testPod,
podNamespace: testNamespace,
})
require.NoError(t, err)
forwarderCh := make(chan error)
go func() { forwarderCh <- forwarder.ForwardPorts() }()
select {
case <-time.After(5 * time.Second):
t.Fatalf("Timeout waiting for port forwarding.")
case <-forwarder.readyC:
}
t.Cleanup(func() {})
resp, err := http.Get(fmt.Sprintf("http://localhost:%v", localPort))
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
require.NoError(t, resp.Body.Close())
close(forwarder.stopC)
require.NoError(t, <-forwarderCh, "Forward ports exited with error")
impersonatingForwarder, err := tt.builder(impersonatingProxyClientConfig, kubePortForwardArgs{
ports: []string{fmt.Sprintf("%v:80", localPort)},
podName: testPod,
podNamespace: testNamespace,
})
require.NoError(t, err)
// This request should be denied
err = impersonatingForwarder.ForwardPorts()
require.Error(t, err)
require.Regexp(t, ".*impersonation request has been denied.*|.*403 Forbidden.*", err.Error())
},
)
}
// This request should be denied
err = impersonatingForwarder.ForwardPorts()
require.Error(t, err)
require.Regexp(t, ".*impersonation request has been denied.*", err.Error())
}
// TestKubeTrustedClustersClientCert tests scenario with trusted clusters
@ -1948,6 +1974,27 @@ type kubePortForwarder struct {
readyC chan struct{}
}
func newPortForwarderSPDYOverWebsocket(kubeConfig *rest.Config, args kubePortForwardArgs) (*kubePortForwarder, error) {
u, err := url.Parse(kubeConfig.Host)
if err != nil {
return nil, trace.Wrap(err)
}
u.Scheme = "https"
u.Path = fmt.Sprintf("/api/v1/namespaces/%v/pods/%v/portforward", args.podNamespace, args.podName)
tunnelingDialer, err := portforward.NewSPDYOverWebsocketDialer(u, kubeConfig)
if err != nil {
return nil, trace.Wrap(err)
}
stopC, readyC := make(chan struct{}), make(chan struct{})
fwd, err := portforward.New(tunnelingDialer, args.ports, stopC, readyC, nil, nil)
if err != nil {
return nil, trace.Wrap(err)
}
return &kubePortForwarder{PortForwarder: fwd, stopC: stopC, readyC: readyC}, nil
}
func newPortForwarder(kubeConfig *rest.Config, args kubePortForwardArgs) (*kubePortForwarder, error) {
u, err := url.Parse(kubeConfig.Host)
if err != nil {

View file

@ -50,6 +50,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/httpstream"
httpstreamspdy "k8s.io/apimachinery/pkg/util/httpstream/spdy"
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/client-go/rest"
@ -1814,10 +1815,14 @@ func (f *Forwarder) portForward(authCtx *authContext, w http.ResponseWriter, req
// Go client uses SPDY while other clients still require WebSockets.
// This function will run until the end of the execution of the request.
func runPortForwarding(req portForwardRequest) error {
if wsstream.IsWebSocketRequest(req.httpRequest) {
switch {
case wsstream.IsWebSocketRequestWithTunnelingProtocol(req.httpRequest):
return trace.Wrap(runPortForwardingTunneledHTTPStreams(req))
case wsstream.IsWebSocketRequest(req.httpRequest):
return trace.Wrap(runPortForwardingWebSocket(req))
default:
return trace.Wrap(runPortForwardingHTTPStreams(req))
}
return trace.Wrap(runPortForwardingHTTPStreams(req))
}
const (
@ -2171,6 +2176,7 @@ func (f *Forwarder) getSPDYDialer(sess *clusterSession, req *http.Request) (http
return nil, trace.Wrap(err)
}
req = createSPDYRequest(req, PortForwardProtocolV1Name)
upgradeRoundTripper := NewSpdyRoundTripperWithDialer(roundTripperConfig{
ctx: req.Context(),
sess: sess,
@ -2196,6 +2202,27 @@ func (f *Forwarder) getSPDYDialer(sess *clusterSession, req *http.Request) (http
return spdy.NewDialer(upgradeRoundTripper, client, req.Method, req.URL), nil
}
// createSPDYRequest modifies the passed request to remove
// WebSockets headers and add SPDY upgrade information, including
// spdy protocols acceptable to the client.
func createSPDYRequest(req *http.Request, spdyProtocols ...string) *http.Request {
clone := req.Clone(req.Context())
// Clean up the websocket headers from the http request.
clone.Header.Del(wsstream.WebSocketProtocolHeader)
clone.Header.Del("Sec-Websocket-Key")
clone.Header.Del("Sec-Websocket-Version")
clone.Header.Del(httpstream.HeaderUpgrade)
// Update the http request for an upstream SPDY upgrade.
clone.Method = "POST"
clone.Body = nil // Remove the request body which is unused.
clone.Header.Set(httpstream.HeaderUpgrade, httpstreamspdy.HeaderSpdy31)
clone.Header.Del(httpstream.HeaderProtocolVersion)
for i := range spdyProtocols {
clone.Header.Add(httpstream.HeaderProtocolVersion, spdyProtocols[i])
}
return clone
}
// clusterSession contains authenticated user session to the target cluster:
// x509 short lived credentials, forwarding proxies and other data
type clusterSession struct {

View file

@ -27,10 +27,14 @@ import (
"strings"
"sync"
gwebsocket "github.com/gorilla/websocket"
"github.com/gravitational/trace"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/httpstream"
spdystream "k8s.io/apimachinery/pkg/util/httpstream/spdy"
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
portforwardconstants "k8s.io/apimachinery/pkg/util/portforward"
"k8s.io/client-go/tools/portforward"
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/lib/events"
@ -297,3 +301,59 @@ func (h *websocketPortforwardHandler) forwardStreamPair(p *websocketChannelPair)
h.Debugf("Port forwarding pair completed.")
}
// runPortForwardingTunneledHTTPStreams handles a port-forwarding request that uses SPDY protocol
// over WebSockets.
func runPortForwardingTunneledHTTPStreams(req portForwardRequest) error {
targetConn, _, err := req.targetDialer.Dial(PortForwardProtocolV1Name)
if err != nil {
return trace.Wrap(err, "error upgrading target connection")
}
defer targetConn.Close()
// Try to upgrade the websocket connection.
// Beyond this point, we don't need to write errors to the response.
upgrader := gwebsocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
Subprotocols: []string{portforwardconstants.WebsocketsSPDYTunnelingPortForwardV1},
}
conn, err := upgrader.Upgrade(req.httpResponseWriter, req.httpRequest, nil)
if err != nil {
return trace.Wrap(err)
}
tunneledConn := portforward.NewTunnelingConnection("server", conn)
streamChan := make(chan httpstream.Stream, 1)
spdyConn, err := spdystream.NewServerConnectionWithPings(
tunneledConn,
httpStreamReceived(req.context, streamChan),
req.pingPeriod,
)
if err != nil {
return trace.Wrap(err, "error upgrading connection")
}
if conn == nil {
return trace.ConnectionProblem(nil, "Unable to upgrade websocket connection")
}
defer conn.Close()
h := &portForwardProxy{
Entry: logrus.WithFields(logrus.Fields{
teleport.ComponentKey: teleport.Component(teleport.ComponentProxyKube),
events.RemoteAddr: req.httpRequest.RemoteAddr,
}),
portForwardRequest: req,
sourceConn: spdyConn,
streamChan: streamChan,
streamPairs: make(map[string]*httpStreamPair),
streamCreationTimeout: DefaultStreamCreationTimeout,
targetConn: targetConn,
}
defer h.Close()
h.Debugf("Setting port forwarding streaming connection idle timeout to %v", IdleTimeout)
spdyConn.SetIdleTimeout(IdleTimeout)
h.run()
return nil
}