From 71c15e5835e0cc5f54c086eefa8c435c0e00ee12 Mon Sep 17 00:00:00 2001 From: Sasha Klizhentas Date: Sun, 31 Dec 2017 15:31:57 -0800 Subject: [PATCH] Add support for NFS-friendly log protocol. * Session events are delivered in continuous batches in a guaranteed order with every event and print event ordered from session start. * Each auth server writes to a separate folder on disk to make sure that no two processes write to the same file at a time. * When retrieving sessions, auth servers fetch and merge results recorded by each auth server. * Migrations and compatibility modes are in place for older clients not aware of the new format, but compatibility mode is not NFS friendly. * On disk migrations are launched automatically during auth server upgrades. --- docker/one.yaml | 1 - integration/integration_test.go | 50 +-- lib/auth/helpers.go | 5 +- lib/auth/password_test.go | 6 +- lib/auth/tun_test.go | 5 +- lib/events/api.go | 11 + lib/events/auditlog.go | 611 ++++++++++++++++++++------ lib/events/auditlog_test.go | 686 +++++++++++++++++++++++++++++- lib/events/compatsessionlog.go | 315 ++++++++++++++ lib/events/doc.go | 193 ++++++++- lib/events/sessionlog.go | 317 ++++++++------ lib/events/slice.pb.go | 247 +++++++++-- lib/events/slice.proto | 17 +- lib/reversetunnel/agent.go | 2 - lib/service/service.go | 2 + lib/srv/regular/sshserver.go | 9 + lib/srv/regular/sshserver_test.go | 11 + lib/srv/sess.go | 41 +- lib/state/log.go | 46 +- lib/state/log_test.go | 1 + lib/web/apiserver_test.go | 1 + 21 files changed, 2205 insertions(+), 372 deletions(-) create mode 100644 lib/events/compatsessionlog.go diff --git a/docker/one.yaml b/docker/one.yaml index 9b56b1562f2..cc4db00a412 100644 --- a/docker/one.yaml +++ b/docker/one.yaml @@ -33,5 +33,4 @@ ssh_service: proxy_service: enabled: yes - proxy_protocol: on diff --git a/integration/integration_test.go b/integration/integration_test.go index eaa4104200a..ea5601ba6ea 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -28,6 +28,7 @@ import ( "os/exec" "os/user" "path/filepath" + "regexp" "strconv" "strings" "testing" @@ -155,9 +156,9 @@ func (s *IntSuite) newTeleportWithConfig(c *check.C, logins []string, instanceSe return t } -// TestAudit creates a live session, records a bunch of data through it (>5MB) +// TestAuditOn creates a live session, records a bunch of data through it // and then reads it back and compares against simulated reality. -func (s *IntSuite) TestAudit(c *check.C) { +func (s *IntSuite) TestAuditOn(c *check.C) { var tests = []struct { inRecordLocation string inForwardAgent bool @@ -279,16 +280,6 @@ func (s *IntSuite) TestAudit(c *check.C) { // make sure it's us who joined! :) c.Assert(session.Parties[0].User, check.Equals, s.me.Username) - // lets add something to the session stream: - // write 1MB chunk - bigChunk := make([]byte, 1024*1024) - err = site.PostSessionChunk(defaults.Namespace, session.ID, bytes.NewReader(bigChunk)) - c.Assert(err, check.Equals, nil) - - // then add small prefix: - err = site.PostSessionChunk(defaults.Namespace, session.ID, bytes.NewBufferString("\nsuffix")) - c.Assert(err, check.Equals, nil) - // lets type "echo hi" followed by "enter" and then "exit" + "enter": myTerm.Type("\aecho hi\n\r\aexit\n\r\a") @@ -297,15 +288,17 @@ func (s *IntSuite) TestAudit(c *check.C) { // read back the entire session (we have to try several times until we get back // everything because the session is closing) - const expectedLen = 1048600 var sessionStream []byte - for i := 0; len(sessionStream) < expectedLen; i++ { + for i := 0; i < 5; i++ { sessionStream, err = site.GetSessionChunk(defaults.Namespace, session.ID, 0, events.MaxChunkBytes) c.Assert(err, check.IsNil) + if strings.Contains(string(sessionStream), "exit") { + break + } time.Sleep(time.Millisecond * 250) if i > 10 { // session stream keeps coming back short - c.Fatalf("stream is too short: <%d", expectedLen) + c.Fatal("stream is not getting data") } } @@ -316,11 +309,10 @@ func (s *IntSuite) TestAudit(c *check.C) { // hi // edsger ~: exit // logout - // <1MB of zeros here> - // suffix // - c.Assert(strings.Contains(string(sessionStream), "echo hi"), check.Equals, true) - c.Assert(strings.Contains(string(sessionStream), "\nsuffix"), check.Equals, true) + comment := check.Commentf("%q", string(sessionStream)) + c.Assert(strings.Contains(string(sessionStream), "echo hi"), check.Equals, true, comment) + c.Assert(strings.Contains(string(sessionStream), "exit"), check.Equals, true, comment) // now lets look at session events: history, err := site.GetSessionEvents(defaults.Namespace, session.ID, 0) @@ -364,18 +356,14 @@ func (s *IntSuite) TestAudit(c *check.C) { } c.Assert(start.GetString(events.SessionServerID), check.Equals, expectedServerID) - // find "\nsuffix" write and find our huge 1MB chunk - prefixFound, hugeChunkFound := false, false + // make sure data is recorded properly + out := &bytes.Buffer{} for _, e := range history { - if getChunk(e, 10) == "\nsuffix" { - prefixFound = true - } - if e.GetInt("bytes") == 1048576 { - hugeChunkFound = true - } + out.WriteString(getChunk(e, 1000)) } - c.Assert(prefixFound, check.Equals, true) - c.Assert(hugeChunkFound, check.Equals, true) + recorded := replaceNewlines(out.String()) + c.Assert(recorded, check.Matches, ".*exit.*") + c.Assert(recorded, check.Matches, ".*echo hi.*") // there should alwys be 'session.end' event end := findByType(events.SessionEndEvent) @@ -396,6 +384,10 @@ func (s *IntSuite) TestAudit(c *check.C) { } } +func replaceNewlines(in string) string { + return regexp.MustCompile(`\r?\n`).ReplaceAllString(in, `\n`) +} + // TestInteroperability checks if Teleport and OpenSSH behave in the same way // when executing commands. func (s *IntSuite) TestInteroperability(c *check.C) { diff --git a/lib/auth/helpers.go b/lib/auth/helpers.go index df5b263621b..91df6cf5c9d 100644 --- a/lib/auth/helpers.go +++ b/lib/auth/helpers.go @@ -90,7 +90,10 @@ func NewTestAuthServer(cfg TestAuthServerConfig) (*TestAuthServer, error) { } srv.AuditLog, err = events.NewAuditLog(events.AuditLogConfig{ - DataDir: cfg.Dir, RecordSessions: true}) + DataDir: cfg.Dir, + RecordSessions: true, + ServerID: cfg.ClusterName, + }) if err != nil { return nil, trace.Wrap(err) } diff --git a/lib/auth/password_test.go b/lib/auth/password_test.go index f9889fd5177..bd89b5715aa 100644 --- a/lib/auth/password_test.go +++ b/lib/auth/password_test.go @@ -106,10 +106,10 @@ func (s *PasswordSuite) TestTiming(c *C) { elapsedNotExists = elapsedNotExists + elapsed } - // elapsedDifference must be less than 20 ms + // elapsedDifference must be less than 40 ms (with some leeway for delays in runtime) elapsedDifference := elapsedExists/10 - elapsedNotExists/10 - comment := Commentf("elapsed difference (%v) greater than 30 ms", elapsedDifference) - c.Assert(elapsedDifference.Seconds() < 0.030, Equals, true, comment) + comment := Commentf("elapsed difference (%v) greater than 40 ms", elapsedDifference) + c.Assert(elapsedDifference.Seconds() < 0.040, Equals, true, comment) } func (s *PasswordSuite) TestChangePassword(c *C) { diff --git a/lib/auth/tun_test.go b/lib/auth/tun_test.go index 38e6d39234c..2057f6566ae 100644 --- a/lib/auth/tun_test.go +++ b/lib/auth/tun_test.go @@ -73,7 +73,10 @@ func (s *TunSuite) SetUpTest(c *C) { c.Assert(err, IsNil) s.alog, err = events.NewAuditLog(events.AuditLogConfig{ - DataDir: s.dir, RecordSessions: true}) + DataDir: s.dir, + RecordSessions: true, + ServerID: "sid1", + }) c.Assert(err, IsNil) s.sessionServer, err = session.New(s.bk) diff --git a/lib/events/api.go b/lib/events/api.go index 68777e38e28..c5b5ee44ad6 100644 --- a/lib/events/api.go +++ b/lib/events/api.go @@ -35,6 +35,9 @@ const ( RemoteAddr = "addr.remote" // client (user's) address EventCursor = "id" // event ID (used as cursor value for enumeration, not stored) + // EventIndex is an event index as received from the logging server + EventIndex = "ei" + // EventNamespace is a namespace of the session event EventNamespace = "namespace" @@ -122,6 +125,14 @@ const ( MaxChunkBytes = 1024 * 1024 * 5 ) +const ( + // V1 is the V1 version of slice chunks API, + // it is 0 because it was not defined before + V1 = 0 + // V2 is the V2 version of slice chunks API + V2 = 2 +) + // IAuditLog is the primary (and the only external-facing) interface for AuditLogger. // If you wish to implement a different kind of logger (not filesystem-based), you // have to implement this interface diff --git a/lib/events/auditlog.go b/lib/events/auditlog.go index 54fe650ea18..8b3fd5dbc81 100644 --- a/lib/events/auditlog.go +++ b/lib/events/auditlog.go @@ -23,6 +23,7 @@ import ( "encoding/json" "fmt" "io" + "io/ioutil" "net/url" "os" "path/filepath" @@ -97,6 +98,9 @@ type AuditLogConfig struct { // DataDir is the directory where audit log stores the data DataDir string + // ServerID is the id of the audit log server + ServerID string + // RecordSessions controls if sessions are recorded along with audit events. RecordSessions bool @@ -128,6 +132,9 @@ func (a *AuditLogConfig) CheckAndSetDefaults() error { if a.DataDir == "" { return trace.BadParameter("missing parameter DataDir") } + if a.ServerID == "" { + return trace.BadParameter("missing parameter ServerID") + } if a.Clock == nil { a.Clock = clockwork.NewRealClock() } @@ -154,22 +161,6 @@ func NewAuditLog(cfg AuditLogConfig) (*AuditLog, error) { if err := cfg.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } - // create a directory for session logs: - sessionDir := filepath.Join(cfg.DataDir, SessionLogsDir) - if err := os.MkdirAll(sessionDir, *cfg.DirMask); err != nil { - return nil, trace.ConvertSystemError(err) - } - - if cfg.UID != nil && cfg.GID != nil { - err := os.Chown(cfg.DataDir, *cfg.UID, *cfg.GID) - if err != nil { - return nil, trace.ConvertSystemError(err) - } - err = os.Chown(sessionDir, *cfg.UID, *cfg.GID) - if err != nil { - return nil, trace.ConvertSystemError(err) - } - } al := &AuditLog{ AuditLogConfig: cfg, @@ -183,6 +174,32 @@ func NewAuditLog(cfg AuditLogConfig) (*AuditLog, error) { return nil, trace.Wrap(err) } al.loggers = loggers + // create a directory for audit logs, audit log does not create + // session logs before migrations are run in case if the directory + // has to be moved + auditDir := filepath.Join(cfg.DataDir, cfg.ServerID) + if err := os.MkdirAll(auditDir, *cfg.DirMask); err != nil { + return nil, trace.ConvertSystemError(err) + } + if err := al.runMigrations(); err != nil { + return nil, trace.Wrap(err) + } + // create a directory for session logs: + sessionDir := filepath.Join(cfg.DataDir, cfg.ServerID, SessionLogsDir) + if err := os.MkdirAll(sessionDir, *cfg.DirMask); err != nil { + return nil, trace.ConvertSystemError(err) + } + if cfg.UID != nil && cfg.GID != nil { + err := os.Chown(cfg.DataDir, *cfg.UID, *cfg.GID) + if err != nil { + return nil, trace.ConvertSystemError(err) + } + err = os.Chown(sessionDir, *cfg.UID, *cfg.GID) + if err != nil { + return nil, trace.ConvertSystemError(err) + } + } + go al.periodicCloseInactiveLoggers() return al, nil } @@ -199,7 +216,7 @@ func (l *AuditLog) PostSessionSlice(slice SessionSlice) error { if len(slice.Chunks) == 0 { return trace.BadParameter("missing session chunks") } - sl, err := l.LoggerFor(slice.Namespace, session.ID(slice.SessionID)) + sl, err := l.LoggerFor(slice.Namespace, session.ID(slice.SessionID), slice.Version == V1) if err != nil { l.Errorf("failed to get logger: %v", trace.DebugReport(err)) return trace.BadParameter("audit.log: no session writer for %s", slice.SessionID) @@ -213,6 +230,8 @@ func (l *AuditLog) PostSessionSlice(slice SessionSlice) error { return nil } +// DELETE IN: 2.6.0 +// This method is no longer used in 2.5.0 // PostSessionChunk writes a new chunk of session stream into the audit log func (l *AuditLog) PostSessionChunk(namespace string, sid session.ID, reader io.Reader) error { tmp, err := utils.ReadAll(reader, 16*1024) @@ -230,16 +249,159 @@ func (l *AuditLog) PostSessionChunk(namespace string, sid session.ID, reader io. }) } +func (l *AuditLog) getAuthServers() ([]string, error) { + // scan the log directory: + df, err := os.Open(l.DataDir) + if err != nil { + return nil, trace.Wrap(err) + } + defer df.Close() + entries, err := df.Readdir(-1) + if err != nil { + return nil, trace.Wrap(err) + } + var authServers []string + for i := range entries { + fi := entries[i] + if fi.IsDir() { + authServers = append(authServers, filepath.Base(fi.Name())) + } + } + return authServers, nil +} + +type sessionIndex struct { + dataDir string + namespace string + sid session.ID + events []indexEntry + chunks []indexEntry +} + +func (idx *sessionIndex) sort() { + sort.Slice(idx.events, func(i, j int) bool { + return idx.events[i].Index < idx.events[j].Index + }) + sort.Slice(idx.chunks, func(i, j int) bool { + return idx.chunks[i].Offset < idx.chunks[j].Offset + }) +} + +func (idx *sessionIndex) eventsFileName(index int) string { + entry := idx.events[index] + return filepath.Join(idx.dataDir, entry.authServer, SessionLogsDir, idx.namespace, entry.FileName) +} + +func (idx *sessionIndex) eventsFile(afterN int) (int, error) { + for i := len(idx.events) - 1; i >= 0; i-- { + entry := idx.events[i] + if int64(afterN) >= entry.Index { + return i, nil + } + } + return -1, trace.NotFound("%v not found", afterN) +} + +func (idx *sessionIndex) chunksFile(offset int64) (string, int64, error) { + for i := len(idx.chunks) - 1; i >= 0; i-- { + entry := idx.chunks[i] + if offset >= entry.Offset { + return filepath.Join(idx.dataDir, entry.authServer, SessionLogsDir, idx.namespace, entry.FileName), entry.Offset, nil + } + } + return "", 0, trace.NotFound("%v not found", offset) +} + +func (l *AuditLog) readSessionIndex(namespace string, sid session.ID) (*sessionIndex, error) { + authServers, err := l.getAuthServers() + if err != nil { + return nil, trace.Wrap(err) + } + index := sessionIndex{ + sid: sid, + dataDir: l.DataDir, + namespace: namespace, + } + for _, authServer := range authServers { + indexFileName := filepath.Join(l.DataDir, authServer, SessionLogsDir, namespace, fmt.Sprintf("%v.index", sid)) + indexFile, err := os.OpenFile(indexFileName, os.O_RDONLY, 0640) + err = trace.ConvertSystemError(err) + if err != nil { + if !trace.IsNotFound(err) { + return nil, trace.Wrap(err) + } + continue + } + events, chunks, err := readIndexEntries(indexFile, authServer) + if err != nil { + return nil, trace.Wrap(err) + } + index.events = append(index.events, events...) + index.chunks = append(index.chunks, chunks...) + err = indexFile.Close() + if err != nil { + return nil, trace.Wrap(err) + } + } + index.sort() + return &index, nil +} + +func readIndexEntries(file *os.File, authServer string) (events []indexEntry, chunks []indexEntry, err error) { + scanner := bufio.NewScanner(file) + for lineNo := 0; scanner.Scan(); lineNo++ { + var entry indexEntry + if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil { + return nil, nil, trace.Wrap(err) + } + entry.authServer = authServer + switch entry.Type { + case fileTypeEvents: + events = append(events, entry) + case fileTypeChunks: + chunks = append(chunks, entry) + default: + return nil, nil, trace.BadParameter("unsupported type: %q", entry.Type) + } + } + return +} + // GetSessionChunk returns a reader which console and web clients request // to receive a live stream of a given session. The reader allows access to a // session stream range from offsetBytes to offsetBytes+maxBytes -// func (l *AuditLog) GetSessionChunk(namespace string, sid session.ID, offsetBytes, maxBytes int) ([]byte, error) { - l.Debugf("getSessionReader(%v, %v)", namespace, sid) + var data []byte + for { + out, err := l.getSessionChunk(namespace, sid, offsetBytes, maxBytes) + if err != nil { + if err == io.EOF { + return data, nil + } + return nil, trace.Wrap(err) + } + data = append(data, out...) + if len(data) == maxBytes || len(out) == 0 { + return data, nil + } + maxBytes = maxBytes - len(out) + offsetBytes = offsetBytes + len(out) + } +} + +func (l *AuditLog) getSessionChunk(namespace string, sid session.ID, offsetBytes, maxBytes int) ([]byte, error) { if namespace == "" { return nil, trace.BadParameter("missing parameter namespace") } - fstream, err := os.OpenFile(l.sessionStreamFn(namespace, sid), os.O_RDONLY, 0640) + idx, err := l.readSessionIndex(namespace, sid) + if err != nil { + return nil, trace.Wrap(err) + } + fileName, fileOffset, err := idx.chunksFile(int64(offsetBytes)) + if err != nil { + return nil, trace.Wrap(err) + } + fstream, err := os.OpenFile(fileName, os.O_RDONLY, 0640) if err != nil { log.Warning(err) return nil, trace.Wrap(err) @@ -247,12 +409,11 @@ func (l *AuditLog) GetSessionChunk(namespace string, sid session.ID, offsetBytes defer fstream.Close() // seek to 'offset' from the beginning - fstream.Seek(int64(offsetBytes), 0) + fstream.Seek(int64(offsetBytes)-fileOffset, 0) // copy up to maxBytes from the offset position: var buff bytes.Buffer io.Copy(&buff, io.LimitReader(fstream, int64(maxBytes))) - return buff.Bytes(), nil } @@ -267,7 +428,31 @@ func (l *AuditLog) GetSessionEvents(namespace string, sid session.ID, afterN int if namespace == "" { return nil, trace.BadParameter("missing parameter namespace") } - logFile, err := os.OpenFile(l.sessionLogFn(namespace, sid), os.O_RDONLY, 0640) + idx, err := l.readSessionIndex(namespace, sid) + if err != nil { + return nil, trace.Wrap(err) + } + fileIndex, err := idx.eventsFile(afterN) + if err != nil { + return nil, trace.Wrap(err) + } + events := make([]EventFields, 0, 256) + for i := fileIndex; i < len(idx.events); i++ { + skip := 0 + if i == fileIndex { + skip = afterN + } + out, err := l.fetchSessionEvents(idx.eventsFileName(i), skip) + if err != nil { + return nil, trace.Wrap(err) + } + events = append(events, out...) + } + return events, nil +} + +func (l *AuditLog) fetchSessionEvents(fileName string, afterN int) ([]EventFields, error) { + logFile, err := os.OpenFile(fileName, os.O_RDONLY, 0640) if err != nil { // no file found? this means no events have been logged yet if os.IsNotExist(err) { @@ -300,6 +485,44 @@ func (l *AuditLog) GetSessionEvents(namespace string, sid session.ID, afterN int func (l *AuditLog) EmitAuditEvent(eventType string, fields EventFields) error { l.Debugf("EmitAuditEvent(%v: %v)", eventType, fields) + if err := l.emitAuditEvent(eventType, fields); err != nil { + return trace.Wrap(err) + } + + // DELETE IN: 2.6.0 + // if this event is associated with a session -> forward it to the session log as well + // this means that this is a legacy client, so audit logger is going to use compatibility mode logger + sessionID := fields.GetString(SessionEventID) + + if sessionID != "" { + sl, err := l.LoggerFor(fields.GetString(EventNamespace), session.ID(sessionID), true) + if err == nil { + if err := sl.LogEvent(fields); err != nil { + l.Warningf("Failed to log event: %v.", err) + } + // Session ended? Get rid of the session logger then: + if eventType == SessionEndEvent { + l.removeLogger(sessionID) + if err := sl.Finalize(); err != nil { + log.Error(err) + } + } + } else { + l.Errorf("failed to get logger: %v", trace.DebugReport(err)) + } + } + return nil +} + +func (l *AuditLog) removeLogger(sessionID string) { + l.Debugf("Removing session logger for SID=%v.", sessionID) + l.Lock() + defer l.Unlock() + l.loggers.Remove(sessionID) +} + +// emitAuditEvent adds a new event to the log. Part of auth.IAuditLog interface. +func (l *AuditLog) emitAuditEvent(eventType string, fields EventFields) error { // see if the log needs to be rotated if err := l.rotateLog(); err != nil { log.Error(err) @@ -310,77 +533,215 @@ func (l *AuditLog) EmitAuditEvent(eventType string, fields EventFields) error { fields[EventTime] = l.Clock.Now().In(time.UTC).Round(time.Second) // line is the text to be logged - line := eventToLine(fields) - - // if this event is associated with a session -> forward it to the session log as well - sessionID := fields.GetString(SessionEventID) - if sessionID != "" { - sl, err := l.LoggerFor(fields.GetString(EventNamespace), session.ID(sessionID)) - if err == nil { - sl.LogEvent(fields) - - // Session ended? Get rid of the session logger then: - if eventType == SessionEndEvent { - l.Debugf("removing session logger for SID=%v", sessionID) - l.Lock() - l.loggers.Remove(sessionID) - l.Unlock() - if err := sl.Finalize(); err != nil { - log.Error(err) - } - } - } else { - l.Errorf("failed to get logger: %v", trace.DebugReport(err)) - } + line, err := json.Marshal(fields) + if err != nil { + return trace.Wrap(err) } // log it to the main log file: if l.file != nil { - fmt.Fprintln(l.file, line) + fmt.Fprintln(l.file, string(line)) } return nil } +// matchingFiles returns files matching the time restrictions of the query +// across multiple auth servers, returns a list of file names +func (l *AuditLog) matchingFiles(fromUTC, toUTC time.Time) ([]eventFile, error) { + authServers, err := l.getAuthServers() + if err != nil { + return nil, trace.Wrap(err) + } + + var filtered []eventFile + for _, serverID := range authServers { + // scan the log directory: + df, err := os.Open(filepath.Join(l.DataDir, serverID)) + if err != nil { + return nil, trace.Wrap(err) + } + defer df.Close() + entries, err := df.Readdir(-1) + if err != nil { + return nil, trace.Wrap(err) + } + for i := range entries { + fi := entries[i] + if fi.IsDir() || filepath.Ext(fi.Name()) != LogfileExt { + continue + } + fd := fi.ModTime().UTC() + if fd.After(fromUTC) && fd.Before(toUTC) { + eventFile := eventFile{ + FileInfo: fi, + path: filepath.Join(l.DataDir, serverID, fi.Name()), + } + filtered = append(filtered, eventFile) + } + } + } + // sort all accepted files by date + sort.Sort(byDate(filtered)) + return filtered, nil +} + +func (l *AuditLog) moveAuditLogFile(fileName string) error { + sourceFile := filepath.Join(l.DataDir, fileName) + targetFile := filepath.Join(l.DataDir, l.ServerID, fileName) + l.Infof("Migrating log file from %v to %v", sourceFile, targetFile) + if err := os.Rename(sourceFile, targetFile); err != nil { + return trace.ConvertSystemError(err) + } + return nil +} + +func (l *AuditLog) migrateSessionsDir() error { + sessionDir := filepath.Join(l.DataDir, SessionLogsDir) + targetDir := filepath.Join(l.DataDir, l.ServerID, SessionLogsDir) + _, err := utils.StatDir(targetDir) + if err == nil { + return nil + } + if !trace.IsNotFound(err) { + return trace.Wrap(err) + } + // transform the recorded files to the new index format + recordingsDir := filepath.Join(l.DataDir, SessionLogsDir, defaults.Namespace) + fileInfos, err := listDir(recordingsDir) + if err != nil { + // source directory does not exist, means nothing to migrate + if !trace.IsNotFound(err) { + return trace.Wrap(err) + } + return nil + } + for _, fi := range fileInfos { + if fi.IsDir() { + l.Debugf("Migrating, skipping directory %v", fi.Name()) + continue + } + + // only trigger migrations on .log + // to avoid double migrations attempts or migrating recordings + // that were already migrated + if !strings.HasSuffix(fi.Name(), "session.log") { + continue + } + + parts := strings.Split(fi.Name(), ".") + if len(parts) < 2 { + l.Debugf("Migrating, skipping unknown file: %v", fi.Name()) + } + sessionID := parts[0] + sourceEventsFile := filepath.Join(recordingsDir, fmt.Sprintf("%v.session.log", sessionID)) + targetEventsFile := filepath.Join(recordingsDir, fmt.Sprintf("%v-0.events", sessionID)) + l.Debugf("Migrating, session ID %v. Renamed %v to %v", sessionID, sourceEventsFile, targetEventsFile) + err := os.Rename(sourceEventsFile, targetEventsFile) + if err != nil { + return trace.Wrap(err) + } + sourceChunksFile := filepath.Join(recordingsDir, fmt.Sprintf("%v.session.bytes", sessionID)) + targetChunksFile := filepath.Join(recordingsDir, fmt.Sprintf("%v-0.chunks", sessionID)) + l.Debugf("Migrating session ID %v. Renamed %v to %v", sessionID, sourceChunksFile, targetChunksFile) + err = os.Rename(sourceChunksFile, targetChunksFile) + if err != nil { + return trace.Wrap(err) + } + indexFileName := filepath.Join(recordingsDir, fmt.Sprintf("%v.index", sessionID)) + + eventsData, err := json.Marshal(indexEntry{ + FileName: filepath.Base(targetEventsFile), + Type: fileTypeEvents, + Index: 0, + }) + if err != nil { + return trace.Wrap(err) + } + + chunksData, err := json.Marshal(indexEntry{ + FileName: filepath.Base(targetChunksFile), + Type: fileTypeChunks, + Offset: 0, + }) + if err != nil { + return trace.Wrap(err) + } + + err = ioutil.WriteFile(indexFileName, + []byte( + fmt.Sprintf("%v\n%v\n", + string(eventsData), + string(chunksData), + )), + 0640, + ) + if err != nil { + return trace.Wrap(err) + } + l.Debugf("Migrating session ID %v. Wrote index file %v.", indexFileName) + } + l.Infof("Moving sessions folder from %v to %v", sessionDir, targetDir) + if err := os.Rename(sessionDir, targetDir); err != nil { + return trace.ConvertSystemError(err) + } + return nil +} + +func listDir(dir string) ([]os.FileInfo, error) { + df, err := os.Open(dir) + if err != nil { + return nil, trace.ConvertSystemError(err) + } + defer df.Close() + entries, err := df.Readdir(-1) + if err != nil { + return nil, trace.ConvertSystemError(err) + } + return entries, nil +} + +// DELETE IN: 2.6.0 +// runMigrations runs migrations for audit logs format +func (l *AuditLog) runMigrations() error { + // migrate sessions directory + if err := l.migrateSessionsDir(); err != nil { + return trace.Wrap(err) + } + // move audit log files to the auth server id + fileInfos, err := listDir(l.DataDir) + if err != nil { + return trace.Wrap(err) + } + for _, fi := range fileInfos { + if !fi.IsDir() { + if err := l.moveAuditLogFile(fi.Name()); err != nil { + return trace.Wrap(err) + } + } + } + + return nil +} + // SearchEvents finds events. Results show up sorted by date (newest first) func (l *AuditLog) SearchEvents(fromUTC, toUTC time.Time, query string) ([]EventFields, error) { l.Debugf("SearchEvents(%v, %v, query=%v)", fromUTC, toUTC, query) - queryVals, err := url.ParseQuery(query) - if err != nil { - return nil, trace.BadParameter("missing parameter query", query) - } // how many days of logs to search? days := int(toUTC.Sub(fromUTC).Hours() / 24) if days < 0 { return nil, trace.BadParameter("query", query) } - - // scan the log directory: - df, err := os.Open(l.DataDir) + queryVals, err := url.ParseQuery(query) + if err != nil { + return nil, trace.BadParameter("missing parameter query", query) + } + filtered, err := l.matchingFiles(fromUTC, toUTC) if err != nil { return nil, trace.Wrap(err) } - defer df.Close() - entries, err := df.Readdir(-1) - if err != nil { - return nil, trace.Wrap(err) - } - filtered := make([]os.FileInfo, 0, days) - for i := range entries { - fi := entries[i] - if fi.IsDir() || filepath.Ext(fi.Name()) != LogfileExt { - continue - } - fd := fi.ModTime().UTC() - if fd.After(fromUTC) && fd.Before(toUTC) { - filtered = append(filtered, fi) - } - } - // sort all accepted files by date - sort.Sort(byDate(filtered)) - // search within each file: events := make([]EventFields, 0) for i := range filtered { - found, err := l.findInFile(filepath.Join(l.DataDir, filtered[i].Name()), queryVals) + found, err := l.findInFile(filtered[i].path, queryVals) if err != nil { return nil, trace.Wrap(err) } @@ -391,7 +752,7 @@ func (l *AuditLog) SearchEvents(fromUTC, toUTC time.Time, query string) ([]Event // SearchSessionEvents searches for session related events. Used to find completed sessions. func (l *AuditLog) SearchSessionEvents(fromUTC, toUTC time.Time) ([]EventFields, error) { - l.Infof("SearchSessionEvents(%v, %v)", fromUTC, toUTC) + l.Debugf("SearchSessionEvents(%v, %v)", fromUTC, toUTC) // only search for specific event types query := url.Values{} @@ -403,8 +764,13 @@ func (l *AuditLog) SearchSessionEvents(fromUTC, toUTC time.Time) ([]EventFields, return l.SearchEvents(fromUTC, toUTC, query.Encode()) } +type eventFile struct { + os.FileInfo + path string +} + // byDate implements sort.Interface. -type byDate []os.FileInfo +type byDate []eventFile func (f byDate) Len() int { return len(f) } func (f byDate) Less(i, j int) bool { return f[i].ModTime().Before(f[j].ModTime()) } @@ -415,10 +781,13 @@ func (f byDate) Swap(i, j int) { f[i], f[j] = f[j], f[i] } // // You can pass multiple types like "event=session.start&event=session.end" func (l *AuditLog) findInFile(fn string, query url.Values) ([]EventFields, error) { - l.Infof("findInFile(%s, %v)", fn, query) + l.Debugf("Called findInFile(%s, %v).", fn, query) retval := make([]EventFields, 0) - eventFilter := query[EventType] + eventFilter, ok := query[EventType] + if !ok && len(query) > 0 { + return nil, nil + } doFilter := len(eventFilter) > 0 // open the log file: @@ -471,7 +840,7 @@ func (l *AuditLog) rotateLog() (err error) { openLogFile := func() error { l.Lock() defer l.Unlock() - logfname := filepath.Join(l.DataDir, + logfname := filepath.Join(l.DataDir, l.ServerID, fileTime.Format("2006-01-02.15:04:05")+LogfileExt) l.file, err = os.OpenFile(logfname, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640) if err != nil { @@ -515,29 +884,9 @@ func (l *AuditLog) Close() error { return nil } -// sessionStreamFn helper determines the name of the stream file for a given -// session by its ID -func (l *AuditLog) sessionStreamFn(namespace string, sid session.ID) string { - return filepath.Join( - l.DataDir, - SessionLogsDir, - namespace, - fmt.Sprintf("%s%s", sid, SessionStreamPrefix)) -} - -// sessionLogFn helper determines the name of the stream file for a given -// session by its ID -func (l *AuditLog) sessionLogFn(namespace string, sid session.ID) string { - return filepath.Join( - l.DataDir, - SessionLogsDir, - namespace, - fmt.Sprintf("%s%s", sid, SessionLogPrefix)) -} - // LoggerFor creates a logger for a specified session. Session loggers allow // to group all events into special "session log files" for easier audit -func (l *AuditLog) LoggerFor(namespace string, sid session.ID) (SessionLogger, error) { +func (l *AuditLog) LoggerFor(namespace string, sid session.ID, compatibilityMode bool) (SessionLogger, error) { l.Lock() defer l.Unlock() @@ -556,7 +905,7 @@ func (l *AuditLog) LoggerFor(namespace string, sid session.ID) (SessionLogger, e return sessionLogger, nil } // make sure session logs dir is present - sdir := filepath.Join(l.DataDir, SessionLogsDir, namespace) + sdir := filepath.Join(l.DataDir, l.ServerID, SessionLogsDir, namespace) if err := os.Mkdir(sdir, *l.DirMask); err != nil { if !os.IsExist(err) { return nil, trace.Wrap(err) @@ -567,19 +916,38 @@ func (l *AuditLog) LoggerFor(namespace string, sid session.ID) (SessionLogger, e return nil, trace.ConvertSystemError(err) } } - sessionLogger, err := NewDiskSessionLogger(DiskSessionLoggerConfig{ - SessionID: sid, - EventsFileName: l.sessionLogFn(namespace, sid), - StreamFileName: l.sessionStreamFn(namespace, sid), - Clock: l.Clock, - RecordSessions: l.RecordSessions, - }) - if err != nil { - return nil, trace.Wrap(err) + // DELETE IN: 2.6.0 + // Compatibility mode is required for migration from 2.5.0 to 2.6.0 + if compatibilityMode { + l.Debugf("Using compatibility mode logger for session id %v.", sid) + sessionLogger, err := NewCompatSessionLogger(CompatSessionLoggerConfig{ + SessionID: sid, + DataDir: sdir, + Clock: l.Clock, + RecordSessions: l.RecordSessions, + }) + if err != nil { + return nil, trace.Wrap(err) + } + l.loggers.Set(string(sid), sessionLogger, l.SessionIdlePeriod) + auditOpenFiles.Inc() + return sessionLogger, nil + } else { + sessionLogger, err := NewDiskSessionLogger(DiskSessionLoggerConfig{ + SessionID: sid, + DataDir: sdir, + Clock: l.Clock, + RecordSessions: l.RecordSessions, + AuditLog: l, + }) + if err != nil { + return nil, trace.Wrap(err) + } + l.loggers.Set(string(sid), sessionLogger, l.SessionIdlePeriod) + auditOpenFiles.Inc() + return sessionLogger, nil } - l.loggers.Set(string(sid), sessionLogger, l.SessionIdlePeriod) - auditOpenFiles.Inc() - return sessionLogger, nil + } func (l *AuditLog) asyncCloseSessionLogger(key string, val interface{}) { @@ -587,14 +955,14 @@ func (l *AuditLog) asyncCloseSessionLogger(key string, val interface{}) { } func (l *AuditLog) closeSessionLogger(key string, val interface{}) { - l.Debugf("closing session logger %v", key) + l.Debugf("Closing session logger %v.", key) logger, ok := val.(SessionLogger) if !ok { - l.Warningf("warning, not valid value type %T for %v", val, key) + l.Warningf("Warning, not valid value type %T for %v.", val, key) return } if err := logger.Finalize(); err != nil { - log.Warningf("failed to finalize: %v", trace.DebugReport(err)) + log.Warningf("Failed to finalize: %v.", trace.DebugReport(err)) } } @@ -616,17 +984,6 @@ func (l *AuditLog) closeInactiveLoggers() { expired := l.loggers.RemoveExpired(10) if expired != 0 { - l.Debugf("closed %v inactive session loggers", expired) + l.Debugf("Closed %v inactive session loggers.", expired) } } - -// eventToLine helper creates a loggable line/string for a given event -func eventToLine(fields EventFields) string { - jbytes, err := json.Marshal(fields) - jsonString := string(jbytes) - if err != nil { - log.Error(err) - jsonString = "" - } - return jsonString -} diff --git a/lib/events/auditlog_test.go b/lib/events/auditlog_test.go index 8e185a85640..31c8a50ffcf 100644 --- a/lib/events/auditlog_test.go +++ b/lib/events/auditlog_test.go @@ -2,9 +2,11 @@ package events import ( "bytes" + "encoding/json" "fmt" "io/ioutil" "os" + "path/filepath" "testing" "time" @@ -36,6 +38,7 @@ func (a *AuditTestSuite) makeLog(c *check.C, dataDir string, recordSessions bool alog, err := NewAuditLog(AuditLogConfig{ DataDir: dataDir, RecordSessions: recordSessions, + ServerID: "server1", }) if err != nil { return nil, trace.Wrap(err) @@ -59,7 +62,8 @@ func (a *AuditTestSuite) TestNew(c *check.C) { c.Assert(alog.Close(), check.IsNil) } -func (a *AuditTestSuite) TestComplexLogging(c *check.C) { +// DELETE IN: 2.6.0 +func (a *AuditTestSuite) TestCompatComplexLogging(c *check.C) { now := time.Now().In(time.UTC).Round(time.Second) // create audit log, write a couple of events into it, close it @@ -138,7 +142,443 @@ func (a *AuditTestSuite) TestComplexLogging(c *check.C) { c.Assert(found[0].GetString(EventLogin), check.Equals, "vincent") } -func (a *AuditTestSuite) TestSessionRecordingOff(c *check.C) { +// TestSessionsOnOneAuthServer tests scenario when there are two auth servers +// but session is recorded only on the one +func (a *AuditTestSuite) TestSessionsOnOneAuthServer(c *check.C) { + fakeClock := clockwork.NewFakeClock() + + alog, err := NewAuditLog(AuditLogConfig{ + Clock: fakeClock, + DataDir: a.dataDir, + RecordSessions: true, + ServerID: "server1", + }) + c.Assert(err, check.IsNil) + + alog2, err := NewAuditLog(AuditLogConfig{ + Clock: fakeClock, + DataDir: a.dataDir, + RecordSessions: true, + ServerID: "server2", + }) + c.Assert(err, check.IsNil) + + sessionID := "100" + // start the session and emit data stream to it + firstMessage := []byte("hello") + err = alog.PostSessionSlice(SessionSlice{ + Namespace: defaults.Namespace, + SessionID: sessionID, + Chunks: []*SessionChunk{ + // start the seession + &SessionChunk{ + Time: fakeClock.Now().UTC().UnixNano(), + EventIndex: 0, + EventType: SessionStartEvent, + Data: marshal(EventFields{EventLogin: "bob"}), + }, + // type "hello" into session "100" + &SessionChunk{ + Time: fakeClock.Now().UTC().UnixNano(), + EventIndex: 1, + ChunkIndex: 0, + Offset: 0, + EventType: SessionPrintEvent, + Data: firstMessage, + }, + // emitting session end event should close the session + &SessionChunk{ + Time: fakeClock.Now().UTC().UnixNano(), + EventIndex: 4, + EventType: SessionEndEvent, + Data: marshal(EventFields{EventLogin: "bob"}), + }, + }, + Version: V2, + }) + c.Assert(err, check.IsNil) + + // does not matter which audit server is accessed the results should be the same + for _, a := range []*AuditLog{alog, alog2} { + // read the session bytes + history, err := a.GetSessionEvents(defaults.Namespace, session.ID(sessionID), 0) + c.Assert(err, check.IsNil) + c.Assert(history, check.HasLen, 3) + + // make sure offsets were properly set (0 for the first event and 5 bytes for hello): + c.Assert(history[1][SessionByteOffset], check.Equals, float64(0)) + c.Assert(history[1][SessionEventTimestamp], check.Equals, float64(0)) + + // fetch all bytes + buff, err := a.GetSessionChunk(defaults.Namespace, session.ID(sessionID), 0, 5000) + c.Assert(err, check.IsNil) + c.Assert(string(buff), check.Equals, string(firstMessage)) + + // with offset + buff, err = a.GetSessionChunk(defaults.Namespace, session.ID(sessionID), 2, 5000) + c.Assert(err, check.IsNil) + c.Assert(string(buff), check.Equals, string(firstMessage[2:])) + } +} + +// TestSessionsTwoAuthServers tests two auth servers behind the load balancer handling the event stream +// for the same session +func (a *AuditTestSuite) TestSessionsTwoAuthServers(c *check.C) { + fakeClock := clockwork.NewFakeClock() + + alog, err := NewAuditLog(AuditLogConfig{ + Clock: fakeClock, + DataDir: a.dataDir, + RecordSessions: true, + ServerID: "server1", + }) + c.Assert(err, check.IsNil) + + alog2, err := NewAuditLog(AuditLogConfig{ + Clock: fakeClock, + DataDir: a.dataDir, + RecordSessions: true, + ServerID: "server2", + }) + c.Assert(err, check.IsNil) + + sessionID := "100" + // start the session and emit data stream to it + firstMessage := []byte("hello") + err = alog.PostSessionSlice(SessionSlice{ + Namespace: defaults.Namespace, + SessionID: sessionID, + Chunks: []*SessionChunk{ + // start the seession + &SessionChunk{ + Time: fakeClock.Now().UTC().UnixNano(), + EventIndex: 0, + EventType: SessionStartEvent, + Data: marshal(EventFields{EventLogin: "bob"}), + }, + // type "hello" into session "100" + &SessionChunk{ + Time: fakeClock.Now().UTC().UnixNano(), + EventIndex: 1, + ChunkIndex: 0, + Offset: 0, + EventType: SessionPrintEvent, + Data: firstMessage, + }, + }, + Version: V2, + }) + c.Assert(err, check.IsNil) + c.Assert(alog.loggers.Len(), check.Equals, 1) + + // now fake sleep past expiration + firstDelay := defaults.SessionIdlePeriod * 2 + fakeClock.Advance(firstDelay) + + // logger for idle session should be closed + alog.closeInactiveLoggers() + c.Assert(alog.loggers.Len(), check.Equals, 0) + + // send another event to the session via second auth server + secondMessage := []byte("good day") + err = alog2.PostSessionSlice(SessionSlice{ + Namespace: defaults.Namespace, + SessionID: sessionID, + Chunks: []*SessionChunk{ + // notice how offsets are sent by the client + &SessionChunk{ + Time: fakeClock.Now().UTC().UnixNano(), + Delay: int64(firstDelay / time.Millisecond), + EventIndex: 2, + ChunkIndex: 1, + Offset: int64(len(firstMessage)), + EventType: SessionPrintEvent, + Data: secondMessage, + }, + }, + Version: V2, + }) + c.Assert(err, check.IsNil) + c.Assert(alog2.loggers.Len(), check.Equals, 1) + + // emit next event 17 milliseconds later to the first auth server + thirdMessage := []byte("test") + secondDelay := 17 * time.Millisecond + fakeClock.Advance(secondDelay) + err = alog.PostSessionSlice(SessionSlice{ + Namespace: defaults.Namespace, + SessionID: sessionID, + Chunks: []*SessionChunk{ + // notice how offsets are sent by the client + &SessionChunk{ + Time: fakeClock.Now().UTC().UnixNano(), + EventIndex: 3, + ChunkIndex: 2, + Delay: int64((firstDelay + secondDelay) / time.Millisecond), + Offset: int64(len(firstMessage) + len(secondMessage)), + EventType: SessionPrintEvent, + Data: thirdMessage, + }, + // emitting session end event should close the session + &SessionChunk{ + Time: fakeClock.Now().UTC().UnixNano(), + EventIndex: 4, + EventType: SessionEndEvent, + Data: marshal(EventFields{EventLogin: "bob"}), + }, + }, + Version: V2, + }) + c.Assert(err, check.IsNil) + + // emitting session end event should close the session + c.Assert(alog.loggers.Len(), check.Equals, 0) + + // does not matter which audit server is accessed the results should be the same + for _, a := range []*AuditLog{alog, alog2} { + // read the session bytes + history, err := a.GetSessionEvents(defaults.Namespace, session.ID(sessionID), 1) + c.Assert(err, check.IsNil) + c.Assert(history, check.HasLen, 4) + + // make sure offsets were properly set (0 for the first event and 5 bytes for hello): + c.Assert(history[0][SessionByteOffset], check.Equals, float64(0)) + c.Assert(history[1][SessionByteOffset], check.Equals, float64(len(firstMessage))) + c.Assert(history[2][SessionByteOffset], check.Equals, float64(len(firstMessage)+len(secondMessage))) + + // make sure delays are right + c.Assert(history[0][SessionEventTimestamp], check.Equals, float64(0)) + c.Assert(history[1][SessionEventTimestamp], check.Equals, float64(firstDelay/time.Millisecond)) + c.Assert(history[2][SessionEventTimestamp], check.Equals, float64((firstDelay+secondDelay)/time.Millisecond)) + + // fetch all bytes + buff, err := a.GetSessionChunk(defaults.Namespace, session.ID(sessionID), 0, 5000) + c.Assert(err, check.IsNil) + c.Assert(string(buff), check.Equals, "hellogood daytest") + + // with offset + buff, err = a.GetSessionChunk(defaults.Namespace, session.ID(sessionID), 2, 5000) + c.Assert(err, check.IsNil) + c.Assert(string(buff), check.Equals, "llogood daytest") + + // with another offset at the boundary of the first message + buff, err = a.GetSessionChunk(defaults.Namespace, session.ID(sessionID), len(firstMessage), 5000) + c.Assert(err, check.IsNil) + c.Assert(string(buff), check.Equals, "good daytest") + + // with another offset after the boundary of the first message + buff, err = a.GetSessionChunk(defaults.Namespace, session.ID(sessionID), len(firstMessage)+1, 5000) + c.Assert(err, check.IsNil) + c.Assert(string(buff), check.Equals, "ood daytest") + + // with another offset after the boundary of the third message + buff, err = a.GetSessionChunk(defaults.Namespace, session.ID(sessionID), len(firstMessage)+len(secondMessage), 5000) + c.Assert(err, check.IsNil) + c.Assert(string(buff), check.Equals, "test") + + // with another offset outside the boundaries + buff, err = a.GetSessionChunk(defaults.Namespace, session.ID(sessionID), len(firstMessage)+len(secondMessage)+len(thirdMessage), 5000) + c.Assert(err, check.IsNil) + c.Assert(string(buff), check.Equals, "") + } +} + +// TestSearchTwoAuthServers tests search on two auth servers behind the load balancer handling the event stream +// for the same session +func (a *AuditTestSuite) TestSearchTwoAuthServers(c *check.C) { + startTime := time.Now().UTC() + + alog, err := NewAuditLog(AuditLogConfig{ + DataDir: a.dataDir, + RecordSessions: true, + ServerID: "server1", + }) + c.Assert(err, check.IsNil) + + alog2, err := NewAuditLog(AuditLogConfig{ + DataDir: a.dataDir, + RecordSessions: true, + ServerID: "server2", + }) + c.Assert(err, check.IsNil) + + sessionID := "100" + // start the session and emit data stream to it + firstMessage := []byte("hello") + err = alog.PostSessionSlice(SessionSlice{ + Namespace: defaults.Namespace, + SessionID: sessionID, + Chunks: []*SessionChunk{ + // start the seession + &SessionChunk{ + Time: time.Now().UTC().UnixNano(), + EventIndex: 0, + EventType: SessionStartEvent, + Data: marshal(EventFields{EventLogin: "bob"}), + }, + // type "hello" into session "100" + &SessionChunk{ + Time: time.Now().UTC().UnixNano(), + EventIndex: 1, + ChunkIndex: 0, + Offset: 0, + EventType: SessionPrintEvent, + Data: firstMessage, + }, + }, + Version: V2, + }) + c.Assert(err, check.IsNil) + c.Assert(alog.loggers.Len(), check.Equals, 1) + + // send another event to the session via second auth server + secondMessage := []byte("good day") + err = alog2.PostSessionSlice(SessionSlice{ + Namespace: defaults.Namespace, + SessionID: sessionID, + Chunks: []*SessionChunk{ + // notice how offsets are sent by the client + &SessionChunk{ + Time: time.Now().UTC().UnixNano(), + EventIndex: 2, + ChunkIndex: 1, + Offset: int64(len(firstMessage)), + EventType: SessionPrintEvent, + Data: secondMessage, + }, + }, + Version: V2, + }) + c.Assert(err, check.IsNil) + c.Assert(alog2.loggers.Len(), check.Equals, 1) + + // emit next event 17 milliseconds later to the first auth server + thirdMessage := []byte("test") + err = alog.PostSessionSlice(SessionSlice{ + Namespace: defaults.Namespace, + SessionID: sessionID, + Chunks: []*SessionChunk{ + // notice how offsets are sent by the client + &SessionChunk{ + Time: time.Now().UTC().UnixNano(), + EventIndex: 3, + ChunkIndex: 2, + Offset: int64(len(firstMessage) + len(secondMessage)), + EventType: SessionPrintEvent, + Data: thirdMessage, + }, + // emitting session end event should close the session + &SessionChunk{ + EventIndex: 4, + EventType: SessionEndEvent, + Data: marshal(EventFields{EventLogin: "bob"}), + }, + }, + Version: V2, + }) + c.Assert(err, check.IsNil) + + // emitting session end event should close the session + c.Assert(alog.loggers.Len(), check.Equals, 0) + + // does not matter which audit server is accessed the results should be the same + for _, a := range []*AuditLog{alog, alog2} { + comment := check.Commentf("auth server %v", a.ServerID) + + // search events, start time is in the future + query := fmt.Sprintf("%s=%s", EventType, SessionStartEvent) + found, err := a.SearchEvents(startTime.Add(time.Hour), startTime.Add(time.Hour), query) + c.Assert(err, check.IsNil) + c.Assert(len(found), check.Equals, 0, comment) + + // try searching (wrong query) + found, err = a.SearchEvents(startTime, startTime.Add(time.Hour), "foo=bar") + c.Assert(err, check.IsNil) + c.Assert(len(found), check.Equals, 0, comment) + + // try searching (good query: for "session start") + found, err = a.SearchEvents(startTime.Add(-time.Hour), startTime.Add(time.Hour), query) + c.Assert(err, check.IsNil) + c.Assert(len(found), check.Equals, 1, comment) + c.Assert(found[0].GetString(EventLogin), check.Equals, "bob", comment) + + // try searching (empty query means "anything") + found, err = alog.SearchEvents(startTime.Add(-time.Hour), startTime.Add(time.Hour), "") + c.Assert(err, check.IsNil) + c.Assert(len(found), check.Equals, 2) // total number of events logged in this test + c.Assert(found[0].GetString(EventType), check.Equals, SessionStartEvent, comment) + c.Assert(found[1].GetString(EventType), check.Equals, SessionEndEvent, comment) + } +} + +type file struct { + path string + contents []byte +} + +var v1Files = []file{ + { + path: "2017-12-14.00:00:00.log", + contents: []byte(`{"event":"user.login","method":"oidc","time":"2017-12-13T17:38:34Z","user":"alice@example.com"} +{"addr.local":"172.10.1.20:3022","addr.remote":"172.10.1.254:52406","event":"session.start","login":"root","namespace":"default","server_id":"3e79a2d7-c9e3-4d3f-96ce-1e1346b4900c","sid":"74a5fc73-e02c-11e7-aee2-0242ac0a0101","size":"80:25","time":"2017-12-13T17:38:40Z","user":"alice@example.com"} +{"event":"session.leave","namespace":"default","server_id":"020130c8-b41f-4da5-a061-74c2b0e2b40b","sid":"75aef036-e02c-11e7-aee2-0242ac0a0101","time":"2017-12-13T17:38:42Z","user":"alice@example.com"} +`), + }, + { + path: "sessions/default/74a5fc73-e02c-11e7-aee2-0242ac0a0101.session.log", + contents: []byte(`{"addr.local":"172.10.1.20:3022","addr.remote":"172.10.1.254:52406","event":"session.start","login":"root","namespace":"default","server_id":"3e79a2d7-c9e3-4d3f-96ce-1e1346b4900c","sid":"74a5fc73-e02c-11e7-aee2-0242ac0a0101","size":"80:25","time":"2017-12-13T17:38:40Z","user":"alice@example.com"} +{"event":"session.leave","namespace":"default","server_id":"3e79a2d7-c9e3-4d3f-96ce-1e1346b4900c","sid":"74a5fc73-e02c-11e7-aee2-0242ac0a0101","time":"2017-12-13T17:38:41Z","user":"alice@example.com"} +{"time":"2017-12-13T17:38:40.038Z","event":"print","bytes":31,"ms":0,"offset":0} +`), + }, + { + path: "sessions/default/74a5fc73-e02c-11e7-aee2-0242ac0a0101.session.bytes", + contents: []byte(`"this string is exactly 31 bytes"`), + }, +} + +// DELETE IN: 2.6.0 +// TestMigrationsToV2 tests migrations to V2 loging format +func (a *AuditTestSuite) TestMigrationsToV2(c *check.C) { + for _, file := range v1Files { + fileName := filepath.Join(a.dataDir, file.path) + err := os.MkdirAll(filepath.Dir(fileName), 0755) + c.Assert(err, check.IsNil) + err = ioutil.WriteFile(fileName, file.contents, 06440) + c.Assert(err, check.IsNil) + } + + // create audit log with session recording disabled + alog, err := a.makeLog(c, a.dataDir, false) + c.Assert(err, check.IsNil) + + // sessions have been migrated + sid := "74a5fc73-e02c-11e7-aee2-0242ac0a0101" + events, err := alog.GetSessionEvents(defaults.Namespace, session.ID(sid), 0) + c.Assert(err, check.IsNil) + c.Assert(events, check.HasLen, 3) + + // global events were migrated + events, err = alog.SearchEvents(time.Time{}, time.Now().Add(time.Hour), "") + c.Assert(err, check.IsNil) + c.Assert(events, check.HasLen, 3) + + // second time migration is idempotent + alog, err = a.makeLog(c, a.dataDir, false) + c.Assert(err, check.IsNil) + + events, err = alog.GetSessionEvents(defaults.Namespace, session.ID(sid), 0) + c.Assert(err, check.IsNil) + c.Assert(events, check.HasLen, 3) + + // global events were migrated + events, err = alog.SearchEvents(time.Time{}, time.Now().Add(time.Hour), "") + c.Assert(err, check.IsNil) + c.Assert(events, check.HasLen, 3) +} + +// DELETE IN: 2.6.0 +func (a *AuditTestSuite) TestCompatSessionRecordingOff(c *check.C) { now := time.Now().In(time.UTC).Round(time.Second) // create audit log with session recording disabled @@ -175,6 +615,69 @@ func (a *AuditTestSuite) TestSessionRecordingOff(c *check.C) { c.Assert(err, check.NotNil) } +func (a *AuditTestSuite) TestSessionRecordingOff(c *check.C) { + now := time.Now().In(time.UTC).Round(time.Second) + + // create audit log with session recording disabled + alog, err := a.makeLog(c, a.dataDir, false) + c.Assert(err, check.IsNil) + alog.Clock = clockwork.NewFakeClockAt(now) + + username := "alice" + sessionID := "200" + + // start the session and emit data stream to it + firstMessage := []byte("hello") + err = alog.PostSessionSlice(SessionSlice{ + Namespace: defaults.Namespace, + SessionID: sessionID, + Chunks: []*SessionChunk{ + // start the session + &SessionChunk{ + Time: alog.Clock.Now().UTC().UnixNano(), + EventIndex: 0, + EventType: SessionStartEvent, + Data: marshal(EventFields{EventLogin: username}), + }, + // type "hello" into session "100" + &SessionChunk{ + Time: alog.Clock.Now().UTC().UnixNano(), + EventIndex: 1, + ChunkIndex: 0, + Offset: 0, + EventType: SessionPrintEvent, + Data: firstMessage, + }, + // end the session + &SessionChunk{ + Time: alog.Clock.Now().UTC().UnixNano(), + EventIndex: 0, + EventType: SessionEndEvent, + Data: marshal(EventFields{EventLogin: username}), + }, + }, + Version: V2, + }) + c.Assert(err, check.IsNil) + c.Assert(alog.loggers.Len(), check.Equals, 0) + + // get all events from the audit log, should have two events + found, err := alog.SearchEvents(now.Add(-time.Hour), now.Add(time.Hour), "") + c.Assert(err, check.IsNil) + c.Assert(found, check.HasLen, 2) + c.Assert(found[0].GetString(EventLogin), check.Equals, username) + c.Assert(found[1].GetString(EventLogin), check.Equals, username) + + // inspect the session log for "200", should have two events + history, err := alog.GetSessionEvents(defaults.Namespace, session.ID(sessionID), 0) + c.Assert(err, check.IsNil) + c.Assert(history, check.HasLen, 2) + + // try getting the session stream, should get an error + _, err = alog.GetSessionChunk(defaults.Namespace, session.ID(sessionID), 0, 5000) + c.Assert(err, check.NotNil) +} + func (a *AuditTestSuite) TestBasicLogging(c *check.C) { now := time.Now().In(time.UTC).Round(time.Second) // create audit log, write a couple of events into it, close it @@ -195,23 +698,26 @@ func (a *AuditTestSuite) TestBasicLogging(c *check.C) { fmt.Sprintf("{\"apples?\":\"yes\",\"event\":\"user.joined\",\"time\":\"%s\"}\n", now.Format(time.RFC3339))) } -// TestAutoClose tests scenario with auto closing of inactive sessions -func (a *AuditTestSuite) TestAutoClose(c *check.C) { +// DELETE IN: 2.6.0 +// TestCompatAutoClose tests scenario with auto closing of inactive sessions for compatibility logger +func (a *AuditTestSuite) TestCompatAutoClose(c *check.C) { // create audit log, write a couple of events into it, close it fakeClock := clockwork.NewFakeClock() alog, err := NewAuditLog(AuditLogConfig{ DataDir: a.dataDir, RecordSessions: true, Clock: fakeClock, + ServerID: "autoclose1", }) c.Assert(err, check.IsNil) + sessionID := "100" + // start the session and emit data stream to it - err = alog.EmitAuditEvent(SessionStartEvent, EventFields{SessionEventID: "100", EventLogin: "bob", EventNamespace: defaults.Namespace}) + err = alog.EmitAuditEvent(SessionStartEvent, EventFields{SessionEventID: sessionID, EventLogin: "bob", EventNamespace: defaults.Namespace}) c.Assert(err, check.IsNil) c.Assert(alog.loggers.Len(), check.Equals, 1) - sessionID := "100" // type "hello" into session "100": firstMessage := "hello" err = alog.PostSessionChunk(defaults.Namespace, session.ID(sessionID), bytes.NewBufferString(firstMessage)) @@ -256,16 +762,141 @@ func (a *AuditTestSuite) TestAutoClose(c *check.C) { c.Assert(history[0][SessionEventTimestamp], check.Equals, float64(0)) c.Assert(history[1][SessionEventTimestamp], check.Equals, float64(firstDelay/time.Millisecond)) c.Assert(history[2][SessionEventTimestamp], check.Equals, float64((firstDelay+secondDelay)/time.Millisecond)) + } -// TestCloseOutstanding makes sure the logger closed outstanding sessions -func (a *AuditTestSuite) TestCloseOutstanding(c *check.C) { +// TestAutoClose tests scenario with auto closing of inactive sessions +func (a *AuditTestSuite) TestAutoClose(c *check.C) { // create audit log, write a couple of events into it, close it fakeClock := clockwork.NewFakeClock() alog, err := NewAuditLog(AuditLogConfig{ DataDir: a.dataDir, RecordSessions: true, Clock: fakeClock, + ServerID: "autoclose1", + }) + c.Assert(err, check.IsNil) + + sessionID := "100" + // start the session and emit data stream to it + firstMessage := []byte("hello") + err = alog.PostSessionSlice(SessionSlice{ + Namespace: defaults.Namespace, + SessionID: sessionID, + Chunks: []*SessionChunk{ + // start the seession + &SessionChunk{ + Time: fakeClock.Now().UTC().UnixNano(), + EventIndex: 0, + EventType: SessionStartEvent, + Data: marshal(EventFields{EventLogin: "bob"}), + }, + // type "hello" into session "100" + &SessionChunk{ + Time: fakeClock.Now().UTC().UnixNano(), + EventIndex: 1, + ChunkIndex: 0, + Offset: 0, + EventType: SessionPrintEvent, + Data: firstMessage, + }, + }, + Version: V2, + }) + c.Assert(err, check.IsNil) + c.Assert(alog.loggers.Len(), check.Equals, 1) + + // now fake sleep past expiration + firstDelay := defaults.SessionIdlePeriod * 2 + fakeClock.Advance(firstDelay) + + // logger for idle session should be closed + alog.closeInactiveLoggers() + c.Assert(alog.loggers.Len(), check.Equals, 0) + + // send another event to the session + secondMessage := []byte("good day") + err = alog.PostSessionSlice(SessionSlice{ + Namespace: defaults.Namespace, + SessionID: sessionID, + Chunks: []*SessionChunk{ + // notice how offsets are sent by the client + &SessionChunk{ + Time: fakeClock.Now().UTC().UnixNano(), + Delay: int64(firstDelay / time.Millisecond), + EventIndex: 2, + ChunkIndex: 1, + Offset: int64(len(firstMessage)), + EventType: SessionPrintEvent, + Data: secondMessage, + }, + }, + Version: V2, + }) + c.Assert(err, check.IsNil) + c.Assert(alog.loggers.Len(), check.Equals, 1) + + // emit next event 17 milliseconds later + thirdMessage := []byte("test") + secondDelay := 17 * time.Millisecond + fakeClock.Advance(secondDelay) + err = alog.PostSessionSlice(SessionSlice{ + Namespace: defaults.Namespace, + SessionID: sessionID, + Chunks: []*SessionChunk{ + // notice how offsets are sent by the client + &SessionChunk{ + Time: fakeClock.Now().UTC().UnixNano(), + EventIndex: 3, + ChunkIndex: 2, + Delay: int64((firstDelay + secondDelay) / time.Millisecond), + Offset: int64(len(firstMessage) + len(secondMessage)), + EventType: SessionPrintEvent, + Data: thirdMessage, + }, + // emitting session end event should close the session + &SessionChunk{ + Time: fakeClock.Now().UTC().UnixNano(), + EventIndex: 4, + EventType: SessionEndEvent, + Data: marshal(EventFields{EventLogin: "bob"}), + }, + }, + Version: V2, + }) + c.Assert(err, check.IsNil) + + // emitting session end event should close the session + c.Assert(alog.loggers.Len(), check.Equals, 0) + + // read the session bytes + history, err := alog.GetSessionEvents(defaults.Namespace, session.ID(sessionID), 1) + c.Assert(err, check.IsNil) + c.Assert(history, check.HasLen, 4) + + // make sure offsets were properly set (0 for the first event and 5 bytes for hello): + c.Assert(history[0][SessionByteOffset], check.Equals, float64(0)) + c.Assert(history[1][SessionByteOffset], check.Equals, float64(len(firstMessage))) + c.Assert(history[2][SessionByteOffset], check.Equals, float64(len(firstMessage)+len(secondMessage))) + + // make sure delays are right + c.Assert(history[0][SessionEventTimestamp], check.Equals, float64(0)) + c.Assert(history[1][SessionEventTimestamp], check.Equals, float64(firstDelay/time.Millisecond)) + c.Assert(history[2][SessionEventTimestamp], check.Equals, float64((firstDelay+secondDelay)/time.Millisecond)) + +} + +// DELETE IN: 2.6.0 +// TestCompatCloseOutstanding makes sure the logger working in compatibility mode +// closed outstanding sessions +func (a *AuditTestSuite) TestCompatCloseOutstanding(c *check.C) { + // create audit log, write a couple of events into it, close it + fakeClock := clockwork.NewFakeClock() + alog, err := NewAuditLog(AuditLogConfig{ + DataDir: a.dataDir, + RecordSessions: true, + Clock: fakeClock, + ServerID: "outstanding", }) c.Assert(err, check.IsNil) @@ -277,3 +908,42 @@ func (a *AuditTestSuite) TestCloseOutstanding(c *check.C) { alog.Close() c.Assert(alog.loggers.Len(), check.Equals, 0) } + +// TestCloseOutstanding makes sure the logger closed outstanding sessions +func (a *AuditTestSuite) TestCloseOutstanding(c *check.C) { + // create audit log, write a couple of events into it, close it + fakeClock := clockwork.NewFakeClock() + alog, err := NewAuditLog(AuditLogConfig{ + DataDir: a.dataDir, + RecordSessions: true, + Clock: fakeClock, + ServerID: "outstanding", + }) + c.Assert(err, check.IsNil) + // start the session and emit data stream to it + err = alog.PostSessionSlice(SessionSlice{ + Namespace: defaults.Namespace, + SessionID: "100", + Chunks: []*SessionChunk{ + &SessionChunk{ + EventIndex: 0, + EventType: SessionStartEvent, + Data: marshal(EventFields{EventLogin: "bob"}), + }, + }, + Version: V2, + }) + c.Assert(err, check.IsNil) + c.Assert(alog.loggers.Len(), check.Equals, 1) + + alog.Close() + c.Assert(alog.loggers.Len(), check.Equals, 0) +} + +func marshal(f EventFields) []byte { + data, err := json.Marshal(f) + if err != nil { + panic(err) + } + return data +} diff --git a/lib/events/compatsessionlog.go b/lib/events/compatsessionlog.go new file mode 100644 index 00000000000..a882bca55f0 --- /dev/null +++ b/lib/events/compatsessionlog.go @@ -0,0 +1,315 @@ +/* +Copyright 2017 Gravitational, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package events + +import ( + "bytes" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + "github.com/gravitational/teleport" + "github.com/gravitational/teleport/lib/session" + "github.com/gravitational/trace" + "github.com/jonboulle/clockwork" + log "github.com/sirupsen/logrus" +) + +// DELETE IN: 2.6.0 +// CompatSessionLogger is used only during upgrades from 2.4.0 to 2.5.0 +// Should be deleted in 2.6.0 releases +// CompatSessionLoggerConfig sets up parameters for disk session logger +// associated with the session ID +type CompatSessionLoggerConfig struct { + // SessionID is the session id of the logger + SessionID session.ID + // DataDir is data directory for session events files + DataDir string + // Clock is the clock replacement + Clock clockwork.Clock + // RecordSessions controls if sessions are recorded along with audit events. + RecordSessions bool +} + +// DELETE IN: 2.6.0 +// CompatSessionLogger is used only during upgrades from 2.4.0 to 2.5.0 +// Should be deleted in 2.6.0 releases +// NewCompatSessionLogger creates new disk based session logger +func NewCompatSessionLogger(cfg CompatSessionLoggerConfig) (*CompatSessionLogger, error) { + var err error + + lastPrintEvent, err := readLastPrintEvent(eventsFileName(cfg.DataDir, cfg.SessionID, 0)) + if err != nil { + if !trace.IsNotFound(err) { + return nil, trace.Wrap(err) + } + // no last event is ok + lastPrintEvent = nil + } + + indexFile, err := os.OpenFile(filepath.Join(cfg.DataDir, fmt.Sprintf("%v.index", cfg.SessionID.String())), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640) + if err != nil { + return nil, trace.Wrap(err) + } + + sessionLogger := &CompatSessionLogger{ + Entry: log.WithFields(log.Fields{ + trace.Component: teleport.ComponentAuditLog, + trace.ComponentFields: log.Fields{ + "sid": cfg.SessionID, + }, + }), + indexFile: indexFile, + CompatSessionLoggerConfig: cfg, + lastPrintEvent: lastPrintEvent, + } + return sessionLogger, nil +} + +// CompatSessionLogger implements a disk based session logger. The imporant +// property of the disk based logger is that it never fails and can be used as +// a fallback implementation behind more sophisticated loggers. +type CompatSessionLogger struct { + CompatSessionLoggerConfig + + *log.Entry + + sync.Mutex + + indexFile *os.File + eventsFile *os.File + chunksFile *os.File + + // lastPrintEvent is the last written session event + lastPrintEvent *printEvent +} + +// LogEvent logs an event associated with this session +func (sl *CompatSessionLogger) LogEvent(fields EventFields) error { + if err := sl.openEventsFile(); err != nil { + return trace.Wrap(err) + } + + if _, ok := fields[EventTime]; !ok { + fields[EventTime] = sl.Clock.Now().In(time.UTC).Round(time.Millisecond) + } + + data, err := json.Marshal(fields) + if err != nil { + return trace.Wrap(err) + } + + _, err = fmt.Fprintln(sl.eventsFile, string(data)) + if err != nil { + return trace.Wrap(err) + } + + return nil +} + +// readLastEvent reads last event from the file, it opens +// the file in read only mode and closes it after +func readLastPrintEvent(fileName string) (*printEvent, error) { + f, err := os.Open(fileName) + if err != nil { + return nil, trace.ConvertSystemError(err) + } + defer f.Close() + info, err := f.Stat() + if err != nil { + return nil, trace.ConvertSystemError(err) + } + if info.Size() == 0 { + return nil, trace.NotFound("no events found") + } + bufSize := int64(512) + if info.Size() < bufSize { + bufSize = info.Size() + } + buf := make([]byte, bufSize) + _, err = f.ReadAt(buf, info.Size()-bufSize) + if err != nil { + return nil, trace.ConvertSystemError(err) + } + lines := bytes.Split(buf, []byte("\n")) + if len(lines) == 0 { + return nil, trace.BadParameter("expected some lines, got %q", string(buf)) + } + for i := len(lines) - 1; i > 0; i-- { + line := bytes.TrimSpace(lines[i]) + if len(line) == 0 { + continue + } + var event printEvent + if err = json.Unmarshal(line, &event); err != nil { + return nil, trace.Wrap(err) + } + if event.Type != SessionPrintEvent { + continue + } + return &event, nil + } + return nil, trace.NotFound("no session print events found") +} + +// Close is called when clients close on the requested "session writer". +// We ignore their requests because this writer (file) should be closed only +// when the session logger is closed +func (sl *CompatSessionLogger) Close() error { + sl.Debugf("Close") + return nil +} + +// Finalize is called by the session when it's closing. This is where we're +// releasing audit resources associated with the session +func (sl *CompatSessionLogger) Finalize() error { + sl.Lock() + defer sl.Unlock() + + auditOpenFiles.Dec() + + if sl.indexFile != nil { + sl.indexFile.Close() + } + + if sl.chunksFile != nil { + sl.chunksFile.Close() + } + + if sl.eventsFile != nil { + sl.eventsFile.Close() + } + + return nil +} + +// WriteChunk takes a stream of bytes (usually the output from a session terminal) +// and writes it into a "stream file", for future replay of interactive sessions. +func (sl *CompatSessionLogger) WriteChunk(chunk *SessionChunk) (written int, err error) { + sl.Lock() + defer sl.Unlock() + + // when session recording is turned off, don't record the session byte stream + if sl.RecordSessions == false { + return len(chunk.Data), nil + } + + if err := sl.openChunksFile(); err != nil { + return -1, trace.Wrap(err) + } + + if written, err = sl.chunksFile.Write(chunk.Data); err != nil { + return written, trace.Wrap(err) + } + + err = sl.writePrintEvent(time.Unix(0, chunk.Time), len(chunk.Data)) + return written, trace.Wrap(err) +} + +// writePrintEvent logs print event indicating write to the session +func (sl *CompatSessionLogger) writePrintEvent(start time.Time, bytesWritten int) error { + if err := sl.openEventsFile(); err != nil { + return trace.Wrap(err) + } + + start = start.In(time.UTC).Round(time.Millisecond) + offset := int64(0) + delayMilliseconds := int64(0) + if sl.lastPrintEvent != nil { + offset = sl.lastPrintEvent.Offset + sl.lastPrintEvent.Bytes + delayMilliseconds = diff(sl.lastPrintEvent.Start, start) + sl.lastPrintEvent.DelayMilliseconds + } + event := printEvent{ + Start: start, + Type: SessionPrintEvent, + Bytes: int64(bytesWritten), + DelayMilliseconds: delayMilliseconds, + Offset: offset, + } + bytes, err := json.Marshal(event) + if err != nil { + return trace.Wrap(err) + } + _, err = fmt.Fprintln(sl.eventsFile, string(bytes)) + if err != nil { + return trace.Wrap(err) + } + sl.lastPrintEvent = &event + return trace.Wrap(err) +} + +func (sl *CompatSessionLogger) openEventsFile() error { + if sl.eventsFile != nil { + return nil + } + eventsFileName := eventsFileName(sl.DataDir, sl.SessionID, 0) + + // udpate the index file to write down that new events file has been created + data, err := json.Marshal(indexEntry{ + FileName: filepath.Base(eventsFileName), + Type: fileTypeEvents, + Index: 0, + }) + if err != nil { + return trace.Wrap(err) + } + + _, err = fmt.Fprintf(sl.indexFile, "%v\n", string(data)) + if err != nil { + return trace.Wrap(err) + } + + // open new events file for writing + sl.eventsFile, err = os.OpenFile(eventsFileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640) + if err != nil { + return trace.Wrap(err) + } + return nil +} + +func (sl *CompatSessionLogger) openChunksFile() error { + if sl.chunksFile != nil { + return nil + } + // chunksFileName consists of session id and the first global offset recorded + chunksFileName := chunksFileName(sl.DataDir, sl.SessionID, 0) + + // udpate the index file to write down that new chunks file has been created + data, err := json.Marshal(indexEntry{ + FileName: filepath.Base(chunksFileName), + Type: fileTypeChunks, + Offset: 0, + }) + if err != nil { + return trace.Wrap(err) + } + + _, err = fmt.Fprintf(sl.indexFile, "%v\n", string(data)) + if err != nil { + return trace.Wrap(err) + } + + // open new chunks file for writing + sl.chunksFile, err = os.OpenFile(chunksFileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640) + if err != nil { + return trace.Wrap(err) + } + return nil +} diff --git a/lib/events/doc.go b/lib/events/doc.go index bd24a1953cf..c92535bd51b 100644 --- a/lib/events/doc.go +++ b/lib/events/doc.go @@ -15,19 +15,77 @@ limitations under the License. */ /* -Package events currently implements the audit log using a simple filesystem backend. -"Implements" means it implements events.IAuditLog interface (see events/api.go) +Package events implements the audit log interface events.IAuditLog +using filesystem backend. + +Audit logs +---------- + +Audit logs are events associated with user logins, server access +and session log events like session.start. + +Example audit log event: + +{"addr.local":"172.10.1.20:3022", + "addr.remote":"172.10.1.254:58866", + "event":"session.start", + "login":"root", + "user":"klizhentas@gmail.com" +} + +Session Logs +------------ + +Session logs are a series of events and recorded SSH interactive session playback. + +Example session log event: + +{ + "time":"2018-01-04T02:12:40.245Z", + "event":"print", + "bytes":936, + "ms":40962, + "offset":16842, + "ei":31, + "ci":29 +} + +Print event fields +------------------ + +Print event specifies session output - PTY io recorded by Teleport node or Proxy +based on the configuration. + +* "offset" is an offset in bytes from a start of a session +* "ms" is a delay in milliseconds from the last event occured +* "ci" is a chunk index ordering only print events +* "ei" is an event index ordering events from the first one + +As in example of print event above, "ei" - is a session event index - 31, +while "ci" is a chunk index - meaning that this event is 29th in a row of print events. + +Client streaming session logs +------------------------------ + +Session related logs are delivered in order defined by clients. +Every event is ordered and has a session-local index, every next event has index incremented. + +Client deliveres session events in batches, where every event in the batch +is guaranteed to be in continuous order (e.g. no cases with events +delivered in a single batch to have missing event or chunk index). + +Disk File format +---------------- + +On disk file format is designed to be compatible with NFS filesystems and provides +guarantee that only one auth server writes to the file at a time. + +Main Audit Log Format +===================== The main log files are saved as: - /var/lib/teleport/log/.log -Each session has its own session log stored as two files - /var/lib/teleport/log/.session.log - /var/lib/teleport/log/.session.bytes - -Where: - - .session.log (same events as in the main log, but related to the session) - - .session.bytes (recorded session bytes: PTY IO) + /var/lib/teleport/log//.log The log file is rotated every 24 hours. The old files must be cleaned up or archived by an external tool. @@ -45,5 +103,120 @@ Common JSON fields Examples: 2016-04-25 22:37:29 +0000 UTC,session.start,{"addr.local":"127.0.0.1:3022","addr.remote":"127.0.0.1:35732","login":"root","sid":"4a9d97de-0b36-11e6-a0b3-d8cb8ae5080e","user":"vincent"} 2016-04-25 22:54:31 +0000 UTC,exec,{"addr.local":"127.0.0.1:3022","addr.remote":"127.0.0.1:35949","command":"-bash -c ls /","login":"root","user":"vincent"} + +Session log file format +======================= + +Each session has its own session log stored as several files: + +Index file contains a list of event files and chunks files associated with a session: + + /var/lib/teleport/log/sessions//.index + +The format of the index file contains of two or more lines with pointers to other files: + +{"file_name":"-.events","type":"events","index":} +{"file_name":"-.chunks","type":"chunks","offset":} + +Files: + + /var/lib/teleport/log//-.events + /var/lib/teleport/log//-.chunks + +Where: + - .events (same events as in the main log, but related to the session) + - .chunks (recorded session bytes: PTY IO) + +Examples +~~~~~~~~ + +**Single auth server** + +In the simplest case, single auth server a1 log for a single session id s1 +will consist of three files: + +/var/lib/teleport/a1/s1.index + +With contents: + +{"file_name":"s1-0.events","type":"events","index":0} +{"file_name":"s1-0.chunks","type":"chunks","offset":0} + +This means that all session events are located in s1-0.events file starting from +the first event with index 0 and all chunks are located in file s1-0.chunks file +with the byte offset from the start - 0. + +File with session events /var/lib/teleport/a1/s1-0.events will contain: + +{"ei":0,"event":"session.start", ...} +{"ei":1,"event":"resize",...} +{"ei":2,"ci":0, "event":"print","bytes":40,"offset":0} +{"ei":3,"event":"session.end", ...} + +File with recorded session /var/lib/teleport/a1/s1-0.chunks will contain 40 bytes +emitted by print event with chunk index 0 + +**Multiple Auth Servers** + +In high availability mode scenario, multiple auth servers will be + deployed behind a load balancer. + +Any auth server can go down during session and clients will retry the delivery +to the other auth server. + +Both auth servers have mounted /var/lib/teleport/log as a shared NFS folder. + +To make sure that only one auth server writes to a file at a time, +each auth server writes to it's own file in a sub folder named +with host UUID of the server. + +Client sends the chunks of events related to the session s1 in order, +but load balancer sends first batch of event to the first server a1, +and the second batch of event to the second server a2. + +Server a1 will produce the following file: + +/var/lib/teleport/a1/s1.index + +With contents: + +{"file_name":"s1-0.events","type":"events","index":0} +{"file_name":"s1-0.chunks","type":"chunks","offset":0} + +Events file /var/lib/teleport/a1/s1-0.events will contain: + +{"ei":0,"event":"session.start", ...} +{"ei":1,"event":"resize",...} +{"ei":2,"ci":0, "event":"print","bytes":40,"offset":0} + +Events file /var/lib/teleport/a1/s1-0.chunks will contain 40 bytes +emitted by print event with chunk index. + +Server a2 will produce the following file: + +/var/lib/teleport/a2/s1.index + +With contents: + +{"file_name":"s1-3.events","type":"events","index":3} +{"file_name":"s1-40.chunks","type":"chunks","offset":40} + +Events file /var/lib/teleport/a2/s1-4.events will contain: + +{"ei":3,"ci":1, "event":"print","bytes":15,"ms":713,"offset":40} +{"ei":4,"event":"session.end", ...} + +Events file /var/lib/teleport/a2/s1-40.chunks will contain 15 bytes emitted +by print event with chunk index 1 and comes after delay of 713 milliseconds. + +Offset 40 indicates that the first chunk stored in the file s1-40.chunks +comes at an offset of 40 bytes from the start of the session. + +Log Search and Playback +----------------------- + +Log search and playback is aware of multiple auth servers, merges +indexes, event streams stored on multiple auth servers. + */ package events diff --git a/lib/events/sessionlog.go b/lib/events/sessionlog.go index 4116c1f33a3..e2b0598b7aa 100644 --- a/lib/events/sessionlog.go +++ b/lib/events/sessionlog.go @@ -17,10 +17,10 @@ limitations under the License. package events import ( - "bytes" "encoding/json" "fmt" "os" + "path/filepath" "sync" "time" @@ -34,7 +34,7 @@ import ( // SessionLogger is an interface that all session loggers must implement. type SessionLogger interface { // LogEvent logs events associated with this session. - LogEvent(fields EventFields) + LogEvent(fields EventFields) error // Close is called when clients close on the requested "session writer". // We ignore their requests because this writer (file) should be closed only @@ -56,59 +56,36 @@ type SessionLogger interface { type DiskSessionLoggerConfig struct { // SessionID is the session id of the logger SessionID session.ID - // EventsFileName is the events file name - EventsFileName string - // StreamFileName is the byte stream file name - StreamFileName string + // DataDir is data directory for session events files + DataDir string // Clock is the clock replacement Clock clockwork.Clock // RecordSessions controls if sessions are recorded along with audit events. RecordSessions bool + // AuditLog is the audit log + AuditLog *AuditLog } // NewDiskSessionLogger creates new disk based session logger func NewDiskSessionLogger(cfg DiskSessionLoggerConfig) (*DiskSessionLogger, error) { var err error - lastPrintEvent, err := readLastPrintEvent(cfg.EventsFileName) - if err != nil { - if !trace.IsNotFound(err) { - return nil, trace.Wrap(err) - } - // no last event is ok - lastPrintEvent = nil - } - - // if session recording is on, create a stream file that stores all the - // bytes of the session - var fstream *os.File - if cfg.RecordSessions { - fstream, err = os.OpenFile(cfg.StreamFileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640) - if err != nil { - return nil, trace.Wrap(err) - } - } - - // create a new session file that stores all the audit events that occured - // related to the session - fevents, err := os.OpenFile(cfg.EventsFileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640) + indexFile, err := os.OpenFile(filepath.Join(cfg.DataDir, fmt.Sprintf("%v.index", cfg.SessionID.String())), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640) if err != nil { return nil, trace.Wrap(err) } sessionLogger := &DiskSessionLogger{ + DiskSessionLoggerConfig: cfg, Entry: log.WithFields(log.Fields{ trace.Component: teleport.ComponentAuditLog, trace.ComponentFields: log.Fields{ "sid": cfg.SessionID, }, }), - sid: cfg.SessionID, - streamFile: fstream, - eventsFile: fevents, - clock: cfg.Clock, - lastPrintEvent: lastPrintEvent, - recordSessions: cfg.RecordSessions, + indexFile: indexFile, + lastEventIndex: -1, + lastChunkIndex: -1, } return sessionLogger, nil } @@ -117,86 +94,28 @@ func NewDiskSessionLogger(cfg DiskSessionLoggerConfig) (*DiskSessionLogger, erro // property of the disk based logger is that it never fails and can be used as // a fallback implementation behind more sophisticated loggers. type DiskSessionLogger struct { + DiskSessionLoggerConfig + *log.Entry sync.Mutex sid session.ID - // eventsFile stores logged events, just like the main logger, except - // these are all associated with this session + indexFile *os.File eventsFile *os.File + chunksFile *os.File - // streamFile stores bytes from the session terminal I/O for replaying - streamFile *os.File - - // clock provides real of fake clock (for tests) - clock clockwork.Clock - - // lastPrintEvent is the last written session event - lastPrintEvent *printEvent + lastEventIndex int64 + lastChunkIndex int64 // recordSessions controls if sessions are recorded along with audit events. recordSessions bool } // LogEvent logs an event associated with this session -func (sl *DiskSessionLogger) LogEvent(fields EventFields) { - if _, ok := fields[EventTime]; !ok { - fields[EventTime] = sl.clock.Now().In(time.UTC).Round(time.Millisecond) - } - - if sl.eventsFile != nil { - _, err := fmt.Fprintln(sl.eventsFile, eventToLine(fields)) - if err != nil { - log.Error(trace.DebugReport(err)) - } - } -} - -// readLastEvent reads last event from the file, it opens -// the file in read only mode and closes it after -func readLastPrintEvent(fileName string) (*printEvent, error) { - f, err := os.Open(fileName) - if err != nil { - return nil, trace.ConvertSystemError(err) - } - defer f.Close() - info, err := f.Stat() - if err != nil { - return nil, trace.ConvertSystemError(err) - } - if info.Size() == 0 { - return nil, trace.NotFound("no events found") - } - bufSize := int64(512) - if info.Size() < bufSize { - bufSize = info.Size() - } - buf := make([]byte, bufSize) - _, err = f.ReadAt(buf, info.Size()-bufSize) - if err != nil { - return nil, trace.ConvertSystemError(err) - } - lines := bytes.Split(buf, []byte("\n")) - if len(lines) == 0 { - return nil, trace.BadParameter("expected some lines, got %q", string(buf)) - } - for i := len(lines) - 1; i > 0; i-- { - line := bytes.TrimSpace(lines[i]) - if len(line) == 0 { - continue - } - var event printEvent - if err = json.Unmarshal(line, &event); err != nil { - return nil, trace.Wrap(err) - } - if event.Type != SessionPrintEvent { - continue - } - return &event, nil - } - return nil, trace.NotFound("no session print events found") +func (sl *DiskSessionLogger) LogEvent(fields EventFields) error { + panic("does not work") } // Close is called when clients close on the requested "session writer". @@ -213,17 +132,98 @@ func (sl *DiskSessionLogger) Finalize() error { sl.Lock() defer sl.Unlock() + return sl.finalize() +} + +func (sl *DiskSessionLogger) finalize() error { auditOpenFiles.Dec() - if sl.streamFile != nil { - sl.streamFile.Close() - sl.streamFile = nil - } - if sl.eventsFile != nil { - sl.eventsFile.Close() - sl.eventsFile = nil + if sl.indexFile != nil { + sl.indexFile.Close() } + if sl.chunksFile != nil { + sl.chunksFile.Close() + } + + if sl.eventsFile != nil { + sl.eventsFile.Close() + } + + return nil +} + +// eventsFileName consists of session id and the first global event index recorded there +func eventsFileName(dataDir string, sessionID session.ID, eventIndex int64) string { + return filepath.Join(dataDir, fmt.Sprintf("%v-%v.events", sessionID.String(), eventIndex)) +} + +// chunksFileName consists of session id and the first global offset recorded +func chunksFileName(dataDir string, sessionID session.ID, offset int64) string { + return filepath.Join(dataDir, fmt.Sprintf("%v-%v.chunks", sessionID.String(), offset)) +} + +func (sl *DiskSessionLogger) openEventsFile(eventIndex int64) error { + if sl.eventsFile != nil { + err := sl.eventsFile.Close() + if err != nil { + sl.Warningf("Failed to close file: %v", trace.DebugReport(err)) + } + } + eventsFileName := eventsFileName(sl.DataDir, sl.SessionID, eventIndex) + + // udpate the index file to write down that new events file has been created + data, err := json.Marshal(indexEntry{ + FileName: filepath.Base(eventsFileName), + Type: fileTypeEvents, + Index: eventIndex, + }) + if err != nil { + return trace.Wrap(err) + } + + _, err = fmt.Fprintf(sl.indexFile, "%v\n", string(data)) + if err != nil { + return trace.Wrap(err) + } + + // open new events file for writing + sl.eventsFile, err = os.OpenFile(eventsFileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640) + if err != nil { + return trace.Wrap(err) + } + return nil +} + +func (sl *DiskSessionLogger) openChunksFile(offset int64) error { + if sl.chunksFile != nil { + err := sl.chunksFile.Close() + if err != nil { + sl.Warningf("Failed to close file: %v", trace.DebugReport(err)) + } + } + chunksFileName := chunksFileName(sl.DataDir, sl.SessionID, offset) + + // udpate the index file to write down that new chunks file has been created + data, err := json.Marshal(indexEntry{ + FileName: filepath.Base(chunksFileName), + Type: fileTypeChunks, + Offset: offset, + }) + if err != nil { + return trace.Wrap(err) + } + + _, err = fmt.Fprintf(sl.indexFile, "%v\n", string(data)) + if err != nil { + return trace.Wrap(err) + } + + // open new chunks file for writing + sl.chunksFile, err = os.OpenFile(chunksFileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640) + if err != nil { + return trace.Wrap(err) + } return nil } @@ -233,49 +233,73 @@ func (sl *DiskSessionLogger) WriteChunk(chunk *SessionChunk) (written int, err e sl.Lock() defer sl.Unlock() - // when session recording is turned off, don't record the session byte stream - if sl.recordSessions == false { + // this section enforces the following invariant: + // a single events file only contains successive events + if sl.lastEventIndex == -1 || chunk.EventIndex-1 != sl.lastEventIndex { + if err := sl.openEventsFile(chunk.EventIndex); err != nil { + return -1, trace.Wrap(err) + } + } + sl.lastEventIndex = chunk.EventIndex + + if chunk.EventType != SessionPrintEvent { + if chunk.EventType == SessionEndEvent { + defer sl.closeLogger() + } + var fields EventFields + err := json.Unmarshal(chunk.Data, &fields) + if err != nil { + return -1, trace.Wrap(err) + } + fields[EventIndex] = chunk.EventIndex + fields[EventTime] = sl.Clock.Now().In(time.UTC).Round(time.Millisecond) + fields[EventType] = chunk.EventType + data, err := json.Marshal(fields) + if err != nil { + return -1, trace.Wrap(err) + } + if err := sl.AuditLog.emitAuditEvent(chunk.EventType, fields); err != nil { + return -1, trace.Wrap(err) + } + return fmt.Fprintln(sl.eventsFile, string(data)) + } + if !sl.RecordSessions { return len(chunk.Data), nil } - - if sl.streamFile == nil || sl.eventsFile == nil { - return 0, trace.BadParameter("session %v: attempt to write to a closed file", sl.sid) - } - - if written, err = sl.streamFile.Write(chunk.Data); err != nil { - return written, trace.Wrap(err) - } - - err = sl.writePrintEvent(time.Unix(0, chunk.Time), len(chunk.Data)) - return written, trace.Wrap(err) -} - -// writePrintEvent logs print event indicating write to the session -func (sl *DiskSessionLogger) writePrintEvent(start time.Time, bytesWritten int) error { - start = start.In(time.UTC).Round(time.Millisecond) - offset := int64(0) - delayMilliseconds := int64(0) - if sl.lastPrintEvent != nil { - offset = sl.lastPrintEvent.Offset + sl.lastPrintEvent.Bytes - delayMilliseconds = diff(sl.lastPrintEvent.Start, start) + sl.lastPrintEvent.DelayMilliseconds + eventStart := time.Unix(0, chunk.Time).In(time.UTC).Round(time.Millisecond) + // this section enforces the following invariant: + // a single chunks file only contains successive chunks + if sl.lastChunkIndex == -1 || chunk.ChunkIndex-1 != sl.lastChunkIndex { + if err := sl.openChunksFile(chunk.Offset); err != nil { + return -1, trace.Wrap(err) + } } + sl.lastChunkIndex = chunk.ChunkIndex event := printEvent{ - Start: start, + Start: eventStart, Type: SessionPrintEvent, - Bytes: int64(bytesWritten), - DelayMilliseconds: delayMilliseconds, - Offset: offset, + Bytes: int64(len(chunk.Data)), + DelayMilliseconds: chunk.Delay, + Offset: chunk.Offset, + EventIndex: chunk.EventIndex, + ChunkIndex: chunk.ChunkIndex, } bytes, err := json.Marshal(event) if err != nil { - return trace.Wrap(err) + return -1, trace.Wrap(err) } _, err = fmt.Fprintln(sl.eventsFile, string(bytes)) if err != nil { - return trace.Wrap(err) + return -1, trace.Wrap(err) + } + return sl.chunksFile.Write(chunk.Data) +} + +func (sl *DiskSessionLogger) closeLogger() { + sl.AuditLog.removeLogger(sl.SessionID.String()) + if err := sl.finalize(); err != nil { + log.Error(err) } - sl.lastPrintEvent = &event - return trace.Wrap(err) } func diff(before, after time.Time) int64 { @@ -286,6 +310,19 @@ func diff(before, after time.Time) int64 { return d } +const ( + fileTypeChunks = "chunks" + fileTypeEvents = "events" +) + +type indexEntry struct { + FileName string `json:"file_name"` + Type string `json:"type"` + Index int64 `json:"index"` + Offset int64 `json:"offset,"` + authServer string +} + type printEvent struct { // Start is event start Start time.Time `json:"time"` @@ -297,4 +334,8 @@ type printEvent struct { DelayMilliseconds int64 `json:"ms"` // Offset int64 is the offset in bytes in the session file Offset int64 `json:"offset"` + // EventIndex is the global event index + EventIndex int64 `json:"ei"` + // ChunkIndex is the global chunk index + ChunkIndex int64 `json:"ci"` } diff --git a/lib/events/slice.pb.go b/lib/events/slice.pb.go index bc89ca6da78..df48da27afa 100644 --- a/lib/events/slice.pb.go +++ b/lib/events/slice.pb.go @@ -40,9 +40,14 @@ const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package // SessionSlice is a slice of submitted chunks type SessionSlice struct { - Namespace string `protobuf:"bytes,1,opt,name=Namespace,json=namespace,proto3" json:"Namespace,omitempty"` - SessionID string `protobuf:"bytes,2,opt,name=SessionID,json=sessionID,proto3" json:"SessionID,omitempty"` - Chunks []*SessionChunk `protobuf:"bytes,3,rep,name=Chunks,json=chunks" json:"Chunks,omitempty"` + // Namespace is a session namespace + Namespace string `protobuf:"bytes,1,opt,name=Namespace,json=namespace,proto3" json:"Namespace,omitempty"` + // SessionID is a session ID associated with this chunk + SessionID string `protobuf:"bytes,2,opt,name=SessionID,json=sessionID,proto3" json:"SessionID,omitempty"` + // Chunks is a list of submitted session chunks + Chunks []*SessionChunk `protobuf:"bytes,3,rep,name=Chunks,json=chunks" json:"Chunks,omitempty"` + // Version specifies session slice version + Version int64 `protobuf:"varint,4,opt,name=Version,json=version,proto3" json:"Version,omitempty"` } func (m *SessionSlice) Reset() { *m = SessionSlice{} } @@ -59,10 +64,20 @@ func (m *SessionSlice) GetChunks() []*SessionChunk { // SessionChunk is a chunk to be posted in the context of the session type SessionChunk struct { - // Time is the occurrence of this event + // Time is the occurence of this event Time int64 `protobuf:"varint,2,opt,name=Time,json=time,proto3" json:"Time,omitempty"` - // Data is captured data + // Data is captured data, contains event fields in case of event, session data otherwise Data []byte `protobuf:"bytes,3,opt,name=Data,json=data,proto3" json:"Data,omitempty"` + // EventType is event type + EventType string `protobuf:"bytes,4,opt,name=EventType,json=eventType,proto3" json:"EventType,omitempty"` + // EventIndex is the event global index + EventIndex int64 `protobuf:"varint,5,opt,name=EventIndex,json=eventIndex,proto3" json:"EventIndex,omitempty"` + // Index is the autoincremented chunk index + ChunkIndex int64 `protobuf:"varint,6,opt,name=ChunkIndex,json=chunkIndex,proto3" json:"ChunkIndex,omitempty"` + // Offset is an offset from the previous chunk in bytes + Offset int64 `protobuf:"varint,7,opt,name=Offset,json=offset,proto3" json:"Offset,omitempty"` + // Delay is a delay from the previous event in milliseconds + Delay int64 `protobuf:"varint,8,opt,name=Delay,json=delay,proto3" json:"Delay,omitempty"` } func (m *SessionChunk) Reset() { *m = SessionChunk{} } @@ -220,6 +235,11 @@ func (m *SessionSlice) MarshalTo(data []byte) (int, error) { i += n } } + if m.Version != 0 { + data[i] = 0x20 + i++ + i = encodeVarintSlice(data, i, uint64(m.Version)) + } return i, nil } @@ -249,6 +269,32 @@ func (m *SessionChunk) MarshalTo(data []byte) (int, error) { i = encodeVarintSlice(data, i, uint64(len(m.Data))) i += copy(data[i:], m.Data) } + if len(m.EventType) > 0 { + data[i] = 0x22 + i++ + i = encodeVarintSlice(data, i, uint64(len(m.EventType))) + i += copy(data[i:], m.EventType) + } + if m.EventIndex != 0 { + data[i] = 0x28 + i++ + i = encodeVarintSlice(data, i, uint64(m.EventIndex)) + } + if m.ChunkIndex != 0 { + data[i] = 0x30 + i++ + i = encodeVarintSlice(data, i, uint64(m.ChunkIndex)) + } + if m.Offset != 0 { + data[i] = 0x38 + i++ + i = encodeVarintSlice(data, i, uint64(m.Offset)) + } + if m.Delay != 0 { + data[i] = 0x40 + i++ + i = encodeVarintSlice(data, i, uint64(m.Delay)) + } return i, nil } @@ -296,6 +342,9 @@ func (m *SessionSlice) Size() (n int) { n += 1 + l + sovSlice(uint64(l)) } } + if m.Version != 0 { + n += 1 + sovSlice(uint64(m.Version)) + } return n } @@ -309,6 +358,22 @@ func (m *SessionChunk) Size() (n int) { if l > 0 { n += 1 + l + sovSlice(uint64(l)) } + l = len(m.EventType) + if l > 0 { + n += 1 + l + sovSlice(uint64(l)) + } + if m.EventIndex != 0 { + n += 1 + sovSlice(uint64(m.EventIndex)) + } + if m.ChunkIndex != 0 { + n += 1 + sovSlice(uint64(m.ChunkIndex)) + } + if m.Offset != 0 { + n += 1 + sovSlice(uint64(m.Offset)) + } + if m.Delay != 0 { + n += 1 + sovSlice(uint64(m.Delay)) + } return n } @@ -443,6 +508,25 @@ func (m *SessionSlice) Unmarshal(data []byte) error { return err } iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Version", wireType) + } + m.Version = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSlice + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Version |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipSlice(data[iNdEx:]) @@ -543,6 +627,111 @@ func (m *SessionChunk) Unmarshal(data []byte) error { m.Data = []byte{} } iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field EventType", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSlice + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSlice + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.EventType = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EventIndex", wireType) + } + m.EventIndex = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSlice + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.EventIndex |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ChunkIndex", wireType) + } + m.ChunkIndex = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSlice + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.ChunkIndex |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Offset", wireType) + } + m.Offset = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSlice + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Offset |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 8: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Delay", wireType) + } + m.Delay = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSlice + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Delay |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipSlice(data[iNdEx:]) @@ -672,25 +861,31 @@ var ( func init() { proto.RegisterFile("slice.proto", fileDescriptorSlice) } var fileDescriptorSlice = []byte{ - // 316 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x5c, 0x90, 0x41, 0x4b, 0xc3, 0x30, - 0x18, 0x86, 0x17, 0x3b, 0x8a, 0xcb, 0x76, 0x90, 0x20, 0x52, 0xa6, 0x94, 0xb1, 0x53, 0x0f, 0x9a, - 0xc2, 0x04, 0xef, 0xea, 0x14, 0x04, 0xf1, 0xd0, 0x79, 0xf2, 0x22, 0x69, 0xf7, 0xd9, 0x05, 0xd7, - 0x24, 0x34, 0x5f, 0xd5, 0xfe, 0x13, 0x7f, 0x92, 0x47, 0x7f, 0x82, 0xcc, 0x3f, 0x22, 0x4d, 0x3b, - 0x41, 0x6f, 0xf9, 0xde, 0xe7, 0x4d, 0xde, 0x7c, 0x2f, 0x1d, 0xda, 0xb5, 0xcc, 0x80, 0x9b, 0x52, - 0xa3, 0x66, 0x3e, 0xbc, 0x80, 0x42, 0x3b, 0x7e, 0xc8, 0x25, 0xae, 0xaa, 0x94, 0x67, 0xba, 0x88, - 0xf3, 0xd2, 0x64, 0x27, 0x90, 0x69, 0x5b, 0x5b, 0x84, 0x6e, 0xcc, 0x05, 0xc2, 0xab, 0xa8, 0x63, - 0x5c, 0xc9, 0x72, 0xf9, 0x68, 0x44, 0x89, 0x75, 0x9c, 0x6b, 0x9d, 0xaf, 0x41, 0x18, 0x69, 0xbb, - 0x63, 0x2c, 0x8c, 0x8c, 0x85, 0x52, 0x1a, 0x05, 0x4a, 0xad, 0x6c, 0x9b, 0x31, 0x3e, 0xec, 0xa8, - 0x9b, 0xd2, 0xea, 0x29, 0x86, 0xc2, 0x60, 0xdd, 0xc2, 0xe9, 0x1b, 0x1d, 0x2d, 0xc0, 0x5a, 0xa9, - 0xd5, 0xa2, 0xf9, 0x16, 0x3b, 0xa2, 0x83, 0x3b, 0x51, 0x80, 0x35, 0x22, 0x83, 0x80, 0x4c, 0x48, - 0x34, 0x48, 0x06, 0x6a, 0x2b, 0x34, 0xb4, 0x73, 0xdf, 0xcc, 0x83, 0x9d, 0x96, 0xda, 0xad, 0xc0, - 0x8e, 0xa9, 0x7f, 0xb9, 0xaa, 0xd4, 0xb3, 0x0d, 0xbc, 0x89, 0x17, 0x0d, 0x67, 0xfb, 0xbc, 0xdd, - 0x8e, 0x77, 0x77, 0x1c, 0x4c, 0xfc, 0xcc, 0x79, 0xa6, 0x67, 0xbf, 0xc9, 0x4e, 0x67, 0x8c, 0xf6, - 0xef, 0x65, 0x01, 0xee, 0x59, 0x2f, 0xe9, 0xa3, 0x2c, 0xa0, 0xd1, 0xe6, 0x02, 0x45, 0xe0, 0x4d, - 0x48, 0x34, 0x4a, 0xfa, 0x4b, 0x81, 0x62, 0x96, 0xd0, 0xdd, 0xf3, 0x6a, 0x29, 0xf1, 0x56, 0xe7, - 0xec, 0x9a, 0xb2, 0x45, 0x95, 0x16, 0x12, 0xff, 0xec, 0xf0, 0x3f, 0xd7, 0xa9, 0xe3, 0x03, 0xde, - 0xf6, 0xc0, 0xb7, 0x3d, 0xf0, 0xab, 0xa6, 0x87, 0x69, 0x2f, 0x22, 0x17, 0x7b, 0x1f, 0x9b, 0x90, - 0x7c, 0x6e, 0x42, 0xf2, 0xb5, 0x09, 0xc9, 0xfb, 0x77, 0xd8, 0x4b, 0x7d, 0xe7, 0x3a, 0xfd, 0x09, - 0x00, 0x00, 0xff, 0xff, 0x21, 0xc6, 0xeb, 0xd4, 0xae, 0x01, 0x00, 0x00, + // 405 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x5c, 0x91, 0xc1, 0x8e, 0xd3, 0x30, + 0x10, 0x86, 0xd7, 0xa4, 0x4d, 0x37, 0xde, 0x3d, 0x20, 0x6b, 0xb5, 0xb2, 0x0a, 0x8a, 0xaa, 0x9e, + 0x72, 0x80, 0x44, 0x5a, 0x9e, 0x00, 0xe8, 0x22, 0xad, 0x84, 0x40, 0x4a, 0x57, 0x1c, 0xb8, 0x20, + 0x37, 0x99, 0xa6, 0x16, 0x8d, 0x6d, 0xc5, 0x93, 0x85, 0xbc, 0x06, 0x27, 0x9e, 0x87, 0x13, 0x47, + 0x1e, 0x01, 0x95, 0x17, 0x41, 0xb6, 0xd3, 0x45, 0xec, 0x2d, 0xff, 0xf7, 0x8d, 0x3d, 0x13, 0x0f, + 0x3d, 0xb3, 0x7b, 0x59, 0x41, 0x6e, 0x3a, 0x8d, 0x9a, 0xc5, 0x70, 0x07, 0x0a, 0xed, 0xfc, 0x63, + 0x23, 0x71, 0xd7, 0x6f, 0xf2, 0x4a, 0xb7, 0x45, 0xd3, 0x99, 0xea, 0x39, 0x54, 0xda, 0x0e, 0x16, + 0x61, 0x8c, 0x8d, 0x40, 0xf8, 0x22, 0x86, 0x02, 0x77, 0xb2, 0xab, 0x3f, 0x19, 0xd1, 0xe1, 0x50, + 0x34, 0x5a, 0x37, 0x7b, 0x10, 0x46, 0xda, 0xf1, 0xb3, 0x10, 0x46, 0x16, 0x42, 0x29, 0x8d, 0x02, + 0xa5, 0x56, 0x36, 0xf4, 0x98, 0x3f, 0x19, 0xad, 0x4f, 0x9b, 0x7e, 0x5b, 0x40, 0x6b, 0x70, 0x08, + 0x72, 0xf9, 0x8d, 0xd0, 0xf3, 0x35, 0x58, 0x2b, 0xb5, 0x5a, 0xbb, 0xb9, 0xd8, 0x53, 0x9a, 0xbc, + 0x13, 0x2d, 0x58, 0x23, 0x2a, 0xe0, 0x64, 0x41, 0xb2, 0xa4, 0x4c, 0xd4, 0x11, 0x38, 0x3b, 0x56, + 0xdf, 0xac, 0xf8, 0xa3, 0x60, 0xed, 0x11, 0xb0, 0x67, 0x34, 0x7e, 0xbd, 0xeb, 0xd5, 0x67, 0xcb, + 0xa3, 0x45, 0x94, 0x9d, 0x5d, 0x5d, 0xe4, 0xe1, 0xf7, 0xf2, 0xf1, 0x8c, 0x97, 0x65, 0x5c, 0xf9, + 0x1a, 0xc6, 0xe9, 0xec, 0x03, 0x74, 0x8e, 0xf3, 0xc9, 0x82, 0x64, 0x51, 0x39, 0xbb, 0x0b, 0x71, + 0xf9, 0xe3, 0xdf, 0x50, 0xfe, 0x08, 0x63, 0x74, 0x72, 0x2b, 0x5b, 0xf0, 0x1d, 0xa3, 0x72, 0x82, + 0xb2, 0x05, 0xc7, 0x56, 0x02, 0x05, 0x8f, 0x16, 0x24, 0x3b, 0x2f, 0x27, 0xb5, 0x40, 0xe1, 0xc6, + 0xbb, 0x76, 0x1d, 0x6f, 0x07, 0x03, 0xfe, 0xd2, 0xa4, 0x4c, 0xe0, 0x08, 0x58, 0x4a, 0xa9, 0xb7, + 0x37, 0xaa, 0x86, 0xaf, 0x7c, 0xea, 0xef, 0xa2, 0x70, 0x4f, 0x9c, 0xf7, 0xed, 0x82, 0x8f, 0x83, + 0xaf, 0xee, 0x09, 0xbb, 0xa4, 0xf1, 0xfb, 0xed, 0xd6, 0x02, 0xf2, 0x99, 0x77, 0xb1, 0xf6, 0x89, + 0x5d, 0xd0, 0xe9, 0x0a, 0xf6, 0x62, 0xe0, 0xa7, 0x1e, 0x4f, 0x6b, 0x17, 0xae, 0x4a, 0x7a, 0xfa, + 0xb2, 0xaf, 0x25, 0xbe, 0xd5, 0x0d, 0x7b, 0x43, 0xd9, 0xba, 0xdf, 0xb4, 0x12, 0xff, 0x7b, 0xea, + 0x87, 0xcf, 0xe3, 0xe9, 0xfc, 0x32, 0x0f, 0xfb, 0xca, 0x8f, 0xfb, 0xca, 0xaf, 0xdd, 0xbe, 0x96, + 0x27, 0x19, 0x79, 0xf5, 0xf8, 0xe7, 0x21, 0x25, 0xbf, 0x0e, 0x29, 0xf9, 0x7d, 0x48, 0xc9, 0xf7, + 0x3f, 0xe9, 0xc9, 0x26, 0xf6, 0x55, 0x2f, 0xfe, 0x06, 0x00, 0x00, 0xff, 0xff, 0xa6, 0xf1, 0x9c, + 0x76, 0x56, 0x02, 0x00, 0x00, } diff --git a/lib/events/slice.proto b/lib/events/slice.proto index 01e4d1911fc..2940f606fc0 100644 --- a/lib/events/slice.proto +++ b/lib/events/slice.proto @@ -6,17 +6,32 @@ import "google/protobuf/empty.proto"; // SessionSlice is a slice of submitted chunks message SessionSlice { + // Namespace is a session namespace string Namespace = 1; + // SessionID is a session ID associated with this chunk string SessionID = 2; + // Chunks is a list of submitted session chunks repeated SessionChunk Chunks = 3; + // Version specifies session slice version + int64 Version = 4; } // SessionChunk is a chunk to be posted in the context of the session message SessionChunk { // Time is the occurence of this event int64 Time = 2; - // Data is captured data + // Data is captured data, contains event fields in case of event, session data otherwise bytes Data = 3; + // EventType is event type + string EventType = 4; + // EventIndex is the event global index + int64 EventIndex = 5; + // Index is the autoincremented chunk index + int64 ChunkIndex = 6; + // Offset is an offset from the previous chunk in bytes + int64 Offset = 7; + // Delay is a delay from the previous event in milliseconds + int64 Delay = 8; } service AuditLog { diff --git a/lib/reversetunnel/agent.go b/lib/reversetunnel/agent.go index 39c390fa7ba..e9920c14bd6 100644 --- a/lib/reversetunnel/agent.go +++ b/lib/reversetunnel/agent.go @@ -25,7 +25,6 @@ import ( "fmt" "io" "net" - "runtime/debug" "sync" "time" @@ -204,7 +203,6 @@ func (a *Agent) getState() string { // Close signals to close all connections and operations func (a *Agent) Close() error { - a.Info(string(debug.Stack())) a.cancel() return nil } diff --git a/lib/service/service.go b/lib/service/service.go index 34ee69c9cdf..321a23f74e8 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -390,6 +390,7 @@ func (process *TeleportProcess) initAuthService(authority sshca.Authority) error auditConfig := events.AuditLogConfig{ DataDir: filepath.Join(cfg.DataDir, "log"), RecordSessions: recordSessions, + ServerID: cfg.HostUUID, } if runtime.GOOS == teleport.LinuxOS { // if the user member of adm linux group, @@ -874,6 +875,7 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { regular.SetCiphers(cfg.Ciphers), regular.SetKEXAlgorithms(cfg.KEXAlgorithms), regular.SetMACAlgorithms(cfg.MACAlgorithms), + regular.SetNamespace(defaults.Namespace), ) if err != nil { return trace.Wrap(err) diff --git a/lib/srv/regular/sshserver.go b/lib/srv/regular/sshserver.go index 60a44079b9a..ab2f495de69 100644 --- a/lib/srv/regular/sshserver.go +++ b/lib/srv/regular/sshserver.go @@ -294,6 +294,15 @@ func New(addr utils.NetAddr, } } + // TODO(klizhentas): replace function arguments with struct + if s.alog == nil { + return nil, trace.BadParameter("setup valid AuditLog parameter using SetAuditLog") + } + + if s.namespace == "" { + return nil, trace.BadParameter("setup valid namespace parameter using SetNamespace") + } + var component string if s.proxyMode { component = teleport.ComponentProxy diff --git a/lib/srv/regular/sshserver_test.go b/lib/srv/regular/sshserver_test.go index 80fc5402c0d..dac019ec020 100644 --- a/lib/srv/regular/sshserver_test.go +++ b/lib/srv/regular/sshserver_test.go @@ -473,6 +473,8 @@ func (s *SrvSuite) TestProxyReverseTunnel(c *C) { utils.NetAddr{}, SetProxyMode(reverseTunnelServer), SetSessionServer(s.proxyClient), + SetAuditLog(s.nodeClient), + SetNamespace(defaults.Namespace), ) c.Assert(err, IsNil) c.Assert(proxy.Start(), IsNil) @@ -552,7 +554,10 @@ func (s *SrvSuite) TestProxyReverseTunnel(c *C) { }, ), SetSessionServer(s.nodeClient), + SetAuditLog(s.nodeClient), + SetNamespace(defaults.Namespace), ) + c.Assert(err, IsNil) srv2.uuid = bobAddr c.Assert(err, IsNil) c.Assert(srv2.Start(), IsNil) @@ -638,6 +643,8 @@ func (s *SrvSuite) TestProxyRoundRobin(c *C) { utils.NetAddr{}, SetProxyMode(reverseTunnelServer), SetSessionServer(s.proxyClient), + SetAuditLog(s.nodeClient), + SetNamespace(defaults.Namespace), ) c.Assert(err, IsNil) c.Assert(proxy.Start(), IsNil) @@ -734,6 +741,8 @@ func (s *SrvSuite) TestProxyDirectAccess(c *C) { utils.NetAddr{}, SetProxyMode(reverseTunnelServer), SetSessionServer(s.proxyClient), + SetAuditLog(s.nodeClient), + SetNamespace(defaults.Namespace), ) c.Assert(err, IsNil) c.Assert(proxy.Start(), IsNil) @@ -835,6 +844,8 @@ func (s *SrvSuite) TestLimiter(c *C) { SetLimiter(limiter), SetShell("/bin/sh"), SetSessionServer(s.nodeClient), + SetAuditLog(s.nodeClient), + SetNamespace(defaults.Namespace), ) c.Assert(err, IsNil) c.Assert(srv.Start(), IsNil) diff --git a/lib/srv/sess.go b/lib/srv/sess.go index 8b9dd595876..73491d5d391 100644 --- a/lib/srv/sess.go +++ b/lib/srv/sess.go @@ -89,7 +89,7 @@ func (r *SessionRegistry) Close() { func (s *SessionRegistry) OpenSession(ch ssh.Channel, req *ssh.Request, ctx *ServerContext) error { if ctx.session != nil { // emit "joined session" event: - s.srv.EmitAuditEvent(events.SessionJoinEvent, events.EventFields{ + ctx.session.recorder.alog.EmitAuditEvent(events.SessionJoinEvent, events.EventFields{ events.SessionEventID: string(ctx.session.id), events.EventNamespace: s.srv.GetNamespace(), events.EventLogin: ctx.Identity.Login, @@ -137,7 +137,7 @@ func (s *SessionRegistry) leaveSession(party *party) error { } // emit "session leave" event (party left the session) - s.srv.EmitAuditEvent(events.SessionLeaveEvent, events.EventFields{ + sess.recorder.alog.EmitAuditEvent(events.SessionLeaveEvent, events.EventFields{ events.SessionEventID: string(sess.id), events.EventUser: party.user, events.SessionServerID: party.serverID, @@ -165,18 +165,17 @@ func (s *SessionRegistry) leaveSession(party *party) error { delete(s.sessions, sess.id) s.Unlock() - // close recorder to free up associated resources - // and flush data - if sess.recorder != nil { - sess.recorder.Close() - } - // send an event indicating that this session has ended - s.srv.EmitAuditEvent(events.SessionEndEvent, events.EventFields{ + sess.recorder.alog.EmitAuditEvent(events.SessionEndEvent, events.EventFields{ events.SessionEventID: string(sess.id), events.EventUser: party.user, events.EventNamespace: s.srv.GetNamespace(), }) + + // close recorder to free up associated resources + // and flush data + sess.recorder.Close() + if err := sess.Close(); err != nil { log.Error(err) } @@ -220,7 +219,7 @@ func (s *SessionRegistry) NotifyWinChange(params rsession.TerminalParams, ctx *S } sid := ctx.session.id // report this to the event/audit log: - s.srv.EmitAuditEvent(events.ResizeEvent, events.EventFields{ + ctx.session.recorder.alog.EmitAuditEvent(events.ResizeEvent, events.EventFields{ events.EventNamespace: s.srv.GetNamespace(), events.SessionEventID: sid, events.EventLogin: ctx.Identity.Login, @@ -582,8 +581,17 @@ func (s *session) start(ch ssh.Channel, ctx *ServerContext) error { params := s.term.GetTerminalParams() + // start recording this session + auditLog := s.registry.srv.GetAuditLog() + var err error + s.recorder, err = newSessionRecorder(auditLog, ctx.srv.GetNamespace(), s.id) + if err != nil { + return trace.Wrap(err) + } + s.writer.addWriter("session-recorder", s.recorder, true) + // emit "new session created" event: - s.registry.srv.EmitAuditEvent(events.SessionStartEvent, events.EventFields{ + s.recorder.alog.EmitAuditEvent(events.SessionStartEvent, events.EventFields{ events.EventNamespace: ctx.srv.GetNamespace(), events.SessionEventID: string(s.id), events.SessionServerID: ctx.srv.ID(), @@ -594,17 +602,6 @@ func (s *session) start(ch ssh.Channel, ctx *ServerContext) error { events.TerminalSize: params.Serialize(), }) - // start recording this session - auditLog := s.registry.srv.GetAuditLog() - if auditLog != nil { - var err error - s.recorder, err = newSessionRecorder(auditLog, ctx.srv.GetNamespace(), s.id) - if err != nil { - return trace.Wrap(err) - } - s.writer.addWriter("session-recorder", s.recorder, true) - } - // start asynchronous loop of synchronizing session state with // the session server (terminal size and activity) go s.pollAndSync() diff --git a/lib/state/log.go b/lib/state/log.go index 10058b75b4d..a3919798c47 100644 --- a/lib/state/log.go +++ b/lib/state/log.go @@ -19,6 +19,7 @@ package state import ( "context" + "encoding/json" "io" "io/ioutil" "time" @@ -205,6 +206,8 @@ type CachingAuditLog struct { throttleStart time.Time waitCtx context.Context waitCtxCancel context.CancelFunc + lastChunk *events.SessionChunk + eventIndex int64 } func (ll *CachingAuditLog) add(chunks []*events.SessionChunk) { @@ -298,6 +301,14 @@ type flushOpts struct { noRetry bool } +func diff(before, after time.Time) int64 { + d := int64(after.Sub(before) / time.Millisecond) + if d < 0 { + return 0 + } + return d +} + func (ll *CachingAuditLog) flush(opts flushOpts) { if len(ll.chunks) == 0 { return @@ -308,10 +319,24 @@ func (ll *CachingAuditLog) flush(opts flushOpts) { } } chunks := ll.reset() + for i := range chunks { + chunk := chunks[i] + chunk.EventIndex = ll.eventIndex + ll.eventIndex += 1 + if chunk.EventType == events.SessionPrintEvent { + if ll.lastChunk != nil { + chunk.Offset = ll.lastChunk.Offset + int64(len(ll.lastChunk.Data)) + chunk.Delay = diff(time.Unix(0, ll.lastChunk.Time), time.Unix(0, chunk.Time)) + ll.lastChunk.Delay + chunk.ChunkIndex = ll.lastChunk.ChunkIndex + 1 + } + ll.lastChunk = chunk + } + } slice := events.SessionSlice{ Namespace: ll.Namespace, SessionID: ll.SessionID, Chunks: chunks, + Version: events.V2, } err := ll.postSlice(slice) if err == nil { @@ -404,7 +429,18 @@ func (ll *CachingAuditLog) Close() error { } func (ll *CachingAuditLog) EmitAuditEvent(eventType string, fields events.EventFields) error { - return ll.Server.EmitAuditEvent(eventType, fields) + data, err := json.Marshal(fields) + if err != nil { + return trace.Wrap(err) + } + chunks := []*events.SessionChunk{ + { + EventType: eventType, + Data: data, + Time: time.Now().UTC().UnixNano(), + }, + } + return ll.post(chunks) } func (ll *CachingAuditLog) PostSessionChunk(namespace string, sid session.ID, reader io.Reader) error { @@ -414,14 +450,18 @@ func (ll *CachingAuditLog) PostSessionChunk(namespace string, sid session.ID, re } chunks := []*events.SessionChunk{ { - Data: data, - Time: time.Now().UTC().UnixNano(), + EventType: events.SessionPrintEvent, + Data: data, + Time: time.Now().UTC().UnixNano(), }, } return ll.post(chunks) } func (ll *CachingAuditLog) PostSessionSlice(slice events.SessionSlice) error { + for i := range slice.Chunks { + slice.Chunks[i].EventType = events.SessionPrintEvent + } return ll.post(slice.Chunks) } diff --git a/lib/state/log_test.go b/lib/state/log_test.go index 566b9a94025..38bb56a8a45 100644 --- a/lib/state/log_test.go +++ b/lib/state/log_test.go @@ -60,6 +60,7 @@ func (s *CacheLogSuite) newSlice(data string) *events.SessionSlice { Namespace: "ns", SessionID: "s1", Chunks: []*events.SessionChunk{chunk}, + Version: events.V2, } } diff --git a/lib/web/apiserver_test.go b/lib/web/apiserver_test.go index ad8410fd156..97dcb82ae99 100644 --- a/lib/web/apiserver_test.go +++ b/lib/web/apiserver_test.go @@ -216,6 +216,7 @@ func (s *WebSuite) SetUpTest(c *C) { regular.SetProxyMode(revTunServer), regular.SetSessionServer(s.proxyClient), regular.SetAuditLog(s.proxyClient), + regular.SetNamespace(defaults.Namespace), ) c.Assert(err, IsNil)