Remove events.Forwarder and RecordSessions config param

The Forwarder type has been replaced with the new gRPC/streaming-based
session recording and was only used in tests.

The RecordSessions param is never consulted, as it was replaced with
AuditWriter's RecordOutput param a couple of years ago.
This commit is contained in:
Zac Bergquist 2022-03-23 19:13:29 -06:00 committed by Zac Bergquist
parent 58b2aac411
commit bd7e7a84f0
8 changed files with 37 additions and 1146 deletions

View file

@ -41,7 +41,6 @@ import (
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/api/utils/sshutils"
"github.com/gravitational/teleport/lib/auth/testauthority"
authority "github.com/gravitational/teleport/lib/auth/testauthority"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/backend/lite"
"github.com/gravitational/teleport/lib/backend/memory"
@ -87,7 +86,7 @@ func newTestPack(ctx context.Context, dataDir string) (testPack, error) {
authConfig := &InitConfig{
Backend: p.bk,
ClusterName: p.clusterName,
Authority: authority.New(),
Authority: testauthority.New(),
SkipPeriodicOperations: true,
}
p.a, err = NewServer(authConfig)
@ -780,7 +779,7 @@ func TestUpdateConfig(t *testing.T) {
authConfig := &InitConfig{
ClusterName: clusterName,
Backend: s.bk,
Authority: authority.New(),
Authority: testauthority.New(),
SkipPeriodicOperations: true,
}
authServer, err := NewServer(authConfig)

View file

@ -209,10 +209,9 @@ func NewTestAuthServer(cfg TestAuthServerConfig) (*TestAuthServer, error) {
srv.Backend = backend.NewSanitizer(b)
localLog, err := events.NewAuditLog(events.AuditLogConfig{
DataDir: cfg.Dir,
RecordSessions: true,
ServerID: cfg.ClusterName,
UploadHandler: events.NewMemoryUploader(),
DataDir: cfg.Dir,
ServerID: cfg.ClusterName,
UploadHandler: events.NewMemoryUploader(),
})
if err != nil {
return nil, trace.Wrap(err)

View file

@ -42,7 +42,6 @@ import (
"github.com/gravitational/teleport/api/constants"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
apiutils "github.com/gravitational/teleport/api/utils"
"github.com/gravitational/teleport/api/utils/sshutils"
"github.com/gravitational/teleport/lib/defaults"
@ -1206,164 +1205,6 @@ func (s *TLSSuite) TestValidatePostSessionSlice(c *check.C) {
}
}
func (s *TLSSuite) TestSharedSessions(c *check.C) {
clt, err := s.server.NewClient(TestAdmin())
c.Assert(err, check.IsNil)
out, err := clt.GetSessions(apidefaults.Namespace)
c.Assert(err, check.IsNil)
c.Assert(out, check.DeepEquals, []session.Session{})
date := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
sess := session.Session{
ID: session.NewID(),
TerminalParams: session.TerminalParams{W: 100, H: 100},
Created: date,
LastActive: date,
Login: "bob",
Namespace: apidefaults.Namespace,
}
c.Assert(clt.CreateSession(sess), check.IsNil)
out, err = clt.GetSessions(apidefaults.Namespace)
c.Assert(err, check.IsNil)
c.Assert(out, check.DeepEquals, []session.Session{sess})
marshal := func(f events.EventFields) []byte {
data, err := json.Marshal(f)
if err != nil {
panic(err)
}
return data
}
uploadDir := c.MkDir()
// emit two events: "one" and "two" for this session, and event "three"
// for some other session
err = os.MkdirAll(filepath.Join(uploadDir, "upload", "sessions", apidefaults.Namespace), 0755)
c.Assert(err, check.IsNil)
forwarder, err := events.NewForwarder(events.ForwarderConfig{
Namespace: apidefaults.Namespace,
SessionID: sess.ID,
ServerID: teleport.ComponentUpload,
DataDir: uploadDir,
RecordSessions: true,
IAuditLog: clt,
})
c.Assert(err, check.IsNil)
err = forwarder.PostSessionSlice(events.SessionSlice{
Namespace: apidefaults.Namespace,
SessionID: string(sess.ID),
Chunks: []*events.SessionChunk{
{
Time: time.Now().UTC().UnixNano(),
EventIndex: 0,
EventType: events.SessionStartEvent,
Data: marshal(events.EventFields{events.EventLogin: "bob", "val": "one"}),
},
{
Time: time.Now().UTC().UnixNano(),
EventIndex: 1,
EventType: events.SessionEndEvent,
Data: marshal(events.EventFields{events.EventLogin: "bob", "val": "two"}),
},
},
Version: events.V3,
})
c.Assert(err, check.IsNil)
c.Assert(forwarder.Close(), check.IsNil)
anotherSessionID := session.NewID()
forwarder, err = events.NewForwarder(events.ForwarderConfig{
Namespace: apidefaults.Namespace,
SessionID: sess.ID,
ServerID: teleport.ComponentUpload,
DataDir: uploadDir,
RecordSessions: true,
IAuditLog: clt,
})
c.Assert(err, check.IsNil)
err = clt.PostSessionSlice(events.SessionSlice{
Namespace: apidefaults.Namespace,
SessionID: string(anotherSessionID),
Chunks: []*events.SessionChunk{
{
Time: time.Now().UTC().UnixNano(),
EventIndex: 0,
EventType: events.SessionStartEvent,
Data: marshal(events.EventFields{events.EventLogin: "alice"}),
},
{
Time: time.Now().UTC().UnixNano(),
EventIndex: 1,
EventType: events.SessionEndEvent,
Data: marshal(events.EventFields{events.EventLogin: "alice"}),
},
},
Version: events.V3,
})
c.Assert(err, check.IsNil)
c.Assert(forwarder.Close(), check.IsNil)
// start uploader process
eventsC := make(chan events.UploadEvent, 100)
uploader, err := events.NewUploader(events.UploaderConfig{
ServerID: "upload",
DataDir: uploadDir,
Namespace: apidefaults.Namespace,
Context: context.TODO(),
ScanPeriod: 100 * time.Millisecond,
AuditLog: clt,
EventsC: eventsC,
})
c.Assert(err, check.IsNil)
err = uploader.Scan()
c.Assert(err, check.IsNil)
// scanner should upload the events
select {
case event := <-eventsC:
c.Assert(event, check.NotNil)
c.Assert(event.Error, check.IsNil)
case <-time.After(time.Second):
c.Fatalf("Timeout wating for the upload event")
}
// ask for strictly session events:
e, err := clt.GetSessionEvents(apidefaults.Namespace, sess.ID, 0, true)
c.Assert(err, check.IsNil)
c.Assert(len(e), check.Equals, 2)
c.Assert(e[0].GetString("val"), check.Equals, "one")
c.Assert(e[1].GetString("val"), check.Equals, "two")
// try searching for events with no filter (empty query) - should get all 3 events:
to := time.Now().In(time.UTC).Add(time.Hour)
from := to.Add(-time.Hour * 2)
history, _, err := clt.SearchEvents(from, to, apidefaults.Namespace, nil, 0, types.EventOrderDescending, "")
c.Assert(err, check.IsNil)
c.Assert(history, check.NotNil)
// Extra event is the upload event
c.Assert(len(history), check.Equals, 5)
// try searching for only "session.end" events (real query)
history, _, err = clt.SearchEvents(from, to, apidefaults.Namespace, []string{events.SessionEndEvent}, 0, types.EventOrderDescending, "")
c.Assert(err, check.IsNil)
c.Assert(history, check.NotNil)
c.Assert(len(history), check.Equals, 2)
var found bool
for _, event := range history {
realEvent, ok := event.(*apievents.SessionEnd)
c.Assert(ok, check.Equals, true)
if realEvent.GetSessionID() == string(anotherSessionID) {
found = true
c.Assert(realEvent.Login, check.Equals, "alice")
}
}
c.Assert(found, check.Equals, true)
}
func (s *TLSSuite) TestOTPCRUD(c *check.C) {
clt, err := s.server.NewClient(TestAdmin())
c.Assert(err, check.IsNil)

View file

@ -145,9 +145,6 @@ type AuditLogConfig struct {
// ServerID is the id of the audit log server
ServerID string
// RecordSessions controls if sessions are recorded along with audit events.
RecordSessions bool
// RotationPeriod defines how frequently to rotate the log file
RotationPeriod time.Duration
@ -238,9 +235,8 @@ func (a *AuditLogConfig) CheckAndSetDefaults() error {
return nil
}
// NewAuditLog creates and returns a new Audit Log object whish will store its logfiles in
// a given directory. Session recording can be disabled by setting
// recordSessions to false.
// NewAuditLog creates and returns a new Audit Log object which will store its log files in
// a given directory.
func NewAuditLog(cfg AuditLogConfig) (*AuditLog, error) {
err := utils.RegisterPrometheusCollectors(prometheusCollectors...)
if err != nil {
@ -443,28 +439,6 @@ type sessionIndex struct {
indexFiles []string
}
func (idx *sessionIndex) fileNames() []string {
files := make([]string, 0, len(idx.indexFiles)+len(idx.events)+len(idx.chunks))
files = append(files, idx.indexFiles...)
for i := range idx.events {
files = append(files, idx.eventsFileName(i))
}
for i := range idx.chunks {
files = append(files, idx.chunksFileName(i))
}
// Enhanced events.
for k, v := range idx.enhancedEvents {
for i := range v {
files = append(files, idx.enhancedFileName(i, k))
}
}
return files
}
func (idx *sessionIndex) sort() {
sort.Slice(idx.events, func(i, j int) bool {
return idx.events[i].Index < idx.events[j].Index
@ -481,11 +455,6 @@ func (idx *sessionIndex) sort() {
}
}
func (idx *sessionIndex) enhancedFileName(index int, eventType string) string {
entry := idx.enhancedEvents[eventType][index]
return filepath.Join(idx.dataDir, entry.authServer, SessionLogsDir, idx.namespace, entry.FileName)
}
func (idx *sessionIndex) eventsFileName(index int) string {
entry := idx.events[index]
return filepath.Join(idx.dataDir, entry.authServer, SessionLogsDir, idx.namespace, entry.FileName)
@ -1246,6 +1215,7 @@ func NewLegacyHandler(cfg LegacyHandlerConfig) (*LegacyHandler, error) {
// LegacyHandler wraps local file uploader and handles
// old style uploads stored directly on disk
// TODO(zmb3): can we remove this now that 4.4 is unsupported?
type LegacyHandler struct {
MultipartHandler
cfg LegacyHandlerConfig

View file

@ -28,13 +28,10 @@ import (
"github.com/jonboulle/clockwork"
"gopkg.in/check.v1"
"github.com/gravitational/teleport"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/events"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/fixtures"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/trace"
)
@ -59,13 +56,13 @@ func (a *AuditTestSuite) TearDownSuite(c *check.C) {
// creates a file-based audit log and returns a proper *AuditLog pointer
// instead of the usual IAuditLog interface
func (a *AuditTestSuite) makeLog(c *check.C, dataDir string, recordSessions bool) (*AuditLog, error) {
return a.makeLogWithClock(c, dataDir, recordSessions, nil)
func (a *AuditTestSuite) makeLog(c *check.C, dataDir string) (*AuditLog, error) {
return a.makeLogWithClock(c, dataDir, nil)
}
// creates a file-based audit log and returns a proper *AuditLog pointer
// instead of the usual IAuditLog interface
func (a *AuditTestSuite) makeLogWithClock(c *check.C, dataDir string, recordSessions bool, clock clockwork.Clock) (*AuditLog, error) {
func (a *AuditTestSuite) makeLogWithClock(c *check.C, dataDir string, clock clockwork.Clock) (*AuditLog, error) {
handler, err := NewLegacyHandler(LegacyHandlerConfig{
Handler: NewMemoryUploader(),
Dir: dataDir,
@ -74,12 +71,11 @@ func (a *AuditTestSuite) makeLogWithClock(c *check.C, dataDir string, recordSess
return nil, trace.Wrap(err)
}
alog, err := NewAuditLog(AuditLogConfig{
DataDir: dataDir,
RecordSessions: recordSessions,
ServerID: "server1",
Clock: clock,
UIDGenerator: utils.NewFakeUID(),
UploadHandler: handler,
DataDir: dataDir,
ServerID: "server1",
Clock: clock,
UIDGenerator: utils.NewFakeUID(),
UploadHandler: handler,
})
if err != nil {
return nil, trace.Wrap(err)
@ -92,235 +88,17 @@ func (a *AuditTestSuite) SetUpTest(c *check.C) {
}
func (a *AuditTestSuite) TestNew(c *check.C) {
alog, err := a.makeLog(c, a.dataDir, true)
alog, err := a.makeLog(c, a.dataDir)
c.Assert(err, check.IsNil)
// close twice:
c.Assert(alog.Close(), check.IsNil)
c.Assert(alog.Close(), check.IsNil)
}
// TestSessionsOnOneAuthServer tests scenario when there are two auth servers
// and session is recorded on the first one
func (a *AuditTestSuite) TestSessionsOnOneAuthServer(c *check.C) {
fakeClock := clockwork.NewFakeClock()
uploader := NewMemoryUploader()
alog, err := NewAuditLog(AuditLogConfig{
Clock: fakeClock,
DataDir: a.dataDir,
RecordSessions: true,
ServerID: "server1",
UploadHandler: uploader,
})
c.Assert(err, check.IsNil)
alog2, err := NewAuditLog(AuditLogConfig{
Clock: fakeClock,
DataDir: a.dataDir,
RecordSessions: true,
ServerID: "server2",
UploadHandler: uploader,
})
c.Assert(err, check.IsNil)
uploadDir := c.MkDir()
err = os.MkdirAll(filepath.Join(uploadDir, "upload", "sessions", apidefaults.Namespace), 0755)
c.Assert(err, check.IsNil)
sessionID := string(session.NewID())
forwarder, err := NewForwarder(ForwarderConfig{
Namespace: apidefaults.Namespace,
SessionID: session.ID(sessionID),
ServerID: teleport.ComponentUpload,
DataDir: uploadDir,
RecordSessions: true,
IAuditLog: alog,
Clock: fakeClock,
})
c.Assert(err, check.IsNil)
// start the session and emit data stream to it
firstMessage := []byte("hello")
err = forwarder.PostSessionSlice(SessionSlice{
Namespace: apidefaults.Namespace,
SessionID: sessionID,
Chunks: []*SessionChunk{
// start the session
{
Time: fakeClock.Now().UTC().UnixNano(),
EventIndex: 0,
EventType: SessionStartEvent,
Data: marshal(EventFields{EventLogin: "bob"}),
},
// type "hello" into session "100"
{
Time: fakeClock.Now().UTC().UnixNano(),
EventIndex: 1,
ChunkIndex: 0,
Offset: 0,
EventType: SessionPrintEvent,
Data: firstMessage,
},
// emitting session end event should close the session
{
Time: fakeClock.Now().UTC().UnixNano(),
EventIndex: 4,
EventType: SessionEndEvent,
Data: marshal(EventFields{EventLogin: "bob"}),
},
},
Version: V3,
})
c.Assert(err, check.IsNil)
c.Assert(forwarder.Close(), check.IsNil)
upload(c, uploadDir, fakeClock, alog)
// does not matter which audit server is accessed the results should be the same
for _, a := range []*AuditLog{alog, alog2} {
// read the session bytes
history, err := a.GetSessionEvents(apidefaults.Namespace, session.ID(sessionID), 0, true)
c.Assert(err, check.IsNil)
c.Assert(history, check.HasLen, 3)
// make sure offsets were properly set (0 for the first event and 5 bytes for hello):
c.Assert(history[1][SessionByteOffset], check.Equals, float64(0))
c.Assert(history[1][SessionEventTimestamp], check.Equals, float64(0))
// fetch all bytes
buff, err := a.GetSessionChunk(apidefaults.Namespace, session.ID(sessionID), 0, 5000)
c.Assert(err, check.IsNil)
c.Assert(string(buff), check.Equals, string(firstMessage))
// with offset
buff, err = a.GetSessionChunk(apidefaults.Namespace, session.ID(sessionID), 2, 5000)
c.Assert(err, check.IsNil)
c.Assert(string(buff), check.Equals, string(firstMessage[2:]))
}
}
func upload(c *check.C, uploadDir string, clock clockwork.Clock, auditLog IAuditLog) {
// start uploader process
eventsC := make(chan UploadEvent, 100)
uploader, err := NewUploader(UploaderConfig{
ServerID: "upload",
DataDir: uploadDir,
Clock: clock,
Namespace: apidefaults.Namespace,
Context: context.TODO(),
ScanPeriod: 100 * time.Millisecond,
AuditLog: auditLog,
EventsC: eventsC,
})
c.Assert(err, check.IsNil)
// scanner should upload the events
err = uploader.Scan()
c.Assert(err, check.IsNil)
select {
case event := <-eventsC:
c.Assert(event, check.NotNil)
c.Assert(event.Error, check.IsNil)
case <-time.After(time.Second):
c.Fatalf("Timeout wating for the upload event")
}
}
func (a *AuditTestSuite) TestSessionRecordingOff(c *check.C) {
now := time.Now().In(time.UTC).Round(time.Second)
// create audit log with session recording disabled
fakeClock := clockwork.NewFakeClockAt(now)
alog, err := NewAuditLog(AuditLogConfig{
Clock: fakeClock,
DataDir: a.dataDir,
RecordSessions: true,
ServerID: "server1",
UploadHandler: NewMemoryUploader(),
})
c.Assert(err, check.IsNil)
username := "alice"
sessionID := string(session.NewID())
uploadDir := c.MkDir()
err = os.MkdirAll(filepath.Join(uploadDir, "upload", "sessions", apidefaults.Namespace), 0755)
c.Assert(err, check.IsNil)
forwarder, err := NewForwarder(ForwarderConfig{
Namespace: apidefaults.Namespace,
SessionID: session.ID(sessionID),
ServerID: teleport.ComponentUpload,
DataDir: uploadDir,
RecordSessions: false,
IAuditLog: alog,
Clock: fakeClock,
})
c.Assert(err, check.IsNil)
// start the session and emit data stream to it
firstMessage := []byte("hello")
err = forwarder.PostSessionSlice(SessionSlice{
Namespace: apidefaults.Namespace,
SessionID: sessionID,
Chunks: []*SessionChunk{
// start the session
{
Time: alog.Clock.Now().UTC().UnixNano(),
EventIndex: 0,
EventType: SessionStartEvent,
Data: marshal(EventFields{EventLogin: username}),
},
// type "hello" into session "100"
{
Time: alog.Clock.Now().UTC().UnixNano(),
EventIndex: 1,
ChunkIndex: 0,
Offset: 0,
EventType: SessionPrintEvent,
Data: firstMessage,
},
// end the session
{
Time: alog.Clock.Now().UTC().UnixNano(),
EventIndex: 0,
EventType: SessionEndEvent,
Data: marshal(EventFields{EventLogin: username}),
},
},
Version: V3,
})
c.Assert(err, check.IsNil)
c.Assert(forwarder.Close(), check.IsNil)
upload(c, uploadDir, fakeClock, alog)
// get all events from the audit log, should have two session event and one upload event
found, _, err := alog.SearchEvents(now.Add(-time.Hour), now.Add(time.Hour), apidefaults.Namespace, nil, 0, types.EventOrderAscending, "")
c.Assert(err, check.IsNil)
c.Assert(found, check.HasLen, 3)
eventA, okA := found[0].(*apievents.SessionStart)
eventB, okB := found[1].(*apievents.SessionEnd)
c.Assert(okA, check.Equals, true)
c.Assert(okB, check.Equals, true)
c.Assert(eventA.Login, check.Equals, username)
c.Assert(eventB.Login, check.Equals, username)
// inspect the session log for "200", should have two events
history, err := alog.GetSessionEvents(apidefaults.Namespace, session.ID(sessionID), 0, true)
c.Assert(err, check.IsNil)
c.Assert(history, check.HasLen, 2)
// try getting the session stream, should get an error
_, err = alog.GetSessionChunk(apidefaults.Namespace, session.ID(sessionID), 0, 5000)
c.Assert(err, check.NotNil)
}
func (a *AuditTestSuite) TestBasicLogging(c *check.C) {
// create audit log, write a couple of events into it, close it
clock := clockwork.NewFakeClock()
alog, err := a.makeLogWithClock(c, a.dataDir, true, clock)
alog, err := a.makeLogWithClock(c, a.dataDir, clock)
c.Assert(err, check.IsNil)
// emit regular event:
@ -344,7 +122,7 @@ func (a *AuditTestSuite) TestLogRotation(c *check.C) {
clock := clockwork.NewFakeClockAt(start)
// create audit log, write a couple of events into it, close it
alog, err := a.makeLogWithClock(c, a.dataDir, true, clock)
alog, err := a.makeLogWithClock(c, a.dataDir, clock)
c.Assert(err, check.IsNil)
defer func() {
c.Assert(alog.Close(), check.IsNil)
@ -388,222 +166,25 @@ func (a *AuditTestSuite) TestLogRotation(c *check.C) {
}
}
// TestForwardAndUpload tests forwarding server and upload
// server case
func (a *AuditTestSuite) TestForwardAndUpload(c *check.C) {
fakeClock := clockwork.NewFakeClock()
alog, err := NewAuditLog(AuditLogConfig{
DataDir: a.dataDir,
RecordSessions: true,
Clock: fakeClock,
ServerID: "remote",
UploadHandler: NewMemoryUploader(),
})
c.Assert(err, check.IsNil)
defer alog.Close()
a.forwardAndUpload(c, fakeClock, alog)
}
// TestLegacyHandler tests playback for legacy sessions
// that are stored on disk in unpacked format
func (a *AuditTestSuite) TestLegacyHandler(c *check.C) {
memory := NewMemoryUploader()
wrapper, err := NewLegacyHandler(LegacyHandlerConfig{
Handler: memory,
Dir: a.dataDir,
})
c.Assert(err, check.IsNil)
fakeClock := clockwork.NewFakeClock()
alog, err := NewAuditLog(AuditLogConfig{
DataDir: a.dataDir,
RecordSessions: true,
Clock: fakeClock,
ServerID: "remote",
UploadHandler: wrapper,
})
c.Assert(err, check.IsNil)
defer alog.Close()
sid, compare := a.forwardAndUpload(c, fakeClock, alog)
// Download the session in the old format
ctx := context.TODO()
tarball, err := os.CreateTemp("", "teleport-legacy")
c.Assert(err, check.IsNil)
defer os.RemoveAll(tarball.Name())
err = memory.Download(ctx, sid, tarball)
c.Assert(err, check.IsNil)
authServers, err := getAuthServers(a.dataDir)
c.Assert(err, check.IsNil)
c.Assert(authServers, check.HasLen, 1)
targetDir := filepath.Join(a.dataDir, authServers[0], SessionLogsDir, apidefaults.Namespace)
_, err = tarball.Seek(0, 0)
c.Assert(err, check.IsNil)
err = utils.Extract(tarball, targetDir)
c.Assert(err, check.IsNil)
unpacked, err := wrapper.IsUnpacked(ctx, sid)
c.Assert(err, check.IsNil)
c.Assert(unpacked, check.Equals, true)
// remove recording from the uploader
// and make sure that playback for the session still
// works
memory.Reset()
err = compare()
c.Assert(err, check.IsNil)
}
// TestExternalLog tests forwarding server and upload
// server case
func (a *AuditTestSuite) TestExternalLog(c *check.C) {
fileLog, err := NewFileLog(FileLogConfig{
Dir: c.MkDir(),
})
c.Assert(err, check.IsNil)
m := &MockAuditLog{
emitter: MockEmitter{},
}
fakeClock := clockwork.NewFakeClock()
alog, err := NewAuditLog(AuditLogConfig{
DataDir: a.dataDir,
RecordSessions: true,
Clock: fakeClock,
ServerID: "remote",
UploadHandler: NewMemoryUploader(),
ExternalLog: fileLog,
DataDir: a.dataDir,
Clock: fakeClock,
ServerID: "remote",
UploadHandler: NewMemoryUploader(),
ExternalLog: m,
})
c.Assert(err, check.IsNil)
defer alog.Close()
a.forwardAndUpload(c, fakeClock, alog)
}
// forwardAndUpload tests forwarding server and upload
// server case
func (a *AuditTestSuite) forwardAndUpload(c *check.C, fakeClock clockwork.Clock, alog IAuditLog) (session.ID, func() error) {
uploadDir := c.MkDir()
err := os.MkdirAll(filepath.Join(uploadDir, "upload", "sessions", apidefaults.Namespace), 0755)
c.Assert(err, check.IsNil)
sessionID := session.NewID()
forwarder, err := NewForwarder(ForwarderConfig{
Namespace: apidefaults.Namespace,
SessionID: sessionID,
ServerID: "upload",
DataDir: uploadDir,
RecordSessions: true,
IAuditLog: alog,
})
c.Assert(err, check.IsNil)
// start the session and emit data stream to it and wrap it up
firstMessage := []byte("hello")
err = forwarder.PostSessionSlice(SessionSlice{
Namespace: apidefaults.Namespace,
SessionID: string(sessionID),
Chunks: []*SessionChunk{
// start the seession
{
Time: fakeClock.Now().UTC().UnixNano(),
EventIndex: 0,
EventType: SessionStartEvent,
Data: marshal(EventFields{EventLogin: "bob"}),
},
// type "hello" into session "100"
{
Time: fakeClock.Now().UTC().UnixNano(),
EventIndex: 1,
ChunkIndex: 0,
Offset: 0,
EventType: SessionPrintEvent,
Data: firstMessage,
},
// emitting session end event should close the session
{
Time: fakeClock.Now().UTC().UnixNano(),
EventIndex: 4,
EventType: SessionEndEvent,
Data: marshal(EventFields{EventLogin: "bob"}),
},
},
Version: V2,
})
c.Assert(err, check.IsNil)
c.Assert(forwarder.Close(), check.IsNil)
upload(c, uploadDir, fakeClock, alog)
compare := func() error {
history, err := alog.GetSessionEvents(apidefaults.Namespace, sessionID, 0, true)
if err != nil {
return trace.Wrap(err)
}
if len(history) != 3 {
return trace.BadParameter("expected history of 3, got %v", len(history))
}
// make sure offsets were properly set (0 for the first event and 5 bytes for hello):
if history[1][SessionByteOffset].(float64) != float64(0) {
return trace.BadParameter("expected offset of 0, got %v", history[1][SessionByteOffset])
}
if history[1][SessionEventTimestamp].(float64) != float64(0) {
return trace.BadParameter("expected timestamp of 0, got %v", history[1][SessionEventTimestamp])
}
// fetch all bytes
buff, err := alog.GetSessionChunk(apidefaults.Namespace, sessionID, 0, 5000)
if err != nil {
return trace.Wrap(err)
}
if string(buff) != string(firstMessage) {
return trace.CompareFailed("%q != %q", string(buff), string(firstMessage))
}
// with offset
buff, err = alog.GetSessionChunk(apidefaults.Namespace, sessionID, 2, 5000)
if err != nil {
return trace.Wrap(err)
}
if string(buff) != string(firstMessage[2:]) {
return trace.CompareFailed("%q != %q", string(buff), string(firstMessage[2:]))
}
return nil
}
// trigger several parallel downloads, they should not fail
iterations := 50
resultsC := make(chan error, iterations)
for i := 0; i < iterations; i++ {
go func() {
resultsC <- compare()
}()
}
timeout := time.After(time.Second)
for i := 0; i < iterations; i++ {
select {
case err := <-resultsC:
c.Assert(err, check.IsNil)
case <-timeout:
c.Fatalf("timeout waiting for goroutines to finish")
}
}
return sessionID, compare
}
func marshal(f EventFields) []byte {
data, err := json.Marshal(f)
if err != nil {
panic(err)
}
return data
evt := &events.SessionConnect{}
c.Assert(alog.EmitAuditEvent(context.Background(), evt), check.IsNil)
c.Assert(m.emitter.events, check.HasLen, 1)
c.Assert(m.emitter.events[0], check.Equals, evt)
}

View file

@ -1,220 +0,0 @@
/*
Copyright 2018 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package events
import (
"context"
"encoding/json"
"sync"
"time"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
)
// ForwarderConfig configures a Forwarder, which forwards session log
// events to the auth server and writes the session playback to disk.
type ForwarderConfig struct {
	// IAuditLog is the audit log to forward non-print events to
	IAuditLog
	// SessionID is the ID of the session whose events are forwarded
	SessionID session.ID
	// ServerID is the ID of the server producing the session events
	ServerID string
	// DataDir is the data directory the session recording is written to
	DataDir string
	// RecordSessions controls whether the session output is recorded to disk
	RecordSessions bool
	// Namespace is a namespace of the session
	Namespace string
	// Clock is a clock to set for tests
	Clock clockwork.Clock
	// UID is UID generator
	UID utils.UID
}
// CheckAndSetDefaults validates required fields (IAuditLog, DataDir)
// and fills in defaults for the clock and UID generator.
func (s *ForwarderConfig) CheckAndSetDefaults() error {
	if s.IAuditLog == nil {
		// previous message said "missing parameter bucket", which
		// referred to the wrong field
		return trace.BadParameter("missing parameter IAuditLog")
	}
	if s.DataDir == "" {
		return trace.BadParameter("missing data dir")
	}
	if s.Clock == nil {
		s.Clock = clockwork.NewRealClock()
	}
	if s.UID == nil {
		s.UID = utils.NewRealUID()
	}
	return nil
}
// NewForwarder returns a new instance of session forwarder.
// TODO(zmb3): this is not used outside of tests - remove it
func NewForwarder(cfg ForwarderConfig) (*Forwarder, error) {
	if err := cfg.CheckAndSetDefaults(); err != nil {
		return nil, trace.Wrap(err)
	}
	// the local disk logger records the full session (including
	// terminal output) under the configured data directory
	sessionLogger, err := NewDiskSessionLogger(DiskSessionLoggerConfig{
		SessionID:      cfg.SessionID,
		DataDir:        cfg.DataDir,
		RecordSessions: cfg.RecordSessions,
		Namespace:      cfg.Namespace,
		ServerID:       cfg.ServerID,
		Clock:          cfg.Clock,
	})
	if err != nil {
		return nil, trace.Wrap(err)
	}
	// enhanced (BPF) events are numbered independently per event type
	indexes := map[string]int64{
		SessionCommandEvent: 0,
		SessionDiskEvent:    0,
		SessionNetworkEvent: 0,
	}
	return &Forwarder{
		ForwarderConfig: cfg,
		sessionLogger:   sessionLogger,
		enhancedIndexes: indexes,
	}, nil
}
// Forwarder forwards session log events to the auth server and writes
// the session playback to disk.
type Forwarder struct {
	ForwarderConfig
	// sessionLogger records the full session, including print events,
	// to the local data directory
	sessionLogger *DiskSessionLogger
	// lastChunk is the most recent print chunk seen; used to derive the
	// offset, delay and chunk index of the next print chunk
	lastChunk *SessionChunk
	// eventIndex numbers all non-enhanced events in the session
	eventIndex int64
	// enhancedIndexes holds a separate running index per enhanced
	// (BPF) event type
	enhancedIndexes map[string]int64
	sync.Mutex
	// isClosed is set once Close has run; guarded by the mutex
	isClosed bool
}
// Close releases connection and resources associated with log if any.
// It is idempotent: calls after the first return nil without
// re-finalizing the session logger.
func (l *Forwarder) Close() error {
	l.Lock()
	defer l.Unlock()
	if l.isClosed {
		return nil
	}
	l.isClosed = true
	return l.sessionLogger.Finalize()
}
// EmitAuditEvent is not implemented: events are forwarded to the auth
// server and are emitted from there instead.
// Receiver renamed from r to l for consistency with the other
// Forwarder methods.
func (l *Forwarder) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent) error {
	return trace.NotImplemented("not implemented")
}
// EmitAuditEventLegacy emits a single legacy audit event by wrapping it
// in a one-chunk session slice and posting it through PostSessionSlice.
func (l *Forwarder) EmitAuditEventLegacy(event Event, fields EventFields) error {
	// stamp the event with time/ID metadata before serializing
	if err := UpdateEventFields(event, fields, l.Clock, l.UID); err != nil {
		return trace.Wrap(err)
	}
	payload, err := json.Marshal(fields)
	if err != nil {
		return trace.Wrap(err)
	}
	slice := SessionSlice{
		Namespace: l.Namespace,
		SessionID: string(l.SessionID),
		Version:   V3,
		Chunks: []*SessionChunk{{
			EventType: event.Name,
			Data:      payload,
			Time:      time.Now().UTC().UnixNano(),
		}},
	}
	return l.PostSessionSlice(slice)
}
// PostSessionSlice sends chunks of recorded session to the event log.
// The complete slice (print events included) is recorded locally;
// print events are then filtered out before the slice is forwarded to
// the auth server.
func (l *Forwarder) PostSessionSlice(slice SessionSlice) error {
	// setupSlice assigns indexes to every chunk and returns the subset
	// that should be forwarded upstream (everything but print events)
	forwardable, err := l.setupSlice(&slice)
	if err != nil {
		return trace.Wrap(err)
	}
	// record everything, including session output, locally first
	if err := l.sessionLogger.PostSessionSlice(slice); err != nil {
		return trace.Wrap(err)
	}
	if len(forwardable) == 0 {
		// the slice consisted entirely of print events
		return nil
	}
	slice.Chunks = forwardable
	slice.Version = V3
	return l.IAuditLog.PostSessionSlice(slice)
}
// setupSlice assigns event/chunk indexes to every chunk in the slice
// and returns the chunks that should be forwarded to the auth server.
// Print events are numbered and offset-adjusted but NOT returned, since
// only audit events are forwarded upstream; the local session logger
// receives the full slice separately.
func (l *Forwarder) setupSlice(slice *SessionSlice) ([]*SessionChunk, error) {
	l.Lock()
	defer l.Unlock()
	if l.isClosed {
		return nil, trace.BadParameter("write on closed forwarder")
	}
	// Setup chunk indexes.
	var chunks []*SessionChunk
	for _, chunk := range slice.Chunks {
		switch chunk.EventType {
		case "":
			return nil, trace.BadParameter("missing event type")
		case SessionCommandEvent, SessionDiskEvent, SessionNetworkEvent:
			// enhanced (BPF) events keep an independent index per type
			chunk.EventIndex = l.enhancedIndexes[chunk.EventType]
			l.enhancedIndexes[chunk.EventType]++
			chunks = append(chunks, chunk)
		case SessionPrintEvent:
			chunk.EventIndex = l.eventIndex
			l.eventIndex++
			// Filter out chunks with session print events, as this logger forwards
			// only audit events to the auth server.
			// Derive this chunk's byte offset, cumulative delay and chunk
			// index from the previous print chunk.
			if l.lastChunk != nil {
				chunk.Offset = l.lastChunk.Offset + int64(len(l.lastChunk.Data))
				chunk.Delay = diff(time.Unix(0, l.lastChunk.Time), time.Unix(0, chunk.Time)) + l.lastChunk.Delay
				chunk.ChunkIndex = l.lastChunk.ChunkIndex + 1
			}
			l.lastChunk = chunk
		default:
			chunk.EventIndex = l.eventIndex
			l.eventIndex++
			chunks = append(chunks, chunk)
		}
	}
	return chunks, nil
}

View file

@ -18,21 +18,10 @@ package events
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"
)
// UploadHandler is a function supplied by the user, it will upload
@ -62,267 +51,3 @@ type UploadEvent struct {
// Created is a time of when the event has been created
Created time.Time
}
// UploaderConfig sets up configuration for uploader service.
type UploaderConfig struct {
	// DataDir is data directory for session events files
	DataDir string
	// Clock is the clock replacement
	Clock clockwork.Clock
	// Namespace is logger namespace
	Namespace string
	// ServerID is a server ID
	ServerID string
	// Context is an optional context
	Context context.Context
	// ScanPeriod is an uploader directory scan period
	ScanPeriod time.Duration
	// ConcurrentUploads sets up how many parallel uploads to schedule
	ConcurrentUploads int
	// AuditLog is audit log client
	AuditLog IAuditLog
	// EventsC is an event channel used to signal events
	// used in tests
	EventsC chan UploadEvent
}
// CheckAndSetDefaults checks and sets default values of UploaderConfig.
func (cfg *UploaderConfig) CheckAndSetDefaults() error {
	// required parameters, validated in a fixed order so the first
	// missing one is reported
	for _, req := range []struct {
		ok   bool
		name string
	}{
		{cfg.ServerID != "", "ServerID"},
		{cfg.AuditLog != nil, "AuditLog"},
		{cfg.DataDir != "", "DataDir"},
		{cfg.Namespace != "", "Namespace"},
	} {
		if !req.ok {
			return trace.BadParameter("missing parameter " + req.name)
		}
	}
	// optional parameters fall back to defaults
	if cfg.ConcurrentUploads <= 0 {
		cfg.ConcurrentUploads = defaults.UploaderConcurrentUploads
	}
	if cfg.ScanPeriod <= 0 {
		cfg.ScanPeriod = defaults.UploaderScanPeriod
	}
	if cfg.Context == nil {
		cfg.Context = context.TODO()
	}
	if cfg.Clock == nil {
		cfg.Clock = clockwork.NewRealClock()
	}
	return nil
}
// NewUploader creates a new disk based session logger from the supplied
// configuration. It returns an error if the configuration is invalid.
func NewUploader(cfg UploaderConfig) (*Uploader, error) {
	if err := cfg.CheckAndSetDefaults(); err != nil {
		return nil, trace.Wrap(err)
	}
	ctx, cancel := context.WithCancel(cfg.Context)
	return &Uploader{
		UploaderConfig: cfg,
		Entry: log.WithFields(log.Fields{
			trace.Component: teleport.ComponentAuditLog,
		}),
		cancel:    cancel,
		ctx:       ctx,
		semaphore: make(chan struct{}, cfg.ConcurrentUploads),
		scanDir:   filepath.Join(cfg.DataDir, cfg.ServerID, SessionLogsDir, cfg.Namespace),
	}, nil
}
// Uploader implements a disk based session logger. The important
// property of the disk based logger is that it never fails and can be used as
// a fallback implementation behind more sophisticated loggers.
type Uploader struct {
	UploaderConfig
	// semaphore bounds the number of concurrent uploads
	semaphore chan struct{}
	// scanDir is the directory scanned for completed session recordings
	scanDir string
	*log.Entry
	// cancel terminates ctx and is invoked by Stop
	cancel context.CancelFunc
	ctx    context.Context
}
// Serve runs the uploader scan loop: on every tick it scans the data
// directory and schedules uploads for completed recordings. The loop
// exits cleanly once the uploader's context is canceled.
func (u *Uploader) Serve() error {
	ticker := time.NewTicker(u.ScanPeriod)
	defer ticker.Stop()
	for {
		select {
		case <-u.ctx.Done():
			u.Debugf("Uploader is exiting.")
			return nil
		case <-ticker.C:
			err := u.Scan()
			// errContext just means the uploader is shutting down,
			// so it is not worth a warning
			if err != nil && trace.Unwrap(err) != errContext {
				u.Warningf("Uploader scan failed: %v", trace.DebugReport(err))
			}
		}
	}
}
// errContext is returned by semaphore operations when the uploader's
// context has been closed; Serve treats it as a signal to stop quietly.
var errContext = fmt.Errorf("context has closed")
// takeSemaphore blocks until an upload slot becomes available. It
// returns errContext when the uploader's context closes first.
func (u *Uploader) takeSemaphore() error {
	select {
	case <-u.ctx.Done():
		u.Debugf("Context is closing.")
		return errContext
	case u.semaphore <- struct{}{}:
		return nil
	}
}
// releaseSemaphore frees an upload slot previously acquired with
// takeSemaphore. It returns errContext when the uploader's context
// closes before the slot can be released.
func (u *Uploader) releaseSemaphore() error {
	select {
	case <-u.ctx.Done():
		u.Debugf("Context is closing.")
		return errContext
	case <-u.semaphore:
		return nil
	}
}
// removeFiles deletes all files in the scan directory that belong to
// the given session. Failures to remove individual files are logged
// but do not abort the cleanup.
func (u *Uploader) removeFiles(sessionID session.ID) error {
	dir, err := os.Open(u.scanDir)
	if err != nil {
		return trace.ConvertSystemError(err)
	}
	defer dir.Close()
	entries, err := dir.Readdir(-1)
	if err != nil {
		return trace.ConvertSystemError(err)
	}
	prefix := string(sessionID)
	for _, entry := range entries {
		if entry.IsDir() || !strings.HasPrefix(entry.Name(), prefix) {
			continue
		}
		path := filepath.Join(u.scanDir, entry.Name())
		if err := os.Remove(path); err != nil {
			u.Warningf("Failed to remove %v: %v.", path, trace.DebugReport(err))
		}
		u.Debugf("Removed %v.", path)
	}
	return nil
}
// emitEvent delivers an upload notification to the events channel, if
// one was configured (tests only). The send is non-blocking: when the
// channel is full the event is dropped with a warning.
func (u *Uploader) emitEvent(e UploadEvent) {
	if u.EventsC == nil {
		return
	}
	select {
	case u.EventsC <- e:
	default:
		u.Warningf("Skip send event on a blocked channel.")
	}
}
// uploadFile schedules an upload of the completed session recording
// identified by sessionID. The file at lockFilePath is write-locked for
// the duration of the upload so that other processes do not upload the
// same session concurrently. The upload itself runs in a background
// goroutine bounded by the uploader's semaphore; a nil return means the
// upload was scheduled, not that it completed.
func (u *Uploader) uploadFile(lockFilePath string, sessionID session.ID) error {
	lockFile, err := os.OpenFile(lockFilePath, os.O_RDWR, 0)
	if err != nil {
		return trace.ConvertSystemError(err)
	}
	if err := utils.FSTryWriteLock(lockFile); err != nil {
		// another process holds the lock; release the descriptor
		// instead of leaking it until GC
		lockFile.Close()
		return trace.Wrap(err)
	}
	reader, err := NewSessionArchive(u.DataDir, u.ServerID, u.Namespace, sessionID)
	if err != nil {
		utils.FSUnlock(lockFile)
		lockFile.Close()
		return trace.Wrap(err)
	}
	if err := u.takeSemaphore(); err != nil {
		// uploader is shutting down: release all resources acquired above
		reader.Close()
		utils.FSUnlock(lockFile)
		lockFile.Close()
		return trace.Wrap(err)
	}
	go func() {
		defer u.releaseSemaphore()
		defer reader.Close()
		defer lockFile.Close()
		defer utils.FSUnlock(lockFile)
		start := time.Now()
		err := u.AuditLog.UploadSessionRecording(SessionRecording{
			Namespace: u.Namespace,
			SessionID: sessionID,
			Recording: reader,
		})
		if err != nil {
			u.emitEvent(UploadEvent{
				SessionID: string(sessionID),
				Error:     err,
			})
			u.WithFields(log.Fields{"duration": time.Since(start), "session-id": sessionID}).Warningf("Session upload failed: %v", trace.DebugReport(err))
			return
		}
		u.WithFields(log.Fields{"duration": time.Since(start), "session-id": sessionID}).Debugf("Session upload completed.")
		u.emitEvent(UploadEvent{
			SessionID: string(sessionID),
		})
		// the recording was uploaded successfully, so the local copy
		// is no longer needed
		if err := u.removeFiles(sessionID); err != nil {
			u.Warningf("Failed to remove files: %v.", err)
		}
	}()
	return nil
}
// Scan walks the scan directory and schedules an upload for every
// completed session recording found there. Files that are locked by
// another uploading process are skipped; malformed file names are
// logged and ignored.
func (u *Uploader) Scan() error {
	df, err := os.Open(u.scanDir)
	if err != nil {
		return trace.ConvertSystemError(err)
	}
	defer df.Close()
	entries, err := df.Readdir(-1)
	if err != nil {
		return trace.ConvertSystemError(err)
	}
	for i := range entries {
		fi := entries[i]
		if fi.IsDir() {
			continue
		}
		// only recordings explicitly marked as completed are eligible
		if !strings.HasSuffix(fi.Name(), "completed") {
			continue
		}
		// file names look like "<session-id>.<...>.completed"
		parts := strings.Split(fi.Name(), ".")
		if len(parts) < 2 {
			u.Debugf("Uploader, skipping unknown file: %v", fi.Name())
			continue
		}
		sessionID, err := session.ParseID(parts[0])
		if err != nil {
			u.Debugf("Skipping file with invalid name: %v.", parts[0])
			continue
		}
		lockFilePath := filepath.Join(u.scanDir, fi.Name())
		if err := u.uploadFile(lockFilePath, *sessionID); err != nil {
			if trace.IsCompareFailed(err) {
				u.Debugf("Uploader detected locked file %v, another process is uploading it.", lockFilePath)
				continue
			}
			return trace.Wrap(err)
		}
	}
	return nil
}
// Stop cancels the uploader's context, which terminates the Serve loop
// and unblocks any in-flight semaphore waits. It always returns nil.
func (u *Uploader) Stop() error {
	u.cancel()
	return nil
}

View file

@ -1127,10 +1127,7 @@ func (process *TeleportProcess) initAuthService() error {
} else {
// check if session recording has been disabled. note, we will continue
// logging audit events, we just won't record sessions.
recordSessions := true
if cfg.Auth.SessionRecordingConfig.GetMode() == types.RecordOff {
recordSessions = false
warningMessage := "Warning: Teleport session recording have been turned off. " +
"This is dangerous, you will not be able to save and playback sessions."
process.log.Warn(warningMessage)
@ -1160,12 +1157,11 @@ func (process *TeleportProcess) initAuthService() error {
}
auditServiceConfig := events.AuditLogConfig{
Context: process.ExitContext(),
DataDir: filepath.Join(cfg.DataDir, teleport.LogsDir),
RecordSessions: recordSessions,
ServerID: cfg.HostUUID,
UploadHandler: uploadHandler,
ExternalLog: externalLog,
Context: process.ExitContext(),
DataDir: filepath.Join(cfg.DataDir, teleport.LogsDir),
ServerID: cfg.HostUUID,
UploadHandler: uploadHandler,
ExternalLog: externalLog,
}
auditServiceConfig.UID, auditServiceConfig.GID, err = adminCreds()
if err != nil {