Use a discard session server and audit logger when the proxy is in

recording mode and on a Teleport node.
This commit is contained in:
Russell Jones 2018-01-15 21:31:30 +00:00
parent 691a8a5758
commit 6a814e8a85
4 changed files with 85 additions and 32 deletions

View file

@ -260,9 +260,10 @@ func (s *IntSuite) TestAuditOn(c *check.C) {
if err != nil {
return nil, trace.Wrap(err)
}
if len(sessions) > 0 {
return &sessions[0], nil
if len(sessions) != 1 {
continue
}
return &sessions[0], nil
case <-stopCh:
return nil, trace.BadParameter("unable to find sessions after 10s (mode=%v)", tt.inRecordLocation)
}

View file

@ -216,24 +216,26 @@ func (u *UpdateRequest) Check() error {
return nil
}
// Due to limitations of the current back-end, Teleport won't return more than 1000 sessions
// per time window
// MaxSessionSliceLength is the maximum number of sessions per time window
// that the backend will return.
const MaxSessionSliceLength = 1000
// Service is a realtime SSH session service
// that has information about sessions that are in-flight in the
// cluster at the moment
// Service is a realtime SSH session service that has information about
// sessions that are in-flight in the cluster at the moment.
type Service interface {
// GetSessions returns a list of currently active sessions
// with all parties involved
// GetSessions returns a list of currently active sessions with all parties
// involved.
GetSessions(namespace string) ([]Session, error)
// GetSession returns a session with it's parties by ID
// GetSession returns a session with it's parties by ID.
GetSession(namespace string, id ID) (*Session, error)
// CreateSession creates a new active session and it's parameters
// if term is skipped, terminal size won't be recorded
// CreateSession creates a new active session and it's parameters if term is
// skipped, terminal size won't be recorded.
CreateSession(sess Session) error
// UpdateSession updates certain session parameters (last_active, terminal parameters)
// other parameters will not be updated
// UpdateSession updates certain session parameters (last_active, terminal
// parameters) other parameters will not be updated.
UpdateSession(req UpdateRequest) error
}
@ -386,6 +388,37 @@ func (s *server) UpdateSession(req UpdateRequest) error {
return nil
}
// discardSessionServer discards all information about sessions given to it.
type discardSessionServer struct {
}
// NewDiscardSessionServer returns a new discarding session server. It's used
// with the recording proxy so that nodes don't register active sessions to
// the backend.
func NewDiscardSessionServer() *discardSessionServer {
return &discardSessionServer{}
}
// GetSessions returns an empty list of sessions.
func (d *discardSessionServer) GetSessions(namespace string) ([]Session, error) {
return []Session{}, nil
}
// GetSession always returns a zero session.
func (d *discardSessionServer) GetSession(namespace string, id ID) (*Session, error) {
return &Session{}, nil
}
// CreateSession always returns nil, does nothing.
func (d *discardSessionServer) CreateSession(sess Session) error {
return nil
}
// UpdateSession always returns nil, does nothing.
func (d *discardSessionServer) UpdateSession(req UpdateRequest) error {
return nil
}
// NewTerminalParamsFromUint32 returns new terminal parameters from uint32 width and height
func NewTerminalParamsFromUint32(w uint32, h uint32) (*TerminalParams, error) {
if w > maxSize || w < minSize {

View file

@ -124,6 +124,9 @@ func (s *Server) GetNamespace() string {
}
func (s *Server) GetAuditLog() events.IAuditLog {
if s.isAuditedAtProxy() {
return events.NewDiscardAuditLog()
}
return s.alog
}
@ -132,9 +135,30 @@ func (s *Server) GetAccessPoint() auth.AccessPoint {
}
func (s *Server) GetSessionServer() rsession.Service {
if s.isAuditedAtProxy() {
return rsession.NewDiscardSessionServer()
}
return s.sessionServer
}
// isAuditedAtProxy returns true if sessions are being recorded at the proxy
// and this is a Teleport node.
func (s *Server) isAuditedAtProxy() bool {
// always be safe, better to double record than not record at all
clusterConfig, err := s.GetAccessPoint().GetClusterConfig()
if err != nil {
return false
}
isRecordAtProxy := clusterConfig.GetSessionRecording() == services.RecordAtProxy
isTeleportNode := s.Component() == teleport.ComponentNode
if isRecordAtProxy && isTeleportNode {
return true
}
return false
}
// ServerOption is a functional option passed to the server
type ServerOption func(s *Server) error

View file

@ -26,7 +26,6 @@ import (
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/services"
rsession "github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/sshutils"
"github.com/gravitational/teleport/lib/state"
@ -340,12 +339,18 @@ func newSession(id rsession.ID, r *SessionRegistry, ctx *ServerContext) (*sessio
rsess.TerminalParams.W = int(winsize.Width)
rsess.TerminalParams.H = int(winsize.Height)
}
err := r.srv.GetSessionServer().CreateSession(rsess)
// get the session server where session information lives. if the recording
// proxy is being used and this is a node, then a discard session server will
// be returned here.
sessionServer := r.srv.GetSessionServer()
err := sessionServer.CreateSession(rsess)
if err != nil {
if trace.IsAlreadyExists(err) {
// if session already exists, make sure they are compatible
// Login matches existing login
existing, err := r.srv.GetSessionServer().GetSession(r.srv.GetNamespace(), id)
existing, err := sessionServer.GetSession(r.srv.GetNamespace(), id)
if err != nil {
return nil, trace.Wrap(err)
}
@ -556,6 +561,8 @@ func (r *sessionRecorder) Close() error {
// start starts a new interactive process (or a shell) in the current session
func (s *session) start(ch ssh.Channel, ctx *ServerContext) error {
var err error
// create a new "party" (connected client)
p := newParty(s, ch, ctx)
@ -565,7 +572,6 @@ func (s *session) start(ch ssh.Channel, ctx *ServerContext) error {
s.term = ctx.GetTerm()
ctx.SetTerm(nil)
} else {
var err error
if s.term, err = NewTerminal(ctx); err != nil {
ctx.Infof("handleShell failed to create term: %v", err)
return trace.Wrap(err)
@ -582,21 +588,10 @@ func (s *session) start(ch ssh.Channel, ctx *ServerContext) error {
params := s.term.GetTerminalParams()
// get the audit log. if the cluster is recording sessions at the proxy and
// this is a teleport node, then use a discard audit log because everything
// is being recorded at the proxy already.
// get the audit log from the server and create a session recorder. this will
// be a discard audit log if the proxy is in recording mode and a teleport
// node so we don't create double recordings.
auditLog := s.registry.srv.GetAuditLog()
clusterConfig, err := s.registry.srv.GetAccessPoint().GetClusterConfig()
if err != nil {
return trace.Wrap(err)
}
isRecordAtProxy := clusterConfig.GetSessionRecording() == services.RecordAtProxy
isTeleportNode := s.registry.srv.Component() == teleport.ComponentNode
if isRecordAtProxy && isTeleportNode {
auditLog = events.NewDiscardAuditLog()
}
// create the recorder for the session with the passed in audit log
s.recorder, err = newSessionRecorder(auditLog, ctx.srv.GetNamespace(), s.id)
if err != nil {
return trace.Wrap(err)