Clean up closed remote cluster sessions from k8s forwarder cache

Background:

    Kube forwarder caches `clusterSession` objects for one hour. These
    objects contain a `reversetunnel.RemoteSite` wrapper for remote
    cluster tunnels.
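
    For illustration, here is a minimal sketch of the caching pattern (a
    simplified model, not the actual forwarder code; the `ttlmap` calls
    mirror the ones visible in the diff below):

        package main

        import (
            "fmt"
            "time"

            "github.com/gravitational/ttlmap"
        )

        func main() {
            // Entries live for up to an hour, so a cached value can
            // outlive the resource it wraps.
            cache, err := ttlmap.New(128)
            if err != nil {
                panic(err)
            }
            cache.Set("leaf", "session-for-leaf", time.Hour)

            // Within the TTL the cached value is returned as-is, even
            // if the underlying tunnel was closed out-of-band.
            if v, ok := cache.Get("leaf"); ok {
                fmt.Println("cache hit:", v)
            }
        }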

    When an admin runs `tctl rm rc/leaf` on the root cluster, the tunnel
    is closed, but the kube forwarder cache retains a reference to it,
    which causes all requests to fail with "node is offline".

    If the admin then adds another leaf cluster with the same name, kube
    requests will still fail, trying to use the old, closed
    `reversetunnel.RemoteSite` even though a new one has been established.

Fix:

    Check the status of the cached `RemoteSite` before using it, and
    discard it if it has been closed.
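
    In outline, the guard being added below behaves like this sketch
    (hypothetical names `site`, `session`, and `getCached`; not the
    actual forwarder types):

        package main

        import "fmt"

        // site is a stand-in for reversetunnel.RemoteSite.
        type site interface{ IsClosed() bool }

        type fakeSite struct{ closed bool }

        func (f *fakeSite) IsClosed() bool { return f.closed }

        type session struct{ remote site }

        // getCached drops a cached session whose remote site has been
        // closed since it was cached, forcing the caller to dial fresh.
        func getCached(cache map[string]*session, key string) *session {
            s, ok := cache[key]
            if !ok {
                return nil
            }
            if s.remote != nil && s.remote.IsClosed() {
                delete(cache, key) // stale entry: the tunnel is gone
                return nil
            }
            return s
        }

        func main() {
            remote := &fakeSite{}
            cache := map[string]*session{"leaf": {remote: remote}}
            fmt.Println(getCached(cache, "leaf") != nil) // true: live session
            remote.closed = true
            fmt.Println(getCached(cache, "leaf") != nil) // false: discarded
        }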

Updates #3678
Author: Andrew Lytvynov
Date:   2020-05-27 11:20:13 -07:00
Commit: aac33af44f (parent 3c94003379)

6 changed files with 66 additions and 7 deletions


@@ -921,10 +921,16 @@ func (f *Forwarder) getClusterSession(ctx authContext) *clusterSession {
 	f.Lock()
 	defer f.Unlock()
 	creds, ok := f.clusterSessions.Get(ctx.key())
-	if ok {
-		return creds.(*clusterSession)
+	if !ok {
+		return nil
 	}
-	return nil
+	s := creds.(*clusterSession)
+	if s.cluster.isRemote && s.cluster.RemoteSite.IsClosed() {
+		f.Debugf("Found an existing clusterSession for remote cluster %q but it has been closed. Discarding it to create a new clusterSession.", ctx.cluster.GetName())
+		f.clusterSessions.Remove(ctx.key())
+		return nil
+	}
+	return s
 }
 
 func (f *Forwarder) serializedNewClusterSession(authContext authContext) (*clusterSession, error) {


@@ -7,9 +7,11 @@ import (
 	"github.com/gravitational/teleport/lib/auth"
 	"github.com/gravitational/teleport/lib/auth/testauthority"
 	"github.com/gravitational/teleport/lib/defaults"
+	"github.com/gravitational/teleport/lib/reversetunnel"
 	"github.com/gravitational/teleport/lib/services"
 	"github.com/gravitational/teleport/lib/tlsca"
+	"github.com/gravitational/ttlmap"
 	"gopkg.in/check.v1"
 )
@@ -70,6 +72,43 @@ func (s ForwarderSuite) TestRequestCertificate(c *check.C) {
 	c.Assert(idFromCSR, check.DeepEquals, ctx.Identity)
 }
 
+func (s ForwarderSuite) TestGetClusterSession(c *check.C) {
+	clusterSessions, err := ttlmap.New(defaults.ClientCacheSize)
+	c.Assert(err, check.IsNil)
+	f := &Forwarder{
+		clusterSessions: clusterSessions,
+	}
+
+	user, err := services.NewUser("bob")
+	c.Assert(err, check.IsNil)
+	remote := &mockRemoteSite{name: "site a"}
+	ctx := authContext{
+		cluster: cluster{
+			isRemote:   true,
+			RemoteSite: remote,
+		},
+		AuthContext: auth.AuthContext{
+			User: user,
+		},
+	}
+	sess := &clusterSession{authContext: ctx}
+
+	// Initial clusterSessions is empty, no session should be found.
+	c.Assert(f.getClusterSession(ctx), check.IsNil)
+
+	// Add a session to clusterSessions, getClusterSession should find it.
+	clusterSessions.Set(ctx.key(), sess, time.Hour)
+	c.Assert(f.getClusterSession(ctx), check.Equals, sess)
+
+	// Close the RemoteSite out-of-band (like when a remote cluster got removed
+	// via tctl), getClusterSession should notice this and discard the
+	// clusterSession.
+	remote.closed = true
+	c.Assert(f.getClusterSession(ctx), check.IsNil)
+	_, ok := f.clusterSessions.Get(ctx.key())
+	c.Assert(ok, check.Equals, false)
+}
+
 // mockClient to intercept ProcessKubeCSR requests, record them and return a
 // stub response.
 type mockClient struct {
@@ -89,9 +128,9 @@ func (c *mockClient) ProcessKubeCSR(csr auth.KubeCSR) (*auth.KubeCSRResponse, er
 // reversetunnel.RemoteSite.
 type mockRemoteSite struct {
 	reversetunnel.RemoteSite
-	name string
+	name   string
+	closed bool
 }
 
-func (s mockRemoteSite) GetName() string {
-	return s.name
-}
+func (s mockRemoteSite) GetName() string { return s.name }
+func (s mockRemoteSite) IsClosed() bool  { return s.closed }


@@ -92,6 +92,9 @@ type RemoteSite interface {
 	// GetTunnelsCount returns the amount of active inbound tunnels
 	// from the remote cluster
 	GetTunnelsCount() int
+	// IsClosed reports whether this RemoteSite has been closed and should no
+	// longer be used.
+	IsClosed() bool
 }
 
 // Server is a TCP/IP SSH server which listens on an SSH endpoint and remote/local


@@ -191,6 +191,9 @@ func (s *localSite) DialTCP(params DialParams) (net.Conn, error) {
 	return conn, nil
 }
 
+// IsClosed always returns false because localSite is never closed.
+func (s *localSite) IsClosed() bool { return false }
+
 func (s *localSite) dialWithAgent(params DialParams) (net.Conn, error) {
 	s.log.Debugf("Dialing with an agent from %v to %v.", params.From, params.To)


@@ -132,6 +132,9 @@ func (p *clusterPeers) DialTCP(params DialParams) (conn net.Conn, err error) {
 	return nil, trace.ConnectionProblem(nil, "unable to dial, this proxy has not been discovered yet, try again later")
 }
 
+// IsClosed always returns false because clusterPeers is never closed.
+func (p *clusterPeers) IsClosed() bool { return false }
+
 // newClusterPeer returns new cluster peer
 func newClusterPeer(srv *server, connInfo services.TunnelConnection, offlineThreshold time.Duration) (*clusterPeer, error) {
 	clusterPeer := &clusterPeer{


@@ -175,6 +175,11 @@ func (s *remoteSite) Close() error {
 	return nil
 }
 
+// IsClosed reports whether this remoteSite has been closed.
+func (s *remoteSite) IsClosed() bool {
+	return s.ctx.Err() != nil
+}
+
 // nextConn returns next connection that is ready
 // and has not been marked as invalid
 // it will close connections marked as invalid
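
The IsClosed implementation above leans on the site's context: assuming
remoteSite.Close cancels s.ctx (the body of Close is not shown in this
hunk), ctx.Err() becomes non-nil exactly when the site is closed. A
standalone illustration of that standard-library behavior:

    package main

    import (
        "context"
        "fmt"
    )

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        fmt.Println("closed:", ctx.Err() != nil) // false: not canceled yet
        cancel()
        fmt.Println("closed:", ctx.Err() != nil) // true: context canceled
    }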