Merge pull request #1549 from gravitational/sasha/nfs2

Add support for NFS-friendly log protocol.
This commit is contained in:
Alexander Klizhentas 2018-01-05 10:39:12 -08:00 committed by GitHub
commit 19a6e5ed4b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 2205 additions and 372 deletions

View file

@ -33,5 +33,4 @@ ssh_service:
proxy_service:
enabled: yes
proxy_protocol: on

View file

@ -28,6 +28,7 @@ import (
"os/exec"
"os/user"
"path/filepath"
"regexp"
"strconv"
"strings"
"testing"
@ -155,9 +156,9 @@ func (s *IntSuite) newTeleportWithConfig(c *check.C, logins []string, instanceSe
return t
}
// TestAudit creates a live session, records a bunch of data through it (>5MB)
// TestAuditOn creates a live session, records a bunch of data through it
// and then reads it back and compares against simulated reality.
func (s *IntSuite) TestAudit(c *check.C) {
func (s *IntSuite) TestAuditOn(c *check.C) {
var tests = []struct {
inRecordLocation string
inForwardAgent bool
@ -279,16 +280,6 @@ func (s *IntSuite) TestAudit(c *check.C) {
// make sure it's us who joined! :)
c.Assert(session.Parties[0].User, check.Equals, s.me.Username)
// lets add something to the session stream:
// write 1MB chunk
bigChunk := make([]byte, 1024*1024)
err = site.PostSessionChunk(defaults.Namespace, session.ID, bytes.NewReader(bigChunk))
c.Assert(err, check.Equals, nil)
// then add small prefix:
err = site.PostSessionChunk(defaults.Namespace, session.ID, bytes.NewBufferString("\nsuffix"))
c.Assert(err, check.Equals, nil)
// lets type "echo hi" followed by "enter" and then "exit" + "enter":
myTerm.Type("\aecho hi\n\r\aexit\n\r\a")
@ -297,15 +288,17 @@ func (s *IntSuite) TestAudit(c *check.C) {
// read back the entire session (we have to try several times until we get back
// everything because the session is closing)
const expectedLen = 1048600
var sessionStream []byte
for i := 0; len(sessionStream) < expectedLen; i++ {
for i := 0; i < 5; i++ {
sessionStream, err = site.GetSessionChunk(defaults.Namespace, session.ID, 0, events.MaxChunkBytes)
c.Assert(err, check.IsNil)
if strings.Contains(string(sessionStream), "exit") {
break
}
time.Sleep(time.Millisecond * 250)
if i > 10 {
// session stream keeps coming back short
c.Fatalf("stream is too short: <%d", expectedLen)
c.Fatal("stream is not getting data")
}
}
@ -316,11 +309,10 @@ func (s *IntSuite) TestAudit(c *check.C) {
// hi
// edsger ~: exit
// logout
// <1MB of zeros here>
// suffix
//
c.Assert(strings.Contains(string(sessionStream), "echo hi"), check.Equals, true)
c.Assert(strings.Contains(string(sessionStream), "\nsuffix"), check.Equals, true)
comment := check.Commentf("%q", string(sessionStream))
c.Assert(strings.Contains(string(sessionStream), "echo hi"), check.Equals, true, comment)
c.Assert(strings.Contains(string(sessionStream), "exit"), check.Equals, true, comment)
// now lets look at session events:
history, err := site.GetSessionEvents(defaults.Namespace, session.ID, 0)
@ -364,18 +356,14 @@ func (s *IntSuite) TestAudit(c *check.C) {
}
c.Assert(start.GetString(events.SessionServerID), check.Equals, expectedServerID)
// find "\nsuffix" write and find our huge 1MB chunk
prefixFound, hugeChunkFound := false, false
// make sure data is recorded properly
out := &bytes.Buffer{}
for _, e := range history {
if getChunk(e, 10) == "\nsuffix" {
prefixFound = true
}
if e.GetInt("bytes") == 1048576 {
hugeChunkFound = true
}
out.WriteString(getChunk(e, 1000))
}
c.Assert(prefixFound, check.Equals, true)
c.Assert(hugeChunkFound, check.Equals, true)
recorded := replaceNewlines(out.String())
c.Assert(recorded, check.Matches, ".*exit.*")
c.Assert(recorded, check.Matches, ".*echo hi.*")
// there should alwys be 'session.end' event
end := findByType(events.SessionEndEvent)
@ -396,6 +384,10 @@ func (s *IntSuite) TestAudit(c *check.C) {
}
}
func replaceNewlines(in string) string {
return regexp.MustCompile(`\r?\n`).ReplaceAllString(in, `\n`)
}
// TestInteroperability checks if Teleport and OpenSSH behave in the same way
// when executing commands.
func (s *IntSuite) TestInteroperability(c *check.C) {

View file

@ -90,7 +90,10 @@ func NewTestAuthServer(cfg TestAuthServerConfig) (*TestAuthServer, error) {
}
srv.AuditLog, err = events.NewAuditLog(events.AuditLogConfig{
DataDir: cfg.Dir, RecordSessions: true})
DataDir: cfg.Dir,
RecordSessions: true,
ServerID: cfg.ClusterName,
})
if err != nil {
return nil, trace.Wrap(err)
}

View file

@ -106,10 +106,10 @@ func (s *PasswordSuite) TestTiming(c *C) {
elapsedNotExists = elapsedNotExists + elapsed
}
// elapsedDifference must be less than 20 ms
// elapsedDifference must be less than 40 ms (with some leeway for delays in runtime)
elapsedDifference := elapsedExists/10 - elapsedNotExists/10
comment := Commentf("elapsed difference (%v) greater than 30 ms", elapsedDifference)
c.Assert(elapsedDifference.Seconds() < 0.030, Equals, true, comment)
comment := Commentf("elapsed difference (%v) greater than 40 ms", elapsedDifference)
c.Assert(elapsedDifference.Seconds() < 0.040, Equals, true, comment)
}
func (s *PasswordSuite) TestChangePassword(c *C) {

View file

@ -73,7 +73,10 @@ func (s *TunSuite) SetUpTest(c *C) {
c.Assert(err, IsNil)
s.alog, err = events.NewAuditLog(events.AuditLogConfig{
DataDir: s.dir, RecordSessions: true})
DataDir: s.dir,
RecordSessions: true,
ServerID: "sid1",
})
c.Assert(err, IsNil)
s.sessionServer, err = session.New(s.bk)

View file

@ -35,6 +35,9 @@ const (
RemoteAddr = "addr.remote" // client (user's) address
EventCursor = "id" // event ID (used as cursor value for enumeration, not stored)
// EventIndex is an event index as received from the logging server
EventIndex = "ei"
// EventNamespace is a namespace of the session event
EventNamespace = "namespace"
@ -122,6 +125,14 @@ const (
MaxChunkBytes = 1024 * 1024 * 5
)
const (
// V1 is the V1 version of slice chunks API,
// it is 0 because it was not defined before
V1 = 0
// V2 is the V2 version of slice chunks API
V2 = 2
)
// IAuditLog is the primary (and the only external-facing) interface for AuditLogger.
// If you wish to implement a different kind of logger (not filesystem-based), you
// have to implement this interface

View file

@ -23,6 +23,7 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/url"
"os"
"path/filepath"
@ -97,6 +98,9 @@ type AuditLogConfig struct {
// DataDir is the directory where audit log stores the data
DataDir string
// ServerID is the id of the audit log server
ServerID string
// RecordSessions controls if sessions are recorded along with audit events.
RecordSessions bool
@ -128,6 +132,9 @@ func (a *AuditLogConfig) CheckAndSetDefaults() error {
if a.DataDir == "" {
return trace.BadParameter("missing parameter DataDir")
}
if a.ServerID == "" {
return trace.BadParameter("missing parameter ServerID")
}
if a.Clock == nil {
a.Clock = clockwork.NewRealClock()
}
@ -154,22 +161,6 @@ func NewAuditLog(cfg AuditLogConfig) (*AuditLog, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
// create a directory for session logs:
sessionDir := filepath.Join(cfg.DataDir, SessionLogsDir)
if err := os.MkdirAll(sessionDir, *cfg.DirMask); err != nil {
return nil, trace.ConvertSystemError(err)
}
if cfg.UID != nil && cfg.GID != nil {
err := os.Chown(cfg.DataDir, *cfg.UID, *cfg.GID)
if err != nil {
return nil, trace.ConvertSystemError(err)
}
err = os.Chown(sessionDir, *cfg.UID, *cfg.GID)
if err != nil {
return nil, trace.ConvertSystemError(err)
}
}
al := &AuditLog{
AuditLogConfig: cfg,
@ -183,6 +174,32 @@ func NewAuditLog(cfg AuditLogConfig) (*AuditLog, error) {
return nil, trace.Wrap(err)
}
al.loggers = loggers
// create a directory for audit logs, audit log does not create
// session logs before migrations are run in case if the directory
// has to be moved
auditDir := filepath.Join(cfg.DataDir, cfg.ServerID)
if err := os.MkdirAll(auditDir, *cfg.DirMask); err != nil {
return nil, trace.ConvertSystemError(err)
}
if err := al.runMigrations(); err != nil {
return nil, trace.Wrap(err)
}
// create a directory for session logs:
sessionDir := filepath.Join(cfg.DataDir, cfg.ServerID, SessionLogsDir)
if err := os.MkdirAll(sessionDir, *cfg.DirMask); err != nil {
return nil, trace.ConvertSystemError(err)
}
if cfg.UID != nil && cfg.GID != nil {
err := os.Chown(cfg.DataDir, *cfg.UID, *cfg.GID)
if err != nil {
return nil, trace.ConvertSystemError(err)
}
err = os.Chown(sessionDir, *cfg.UID, *cfg.GID)
if err != nil {
return nil, trace.ConvertSystemError(err)
}
}
go al.periodicCloseInactiveLoggers()
return al, nil
}
@ -199,7 +216,7 @@ func (l *AuditLog) PostSessionSlice(slice SessionSlice) error {
if len(slice.Chunks) == 0 {
return trace.BadParameter("missing session chunks")
}
sl, err := l.LoggerFor(slice.Namespace, session.ID(slice.SessionID))
sl, err := l.LoggerFor(slice.Namespace, session.ID(slice.SessionID), slice.Version == V1)
if err != nil {
l.Errorf("failed to get logger: %v", trace.DebugReport(err))
return trace.BadParameter("audit.log: no session writer for %s", slice.SessionID)
@ -213,6 +230,8 @@ func (l *AuditLog) PostSessionSlice(slice SessionSlice) error {
return nil
}
// DELETE IN: 2.6.0
// This method is no longer used in 2.5.0
// PostSessionChunk writes a new chunk of session stream into the audit log
func (l *AuditLog) PostSessionChunk(namespace string, sid session.ID, reader io.Reader) error {
tmp, err := utils.ReadAll(reader, 16*1024)
@ -230,16 +249,159 @@ func (l *AuditLog) PostSessionChunk(namespace string, sid session.ID, reader io.
})
}
func (l *AuditLog) getAuthServers() ([]string, error) {
// scan the log directory:
df, err := os.Open(l.DataDir)
if err != nil {
return nil, trace.Wrap(err)
}
defer df.Close()
entries, err := df.Readdir(-1)
if err != nil {
return nil, trace.Wrap(err)
}
var authServers []string
for i := range entries {
fi := entries[i]
if fi.IsDir() {
authServers = append(authServers, filepath.Base(fi.Name()))
}
}
return authServers, nil
}
type sessionIndex struct {
dataDir string
namespace string
sid session.ID
events []indexEntry
chunks []indexEntry
}
func (idx *sessionIndex) sort() {
sort.Slice(idx.events, func(i, j int) bool {
return idx.events[i].Index < idx.events[j].Index
})
sort.Slice(idx.chunks, func(i, j int) bool {
return idx.chunks[i].Offset < idx.chunks[j].Offset
})
}
func (idx *sessionIndex) eventsFileName(index int) string {
entry := idx.events[index]
return filepath.Join(idx.dataDir, entry.authServer, SessionLogsDir, idx.namespace, entry.FileName)
}
func (idx *sessionIndex) eventsFile(afterN int) (int, error) {
for i := len(idx.events) - 1; i >= 0; i-- {
entry := idx.events[i]
if int64(afterN) >= entry.Index {
return i, nil
}
}
return -1, trace.NotFound("%v not found", afterN)
}
func (idx *sessionIndex) chunksFile(offset int64) (string, int64, error) {
for i := len(idx.chunks) - 1; i >= 0; i-- {
entry := idx.chunks[i]
if offset >= entry.Offset {
return filepath.Join(idx.dataDir, entry.authServer, SessionLogsDir, idx.namespace, entry.FileName), entry.Offset, nil
}
}
return "", 0, trace.NotFound("%v not found", offset)
}
func (l *AuditLog) readSessionIndex(namespace string, sid session.ID) (*sessionIndex, error) {
authServers, err := l.getAuthServers()
if err != nil {
return nil, trace.Wrap(err)
}
index := sessionIndex{
sid: sid,
dataDir: l.DataDir,
namespace: namespace,
}
for _, authServer := range authServers {
indexFileName := filepath.Join(l.DataDir, authServer, SessionLogsDir, namespace, fmt.Sprintf("%v.index", sid))
indexFile, err := os.OpenFile(indexFileName, os.O_RDONLY, 0640)
err = trace.ConvertSystemError(err)
if err != nil {
if !trace.IsNotFound(err) {
return nil, trace.Wrap(err)
}
continue
}
events, chunks, err := readIndexEntries(indexFile, authServer)
if err != nil {
return nil, trace.Wrap(err)
}
index.events = append(index.events, events...)
index.chunks = append(index.chunks, chunks...)
err = indexFile.Close()
if err != nil {
return nil, trace.Wrap(err)
}
}
index.sort()
return &index, nil
}
func readIndexEntries(file *os.File, authServer string) (events []indexEntry, chunks []indexEntry, err error) {
scanner := bufio.NewScanner(file)
for lineNo := 0; scanner.Scan(); lineNo++ {
var entry indexEntry
if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil {
return nil, nil, trace.Wrap(err)
}
entry.authServer = authServer
switch entry.Type {
case fileTypeEvents:
events = append(events, entry)
case fileTypeChunks:
chunks = append(chunks, entry)
default:
return nil, nil, trace.BadParameter("unsupported type: %q", entry.Type)
}
}
return
}
// GetSessionChunk returns a reader which console and web clients request
// to receive a live stream of a given session. The reader allows access to a
// session stream range from offsetBytes to offsetBytes+maxBytes
//
func (l *AuditLog) GetSessionChunk(namespace string, sid session.ID, offsetBytes, maxBytes int) ([]byte, error) {
l.Debugf("getSessionReader(%v, %v)", namespace, sid)
var data []byte
for {
out, err := l.getSessionChunk(namespace, sid, offsetBytes, maxBytes)
if err != nil {
if err == io.EOF {
return data, nil
}
return nil, trace.Wrap(err)
}
data = append(data, out...)
if len(data) == maxBytes || len(out) == 0 {
return data, nil
}
maxBytes = maxBytes - len(out)
offsetBytes = offsetBytes + len(out)
}
}
func (l *AuditLog) getSessionChunk(namespace string, sid session.ID, offsetBytes, maxBytes int) ([]byte, error) {
if namespace == "" {
return nil, trace.BadParameter("missing parameter namespace")
}
fstream, err := os.OpenFile(l.sessionStreamFn(namespace, sid), os.O_RDONLY, 0640)
idx, err := l.readSessionIndex(namespace, sid)
if err != nil {
return nil, trace.Wrap(err)
}
fileName, fileOffset, err := idx.chunksFile(int64(offsetBytes))
if err != nil {
return nil, trace.Wrap(err)
}
fstream, err := os.OpenFile(fileName, os.O_RDONLY, 0640)
if err != nil {
log.Warning(err)
return nil, trace.Wrap(err)
@ -247,12 +409,11 @@ func (l *AuditLog) GetSessionChunk(namespace string, sid session.ID, offsetBytes
defer fstream.Close()
// seek to 'offset' from the beginning
fstream.Seek(int64(offsetBytes), 0)
fstream.Seek(int64(offsetBytes)-fileOffset, 0)
// copy up to maxBytes from the offset position:
var buff bytes.Buffer
io.Copy(&buff, io.LimitReader(fstream, int64(maxBytes)))
return buff.Bytes(), nil
}
@ -267,7 +428,31 @@ func (l *AuditLog) GetSessionEvents(namespace string, sid session.ID, afterN int
if namespace == "" {
return nil, trace.BadParameter("missing parameter namespace")
}
logFile, err := os.OpenFile(l.sessionLogFn(namespace, sid), os.O_RDONLY, 0640)
idx, err := l.readSessionIndex(namespace, sid)
if err != nil {
return nil, trace.Wrap(err)
}
fileIndex, err := idx.eventsFile(afterN)
if err != nil {
return nil, trace.Wrap(err)
}
events := make([]EventFields, 0, 256)
for i := fileIndex; i < len(idx.events); i++ {
skip := 0
if i == fileIndex {
skip = afterN
}
out, err := l.fetchSessionEvents(idx.eventsFileName(i), skip)
if err != nil {
return nil, trace.Wrap(err)
}
events = append(events, out...)
}
return events, nil
}
func (l *AuditLog) fetchSessionEvents(fileName string, afterN int) ([]EventFields, error) {
logFile, err := os.OpenFile(fileName, os.O_RDONLY, 0640)
if err != nil {
// no file found? this means no events have been logged yet
if os.IsNotExist(err) {
@ -300,6 +485,44 @@ func (l *AuditLog) GetSessionEvents(namespace string, sid session.ID, afterN int
func (l *AuditLog) EmitAuditEvent(eventType string, fields EventFields) error {
l.Debugf("EmitAuditEvent(%v: %v)", eventType, fields)
if err := l.emitAuditEvent(eventType, fields); err != nil {
return trace.Wrap(err)
}
// DELETE IN: 2.6.0
// if this event is associated with a session -> forward it to the session log as well
// this means that this is a legacy client, so audit logger is going to use compatibility mode logger
sessionID := fields.GetString(SessionEventID)
if sessionID != "" {
sl, err := l.LoggerFor(fields.GetString(EventNamespace), session.ID(sessionID), true)
if err == nil {
if err := sl.LogEvent(fields); err != nil {
l.Warningf("Failed to log event: %v.", err)
}
// Session ended? Get rid of the session logger then:
if eventType == SessionEndEvent {
l.removeLogger(sessionID)
if err := sl.Finalize(); err != nil {
log.Error(err)
}
}
} else {
l.Errorf("failed to get logger: %v", trace.DebugReport(err))
}
}
return nil
}
func (l *AuditLog) removeLogger(sessionID string) {
l.Debugf("Removing session logger for SID=%v.", sessionID)
l.Lock()
defer l.Unlock()
l.loggers.Remove(sessionID)
}
// emitAuditEvent adds a new event to the log. Part of auth.IAuditLog interface.
func (l *AuditLog) emitAuditEvent(eventType string, fields EventFields) error {
// see if the log needs to be rotated
if err := l.rotateLog(); err != nil {
log.Error(err)
@ -310,77 +533,215 @@ func (l *AuditLog) EmitAuditEvent(eventType string, fields EventFields) error {
fields[EventTime] = l.Clock.Now().In(time.UTC).Round(time.Second)
// line is the text to be logged
line := eventToLine(fields)
// if this event is associated with a session -> forward it to the session log as well
sessionID := fields.GetString(SessionEventID)
if sessionID != "" {
sl, err := l.LoggerFor(fields.GetString(EventNamespace), session.ID(sessionID))
if err == nil {
sl.LogEvent(fields)
// Session ended? Get rid of the session logger then:
if eventType == SessionEndEvent {
l.Debugf("removing session logger for SID=%v", sessionID)
l.Lock()
l.loggers.Remove(sessionID)
l.Unlock()
if err := sl.Finalize(); err != nil {
log.Error(err)
}
}
} else {
l.Errorf("failed to get logger: %v", trace.DebugReport(err))
}
line, err := json.Marshal(fields)
if err != nil {
return trace.Wrap(err)
}
// log it to the main log file:
if l.file != nil {
fmt.Fprintln(l.file, line)
fmt.Fprintln(l.file, string(line))
}
return nil
}
// matchingFiles returns files matching the time restrictions of the query
// across multiple auth servers, returns a list of file names
func (l *AuditLog) matchingFiles(fromUTC, toUTC time.Time) ([]eventFile, error) {
authServers, err := l.getAuthServers()
if err != nil {
return nil, trace.Wrap(err)
}
var filtered []eventFile
for _, serverID := range authServers {
// scan the log directory:
df, err := os.Open(filepath.Join(l.DataDir, serverID))
if err != nil {
return nil, trace.Wrap(err)
}
defer df.Close()
entries, err := df.Readdir(-1)
if err != nil {
return nil, trace.Wrap(err)
}
for i := range entries {
fi := entries[i]
if fi.IsDir() || filepath.Ext(fi.Name()) != LogfileExt {
continue
}
fd := fi.ModTime().UTC()
if fd.After(fromUTC) && fd.Before(toUTC) {
eventFile := eventFile{
FileInfo: fi,
path: filepath.Join(l.DataDir, serverID, fi.Name()),
}
filtered = append(filtered, eventFile)
}
}
}
// sort all accepted files by date
sort.Sort(byDate(filtered))
return filtered, nil
}
func (l *AuditLog) moveAuditLogFile(fileName string) error {
sourceFile := filepath.Join(l.DataDir, fileName)
targetFile := filepath.Join(l.DataDir, l.ServerID, fileName)
l.Infof("Migrating log file from %v to %v", sourceFile, targetFile)
if err := os.Rename(sourceFile, targetFile); err != nil {
return trace.ConvertSystemError(err)
}
return nil
}
func (l *AuditLog) migrateSessionsDir() error {
sessionDir := filepath.Join(l.DataDir, SessionLogsDir)
targetDir := filepath.Join(l.DataDir, l.ServerID, SessionLogsDir)
_, err := utils.StatDir(targetDir)
if err == nil {
return nil
}
if !trace.IsNotFound(err) {
return trace.Wrap(err)
}
// transform the recorded files to the new index format
recordingsDir := filepath.Join(l.DataDir, SessionLogsDir, defaults.Namespace)
fileInfos, err := listDir(recordingsDir)
if err != nil {
// source directory does not exist, means nothing to migrate
if !trace.IsNotFound(err) {
return trace.Wrap(err)
}
return nil
}
for _, fi := range fileInfos {
if fi.IsDir() {
l.Debugf("Migrating, skipping directory %v", fi.Name())
continue
}
// only trigger migrations on .log
// to avoid double migrations attempts or migrating recordings
// that were already migrated
if !strings.HasSuffix(fi.Name(), "session.log") {
continue
}
parts := strings.Split(fi.Name(), ".")
if len(parts) < 2 {
l.Debugf("Migrating, skipping unknown file: %v", fi.Name())
}
sessionID := parts[0]
sourceEventsFile := filepath.Join(recordingsDir, fmt.Sprintf("%v.session.log", sessionID))
targetEventsFile := filepath.Join(recordingsDir, fmt.Sprintf("%v-0.events", sessionID))
l.Debugf("Migrating, session ID %v. Renamed %v to %v", sessionID, sourceEventsFile, targetEventsFile)
err := os.Rename(sourceEventsFile, targetEventsFile)
if err != nil {
return trace.Wrap(err)
}
sourceChunksFile := filepath.Join(recordingsDir, fmt.Sprintf("%v.session.bytes", sessionID))
targetChunksFile := filepath.Join(recordingsDir, fmt.Sprintf("%v-0.chunks", sessionID))
l.Debugf("Migrating session ID %v. Renamed %v to %v", sessionID, sourceChunksFile, targetChunksFile)
err = os.Rename(sourceChunksFile, targetChunksFile)
if err != nil {
return trace.Wrap(err)
}
indexFileName := filepath.Join(recordingsDir, fmt.Sprintf("%v.index", sessionID))
eventsData, err := json.Marshal(indexEntry{
FileName: filepath.Base(targetEventsFile),
Type: fileTypeEvents,
Index: 0,
})
if err != nil {
return trace.Wrap(err)
}
chunksData, err := json.Marshal(indexEntry{
FileName: filepath.Base(targetChunksFile),
Type: fileTypeChunks,
Offset: 0,
})
if err != nil {
return trace.Wrap(err)
}
err = ioutil.WriteFile(indexFileName,
[]byte(
fmt.Sprintf("%v\n%v\n",
string(eventsData),
string(chunksData),
)),
0640,
)
if err != nil {
return trace.Wrap(err)
}
l.Debugf("Migrating session ID %v. Wrote index file %v.", indexFileName)
}
l.Infof("Moving sessions folder from %v to %v", sessionDir, targetDir)
if err := os.Rename(sessionDir, targetDir); err != nil {
return trace.ConvertSystemError(err)
}
return nil
}
func listDir(dir string) ([]os.FileInfo, error) {
df, err := os.Open(dir)
if err != nil {
return nil, trace.ConvertSystemError(err)
}
defer df.Close()
entries, err := df.Readdir(-1)
if err != nil {
return nil, trace.ConvertSystemError(err)
}
return entries, nil
}
// DELETE IN: 2.6.0
// runMigrations runs migrations for audit logs format
func (l *AuditLog) runMigrations() error {
// migrate sessions directory
if err := l.migrateSessionsDir(); err != nil {
return trace.Wrap(err)
}
// move audit log files to the auth server id
fileInfos, err := listDir(l.DataDir)
if err != nil {
return trace.Wrap(err)
}
for _, fi := range fileInfos {
if !fi.IsDir() {
if err := l.moveAuditLogFile(fi.Name()); err != nil {
return trace.Wrap(err)
}
}
}
return nil
}
// SearchEvents finds events. Results show up sorted by date (newest first)
func (l *AuditLog) SearchEvents(fromUTC, toUTC time.Time, query string) ([]EventFields, error) {
l.Debugf("SearchEvents(%v, %v, query=%v)", fromUTC, toUTC, query)
queryVals, err := url.ParseQuery(query)
if err != nil {
return nil, trace.BadParameter("missing parameter query", query)
}
// how many days of logs to search?
days := int(toUTC.Sub(fromUTC).Hours() / 24)
if days < 0 {
return nil, trace.BadParameter("query", query)
}
// scan the log directory:
df, err := os.Open(l.DataDir)
queryVals, err := url.ParseQuery(query)
if err != nil {
return nil, trace.BadParameter("missing parameter query", query)
}
filtered, err := l.matchingFiles(fromUTC, toUTC)
if err != nil {
return nil, trace.Wrap(err)
}
defer df.Close()
entries, err := df.Readdir(-1)
if err != nil {
return nil, trace.Wrap(err)
}
filtered := make([]os.FileInfo, 0, days)
for i := range entries {
fi := entries[i]
if fi.IsDir() || filepath.Ext(fi.Name()) != LogfileExt {
continue
}
fd := fi.ModTime().UTC()
if fd.After(fromUTC) && fd.Before(toUTC) {
filtered = append(filtered, fi)
}
}
// sort all accepted files by date
sort.Sort(byDate(filtered))
// search within each file:
events := make([]EventFields, 0)
for i := range filtered {
found, err := l.findInFile(filepath.Join(l.DataDir, filtered[i].Name()), queryVals)
found, err := l.findInFile(filtered[i].path, queryVals)
if err != nil {
return nil, trace.Wrap(err)
}
@ -391,7 +752,7 @@ func (l *AuditLog) SearchEvents(fromUTC, toUTC time.Time, query string) ([]Event
// SearchSessionEvents searches for session related events. Used to find completed sessions.
func (l *AuditLog) SearchSessionEvents(fromUTC, toUTC time.Time) ([]EventFields, error) {
l.Infof("SearchSessionEvents(%v, %v)", fromUTC, toUTC)
l.Debugf("SearchSessionEvents(%v, %v)", fromUTC, toUTC)
// only search for specific event types
query := url.Values{}
@ -403,8 +764,13 @@ func (l *AuditLog) SearchSessionEvents(fromUTC, toUTC time.Time) ([]EventFields,
return l.SearchEvents(fromUTC, toUTC, query.Encode())
}
type eventFile struct {
os.FileInfo
path string
}
// byDate implements sort.Interface.
type byDate []os.FileInfo
type byDate []eventFile
func (f byDate) Len() int { return len(f) }
func (f byDate) Less(i, j int) bool { return f[i].ModTime().Before(f[j].ModTime()) }
@ -415,10 +781,13 @@ func (f byDate) Swap(i, j int) { f[i], f[j] = f[j], f[i] }
//
// You can pass multiple types like "event=session.start&event=session.end"
func (l *AuditLog) findInFile(fn string, query url.Values) ([]EventFields, error) {
l.Infof("findInFile(%s, %v)", fn, query)
l.Debugf("Called findInFile(%s, %v).", fn, query)
retval := make([]EventFields, 0)
eventFilter := query[EventType]
eventFilter, ok := query[EventType]
if !ok && len(query) > 0 {
return nil, nil
}
doFilter := len(eventFilter) > 0
// open the log file:
@ -471,7 +840,7 @@ func (l *AuditLog) rotateLog() (err error) {
openLogFile := func() error {
l.Lock()
defer l.Unlock()
logfname := filepath.Join(l.DataDir,
logfname := filepath.Join(l.DataDir, l.ServerID,
fileTime.Format("2006-01-02.15:04:05")+LogfileExt)
l.file, err = os.OpenFile(logfname, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640)
if err != nil {
@ -515,29 +884,9 @@ func (l *AuditLog) Close() error {
return nil
}
// sessionStreamFn helper determines the name of the stream file for a given
// session by its ID
func (l *AuditLog) sessionStreamFn(namespace string, sid session.ID) string {
return filepath.Join(
l.DataDir,
SessionLogsDir,
namespace,
fmt.Sprintf("%s%s", sid, SessionStreamPrefix))
}
// sessionLogFn helper determines the name of the stream file for a given
// session by its ID
func (l *AuditLog) sessionLogFn(namespace string, sid session.ID) string {
return filepath.Join(
l.DataDir,
SessionLogsDir,
namespace,
fmt.Sprintf("%s%s", sid, SessionLogPrefix))
}
// LoggerFor creates a logger for a specified session. Session loggers allow
// to group all events into special "session log files" for easier audit
func (l *AuditLog) LoggerFor(namespace string, sid session.ID) (SessionLogger, error) {
func (l *AuditLog) LoggerFor(namespace string, sid session.ID, compatibilityMode bool) (SessionLogger, error) {
l.Lock()
defer l.Unlock()
@ -556,7 +905,7 @@ func (l *AuditLog) LoggerFor(namespace string, sid session.ID) (SessionLogger, e
return sessionLogger, nil
}
// make sure session logs dir is present
sdir := filepath.Join(l.DataDir, SessionLogsDir, namespace)
sdir := filepath.Join(l.DataDir, l.ServerID, SessionLogsDir, namespace)
if err := os.Mkdir(sdir, *l.DirMask); err != nil {
if !os.IsExist(err) {
return nil, trace.Wrap(err)
@ -567,19 +916,38 @@ func (l *AuditLog) LoggerFor(namespace string, sid session.ID) (SessionLogger, e
return nil, trace.ConvertSystemError(err)
}
}
sessionLogger, err := NewDiskSessionLogger(DiskSessionLoggerConfig{
SessionID: sid,
EventsFileName: l.sessionLogFn(namespace, sid),
StreamFileName: l.sessionStreamFn(namespace, sid),
Clock: l.Clock,
RecordSessions: l.RecordSessions,
})
if err != nil {
return nil, trace.Wrap(err)
// DELETE IN: 2.6.0
// Compatibility mode is required for migration from 2.5.0 to 2.6.0
if compatibilityMode {
l.Debugf("Using compatibility mode logger for session id %v.", sid)
sessionLogger, err := NewCompatSessionLogger(CompatSessionLoggerConfig{
SessionID: sid,
DataDir: sdir,
Clock: l.Clock,
RecordSessions: l.RecordSessions,
})
if err != nil {
return nil, trace.Wrap(err)
}
l.loggers.Set(string(sid), sessionLogger, l.SessionIdlePeriod)
auditOpenFiles.Inc()
return sessionLogger, nil
} else {
sessionLogger, err := NewDiskSessionLogger(DiskSessionLoggerConfig{
SessionID: sid,
DataDir: sdir,
Clock: l.Clock,
RecordSessions: l.RecordSessions,
AuditLog: l,
})
if err != nil {
return nil, trace.Wrap(err)
}
l.loggers.Set(string(sid), sessionLogger, l.SessionIdlePeriod)
auditOpenFiles.Inc()
return sessionLogger, nil
}
l.loggers.Set(string(sid), sessionLogger, l.SessionIdlePeriod)
auditOpenFiles.Inc()
return sessionLogger, nil
}
func (l *AuditLog) asyncCloseSessionLogger(key string, val interface{}) {
@ -587,14 +955,14 @@ func (l *AuditLog) asyncCloseSessionLogger(key string, val interface{}) {
}
func (l *AuditLog) closeSessionLogger(key string, val interface{}) {
l.Debugf("closing session logger %v", key)
l.Debugf("Closing session logger %v.", key)
logger, ok := val.(SessionLogger)
if !ok {
l.Warningf("warning, not valid value type %T for %v", val, key)
l.Warningf("Warning, not valid value type %T for %v.", val, key)
return
}
if err := logger.Finalize(); err != nil {
log.Warningf("failed to finalize: %v", trace.DebugReport(err))
log.Warningf("Failed to finalize: %v.", trace.DebugReport(err))
}
}
@ -616,17 +984,6 @@ func (l *AuditLog) closeInactiveLoggers() {
expired := l.loggers.RemoveExpired(10)
if expired != 0 {
l.Debugf("closed %v inactive session loggers", expired)
l.Debugf("Closed %v inactive session loggers.", expired)
}
}
// eventToLine helper creates a loggable line/string for a given event
func eventToLine(fields EventFields) string {
jbytes, err := json.Marshal(fields)
jsonString := string(jbytes)
if err != nil {
log.Error(err)
jsonString = ""
}
return jsonString
}

View file

@ -2,9 +2,11 @@ package events
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
@ -36,6 +38,7 @@ func (a *AuditTestSuite) makeLog(c *check.C, dataDir string, recordSessions bool
alog, err := NewAuditLog(AuditLogConfig{
DataDir: dataDir,
RecordSessions: recordSessions,
ServerID: "server1",
})
if err != nil {
return nil, trace.Wrap(err)
@ -59,7 +62,8 @@ func (a *AuditTestSuite) TestNew(c *check.C) {
c.Assert(alog.Close(), check.IsNil)
}
func (a *AuditTestSuite) TestComplexLogging(c *check.C) {
// DELETE IN: 2.6.0
func (a *AuditTestSuite) TestCompatComplexLogging(c *check.C) {
now := time.Now().In(time.UTC).Round(time.Second)
// create audit log, write a couple of events into it, close it
@ -138,7 +142,443 @@ func (a *AuditTestSuite) TestComplexLogging(c *check.C) {
c.Assert(found[0].GetString(EventLogin), check.Equals, "vincent")
}
func (a *AuditTestSuite) TestSessionRecordingOff(c *check.C) {
// TestSessionsOnOneAuthServer tests scenario when there are two auth servers
// but session is recorded only on the one
func (a *AuditTestSuite) TestSessionsOnOneAuthServer(c *check.C) {
fakeClock := clockwork.NewFakeClock()
alog, err := NewAuditLog(AuditLogConfig{
Clock: fakeClock,
DataDir: a.dataDir,
RecordSessions: true,
ServerID: "server1",
})
c.Assert(err, check.IsNil)
alog2, err := NewAuditLog(AuditLogConfig{
Clock: fakeClock,
DataDir: a.dataDir,
RecordSessions: true,
ServerID: "server2",
})
c.Assert(err, check.IsNil)
sessionID := "100"
// start the session and emit data stream to it
firstMessage := []byte("hello")
err = alog.PostSessionSlice(SessionSlice{
Namespace: defaults.Namespace,
SessionID: sessionID,
Chunks: []*SessionChunk{
// start the seession
&SessionChunk{
Time: fakeClock.Now().UTC().UnixNano(),
EventIndex: 0,
EventType: SessionStartEvent,
Data: marshal(EventFields{EventLogin: "bob"}),
},
// type "hello" into session "100"
&SessionChunk{
Time: fakeClock.Now().UTC().UnixNano(),
EventIndex: 1,
ChunkIndex: 0,
Offset: 0,
EventType: SessionPrintEvent,
Data: firstMessage,
},
// emitting session end event should close the session
&SessionChunk{
Time: fakeClock.Now().UTC().UnixNano(),
EventIndex: 4,
EventType: SessionEndEvent,
Data: marshal(EventFields{EventLogin: "bob"}),
},
},
Version: V2,
})
c.Assert(err, check.IsNil)
// does not matter which audit server is accessed the results should be the same
for _, a := range []*AuditLog{alog, alog2} {
// read the session bytes
history, err := a.GetSessionEvents(defaults.Namespace, session.ID(sessionID), 0)
c.Assert(err, check.IsNil)
c.Assert(history, check.HasLen, 3)
// make sure offsets were properly set (0 for the first event and 5 bytes for hello):
c.Assert(history[1][SessionByteOffset], check.Equals, float64(0))
c.Assert(history[1][SessionEventTimestamp], check.Equals, float64(0))
// fetch all bytes
buff, err := a.GetSessionChunk(defaults.Namespace, session.ID(sessionID), 0, 5000)
c.Assert(err, check.IsNil)
c.Assert(string(buff), check.Equals, string(firstMessage))
// with offset
buff, err = a.GetSessionChunk(defaults.Namespace, session.ID(sessionID), 2, 5000)
c.Assert(err, check.IsNil)
c.Assert(string(buff), check.Equals, string(firstMessage[2:]))
}
}
// TestSessionsTwoAuthServers tests two auth servers behind the load balancer handling the event stream
// for the same session
func (a *AuditTestSuite) TestSessionsTwoAuthServers(c *check.C) {
fakeClock := clockwork.NewFakeClock()
alog, err := NewAuditLog(AuditLogConfig{
Clock: fakeClock,
DataDir: a.dataDir,
RecordSessions: true,
ServerID: "server1",
})
c.Assert(err, check.IsNil)
alog2, err := NewAuditLog(AuditLogConfig{
Clock: fakeClock,
DataDir: a.dataDir,
RecordSessions: true,
ServerID: "server2",
})
c.Assert(err, check.IsNil)
sessionID := "100"
// start the session and emit data stream to it
firstMessage := []byte("hello")
err = alog.PostSessionSlice(SessionSlice{
Namespace: defaults.Namespace,
SessionID: sessionID,
Chunks: []*SessionChunk{
// start the seession
&SessionChunk{
Time: fakeClock.Now().UTC().UnixNano(),
EventIndex: 0,
EventType: SessionStartEvent,
Data: marshal(EventFields{EventLogin: "bob"}),
},
// type "hello" into session "100"
&SessionChunk{
Time: fakeClock.Now().UTC().UnixNano(),
EventIndex: 1,
ChunkIndex: 0,
Offset: 0,
EventType: SessionPrintEvent,
Data: firstMessage,
},
},
Version: V2,
})
c.Assert(err, check.IsNil)
c.Assert(alog.loggers.Len(), check.Equals, 1)
// now fake sleep past expiration
firstDelay := defaults.SessionIdlePeriod * 2
fakeClock.Advance(firstDelay)
// logger for idle session should be closed
alog.closeInactiveLoggers()
c.Assert(alog.loggers.Len(), check.Equals, 0)
// send another event to the session via second auth server
secondMessage := []byte("good day")
err = alog2.PostSessionSlice(SessionSlice{
Namespace: defaults.Namespace,
SessionID: sessionID,
Chunks: []*SessionChunk{
// notice how offsets are sent by the client
&SessionChunk{
Time: fakeClock.Now().UTC().UnixNano(),
Delay: int64(firstDelay / time.Millisecond),
EventIndex: 2,
ChunkIndex: 1,
Offset: int64(len(firstMessage)),
EventType: SessionPrintEvent,
Data: secondMessage,
},
},
Version: V2,
})
c.Assert(err, check.IsNil)
c.Assert(alog2.loggers.Len(), check.Equals, 1)
// emit next event 17 milliseconds later to the first auth server
thirdMessage := []byte("test")
secondDelay := 17 * time.Millisecond
fakeClock.Advance(secondDelay)
err = alog.PostSessionSlice(SessionSlice{
Namespace: defaults.Namespace,
SessionID: sessionID,
Chunks: []*SessionChunk{
// notice how offsets are sent by the client
&SessionChunk{
Time: fakeClock.Now().UTC().UnixNano(),
EventIndex: 3,
ChunkIndex: 2,
Delay: int64((firstDelay + secondDelay) / time.Millisecond),
Offset: int64(len(firstMessage) + len(secondMessage)),
EventType: SessionPrintEvent,
Data: thirdMessage,
},
// emitting session end event should close the session
&SessionChunk{
Time: fakeClock.Now().UTC().UnixNano(),
EventIndex: 4,
EventType: SessionEndEvent,
Data: marshal(EventFields{EventLogin: "bob"}),
},
},
Version: V2,
})
c.Assert(err, check.IsNil)
// emitting session end event should close the session
c.Assert(alog.loggers.Len(), check.Equals, 0)
// does not matter which audit server is accessed the results should be the same
for _, a := range []*AuditLog{alog, alog2} {
// read the session bytes
history, err := a.GetSessionEvents(defaults.Namespace, session.ID(sessionID), 1)
c.Assert(err, check.IsNil)
c.Assert(history, check.HasLen, 4)
// make sure offsets were properly set (0 for the first event and 5 bytes for hello):
c.Assert(history[0][SessionByteOffset], check.Equals, float64(0))
c.Assert(history[1][SessionByteOffset], check.Equals, float64(len(firstMessage)))
c.Assert(history[2][SessionByteOffset], check.Equals, float64(len(firstMessage)+len(secondMessage)))
// make sure delays are right
c.Assert(history[0][SessionEventTimestamp], check.Equals, float64(0))
c.Assert(history[1][SessionEventTimestamp], check.Equals, float64(firstDelay/time.Millisecond))
c.Assert(history[2][SessionEventTimestamp], check.Equals, float64((firstDelay+secondDelay)/time.Millisecond))
// fetch all bytes
buff, err := a.GetSessionChunk(defaults.Namespace, session.ID(sessionID), 0, 5000)
c.Assert(err, check.IsNil)
c.Assert(string(buff), check.Equals, "hellogood daytest")
// with offset
buff, err = a.GetSessionChunk(defaults.Namespace, session.ID(sessionID), 2, 5000)
c.Assert(err, check.IsNil)
c.Assert(string(buff), check.Equals, "llogood daytest")
// with another offset at the boundary of the first message
buff, err = a.GetSessionChunk(defaults.Namespace, session.ID(sessionID), len(firstMessage), 5000)
c.Assert(err, check.IsNil)
c.Assert(string(buff), check.Equals, "good daytest")
// with another offset after the boundary of the first message
buff, err = a.GetSessionChunk(defaults.Namespace, session.ID(sessionID), len(firstMessage)+1, 5000)
c.Assert(err, check.IsNil)
c.Assert(string(buff), check.Equals, "ood daytest")
// with another offset after the boundary of the third message
buff, err = a.GetSessionChunk(defaults.Namespace, session.ID(sessionID), len(firstMessage)+len(secondMessage), 5000)
c.Assert(err, check.IsNil)
c.Assert(string(buff), check.Equals, "test")
// with another offset outside the boundaries
buff, err = a.GetSessionChunk(defaults.Namespace, session.ID(sessionID), len(firstMessage)+len(secondMessage)+len(thirdMessage), 5000)
c.Assert(err, check.IsNil)
c.Assert(string(buff), check.Equals, "")
}
}
// TestSearchTwoAuthServers tests search on two auth servers behind the load balancer handling the event stream
// for the same session
func (a *AuditTestSuite) TestSearchTwoAuthServers(c *check.C) {
startTime := time.Now().UTC()
alog, err := NewAuditLog(AuditLogConfig{
DataDir: a.dataDir,
RecordSessions: true,
ServerID: "server1",
})
c.Assert(err, check.IsNil)
alog2, err := NewAuditLog(AuditLogConfig{
DataDir: a.dataDir,
RecordSessions: true,
ServerID: "server2",
})
c.Assert(err, check.IsNil)
sessionID := "100"
// start the session and emit data stream to it
firstMessage := []byte("hello")
err = alog.PostSessionSlice(SessionSlice{
Namespace: defaults.Namespace,
SessionID: sessionID,
Chunks: []*SessionChunk{
// start the seession
&SessionChunk{
Time: time.Now().UTC().UnixNano(),
EventIndex: 0,
EventType: SessionStartEvent,
Data: marshal(EventFields{EventLogin: "bob"}),
},
// type "hello" into session "100"
&SessionChunk{
Time: time.Now().UTC().UnixNano(),
EventIndex: 1,
ChunkIndex: 0,
Offset: 0,
EventType: SessionPrintEvent,
Data: firstMessage,
},
},
Version: V2,
})
c.Assert(err, check.IsNil)
c.Assert(alog.loggers.Len(), check.Equals, 1)
// send another event to the session via second auth server
secondMessage := []byte("good day")
err = alog2.PostSessionSlice(SessionSlice{
Namespace: defaults.Namespace,
SessionID: sessionID,
Chunks: []*SessionChunk{
// notice how offsets are sent by the client
&SessionChunk{
Time: time.Now().UTC().UnixNano(),
EventIndex: 2,
ChunkIndex: 1,
Offset: int64(len(firstMessage)),
EventType: SessionPrintEvent,
Data: secondMessage,
},
},
Version: V2,
})
c.Assert(err, check.IsNil)
c.Assert(alog2.loggers.Len(), check.Equals, 1)
// emit next event 17 milliseconds later to the first auth server
thirdMessage := []byte("test")
err = alog.PostSessionSlice(SessionSlice{
Namespace: defaults.Namespace,
SessionID: sessionID,
Chunks: []*SessionChunk{
// notice how offsets are sent by the client
&SessionChunk{
Time: time.Now().UTC().UnixNano(),
EventIndex: 3,
ChunkIndex: 2,
Offset: int64(len(firstMessage) + len(secondMessage)),
EventType: SessionPrintEvent,
Data: thirdMessage,
},
// emitting session end event should close the session
&SessionChunk{
EventIndex: 4,
EventType: SessionEndEvent,
Data: marshal(EventFields{EventLogin: "bob"}),
},
},
Version: V2,
})
c.Assert(err, check.IsNil)
// emitting session end event should close the session
c.Assert(alog.loggers.Len(), check.Equals, 0)
// does not matter which audit server is accessed the results should be the same
for _, a := range []*AuditLog{alog, alog2} {
comment := check.Commentf("auth server %v", a.ServerID)
// search events, start time is in the future
query := fmt.Sprintf("%s=%s", EventType, SessionStartEvent)
found, err := a.SearchEvents(startTime.Add(time.Hour), startTime.Add(time.Hour), query)
c.Assert(err, check.IsNil)
c.Assert(len(found), check.Equals, 0, comment)
// try searching (wrong query)
found, err = a.SearchEvents(startTime, startTime.Add(time.Hour), "foo=bar")
c.Assert(err, check.IsNil)
c.Assert(len(found), check.Equals, 0, comment)
// try searching (good query: for "session start")
found, err = a.SearchEvents(startTime.Add(-time.Hour), startTime.Add(time.Hour), query)
c.Assert(err, check.IsNil)
c.Assert(len(found), check.Equals, 1, comment)
c.Assert(found[0].GetString(EventLogin), check.Equals, "bob", comment)
// try searching (empty query means "anything")
found, err = alog.SearchEvents(startTime.Add(-time.Hour), startTime.Add(time.Hour), "")
c.Assert(err, check.IsNil)
c.Assert(len(found), check.Equals, 2) // total number of events logged in this test
c.Assert(found[0].GetString(EventType), check.Equals, SessionStartEvent, comment)
c.Assert(found[1].GetString(EventType), check.Equals, SessionEndEvent, comment)
}
}
type file struct {
path string
contents []byte
}
var v1Files = []file{
{
path: "2017-12-14.00:00:00.log",
contents: []byte(`{"event":"user.login","method":"oidc","time":"2017-12-13T17:38:34Z","user":"alice@example.com"}
{"addr.local":"172.10.1.20:3022","addr.remote":"172.10.1.254:52406","event":"session.start","login":"root","namespace":"default","server_id":"3e79a2d7-c9e3-4d3f-96ce-1e1346b4900c","sid":"74a5fc73-e02c-11e7-aee2-0242ac0a0101","size":"80:25","time":"2017-12-13T17:38:40Z","user":"alice@example.com"}
{"event":"session.leave","namespace":"default","server_id":"020130c8-b41f-4da5-a061-74c2b0e2b40b","sid":"75aef036-e02c-11e7-aee2-0242ac0a0101","time":"2017-12-13T17:38:42Z","user":"alice@example.com"}
`),
},
{
path: "sessions/default/74a5fc73-e02c-11e7-aee2-0242ac0a0101.session.log",
contents: []byte(`{"addr.local":"172.10.1.20:3022","addr.remote":"172.10.1.254:52406","event":"session.start","login":"root","namespace":"default","server_id":"3e79a2d7-c9e3-4d3f-96ce-1e1346b4900c","sid":"74a5fc73-e02c-11e7-aee2-0242ac0a0101","size":"80:25","time":"2017-12-13T17:38:40Z","user":"alice@example.com"}
{"event":"session.leave","namespace":"default","server_id":"3e79a2d7-c9e3-4d3f-96ce-1e1346b4900c","sid":"74a5fc73-e02c-11e7-aee2-0242ac0a0101","time":"2017-12-13T17:38:41Z","user":"alice@example.com"}
{"time":"2017-12-13T17:38:40.038Z","event":"print","bytes":31,"ms":0,"offset":0}
`),
},
{
path: "sessions/default/74a5fc73-e02c-11e7-aee2-0242ac0a0101.session.bytes",
contents: []byte(`"this string is exactly 31 bytes"`),
},
}
// DELETE IN: 2.6.0
// TestMigrationsToV2 tests migrations to V2 loging format
func (a *AuditTestSuite) TestMigrationsToV2(c *check.C) {
for _, file := range v1Files {
fileName := filepath.Join(a.dataDir, file.path)
err := os.MkdirAll(filepath.Dir(fileName), 0755)
c.Assert(err, check.IsNil)
err = ioutil.WriteFile(fileName, file.contents, 06440)
c.Assert(err, check.IsNil)
}
// create audit log with session recording disabled
alog, err := a.makeLog(c, a.dataDir, false)
c.Assert(err, check.IsNil)
// sessions have been migrated
sid := "74a5fc73-e02c-11e7-aee2-0242ac0a0101"
events, err := alog.GetSessionEvents(defaults.Namespace, session.ID(sid), 0)
c.Assert(err, check.IsNil)
c.Assert(events, check.HasLen, 3)
// global events were migrated
events, err = alog.SearchEvents(time.Time{}, time.Now().Add(time.Hour), "")
c.Assert(err, check.IsNil)
c.Assert(events, check.HasLen, 3)
// second time migration is idempotent
alog, err = a.makeLog(c, a.dataDir, false)
c.Assert(err, check.IsNil)
events, err = alog.GetSessionEvents(defaults.Namespace, session.ID(sid), 0)
c.Assert(err, check.IsNil)
c.Assert(events, check.HasLen, 3)
// global events were migrated
events, err = alog.SearchEvents(time.Time{}, time.Now().Add(time.Hour), "")
c.Assert(err, check.IsNil)
c.Assert(events, check.HasLen, 3)
}
// DELETE IN: 2.6.0
func (a *AuditTestSuite) TestCompatSessionRecordingOff(c *check.C) {
now := time.Now().In(time.UTC).Round(time.Second)
// create audit log with session recording disabled
@ -175,6 +615,69 @@ func (a *AuditTestSuite) TestSessionRecordingOff(c *check.C) {
c.Assert(err, check.NotNil)
}
func (a *AuditTestSuite) TestSessionRecordingOff(c *check.C) {
now := time.Now().In(time.UTC).Round(time.Second)
// create audit log with session recording disabled
alog, err := a.makeLog(c, a.dataDir, false)
c.Assert(err, check.IsNil)
alog.Clock = clockwork.NewFakeClockAt(now)
username := "alice"
sessionID := "200"
// start the session and emit data stream to it
firstMessage := []byte("hello")
err = alog.PostSessionSlice(SessionSlice{
Namespace: defaults.Namespace,
SessionID: sessionID,
Chunks: []*SessionChunk{
// start the session
&SessionChunk{
Time: alog.Clock.Now().UTC().UnixNano(),
EventIndex: 0,
EventType: SessionStartEvent,
Data: marshal(EventFields{EventLogin: username}),
},
// type "hello" into session "100"
&SessionChunk{
Time: alog.Clock.Now().UTC().UnixNano(),
EventIndex: 1,
ChunkIndex: 0,
Offset: 0,
EventType: SessionPrintEvent,
Data: firstMessage,
},
// end the session
&SessionChunk{
Time: alog.Clock.Now().UTC().UnixNano(),
EventIndex: 0,
EventType: SessionEndEvent,
Data: marshal(EventFields{EventLogin: username}),
},
},
Version: V2,
})
c.Assert(err, check.IsNil)
c.Assert(alog.loggers.Len(), check.Equals, 0)
// get all events from the audit log, should have two events
found, err := alog.SearchEvents(now.Add(-time.Hour), now.Add(time.Hour), "")
c.Assert(err, check.IsNil)
c.Assert(found, check.HasLen, 2)
c.Assert(found[0].GetString(EventLogin), check.Equals, username)
c.Assert(found[1].GetString(EventLogin), check.Equals, username)
// inspect the session log for "200", should have two events
history, err := alog.GetSessionEvents(defaults.Namespace, session.ID(sessionID), 0)
c.Assert(err, check.IsNil)
c.Assert(history, check.HasLen, 2)
// try getting the session stream, should get an error
_, err = alog.GetSessionChunk(defaults.Namespace, session.ID(sessionID), 0, 5000)
c.Assert(err, check.NotNil)
}
func (a *AuditTestSuite) TestBasicLogging(c *check.C) {
now := time.Now().In(time.UTC).Round(time.Second)
// create audit log, write a couple of events into it, close it
@ -195,23 +698,26 @@ func (a *AuditTestSuite) TestBasicLogging(c *check.C) {
fmt.Sprintf("{\"apples?\":\"yes\",\"event\":\"user.joined\",\"time\":\"%s\"}\n", now.Format(time.RFC3339)))
}
// TestAutoClose tests scenario with auto closing of inactive sessions
func (a *AuditTestSuite) TestAutoClose(c *check.C) {
// DELETE IN: 2.6.0
// TestCompatAutoClose tests scenario with auto closing of inactive sessions for compatibility logger
func (a *AuditTestSuite) TestCompatAutoClose(c *check.C) {
// create audit log, write a couple of events into it, close it
fakeClock := clockwork.NewFakeClock()
alog, err := NewAuditLog(AuditLogConfig{
DataDir: a.dataDir,
RecordSessions: true,
Clock: fakeClock,
ServerID: "autoclose1",
})
c.Assert(err, check.IsNil)
sessionID := "100"
// start the session and emit data stream to it
err = alog.EmitAuditEvent(SessionStartEvent, EventFields{SessionEventID: "100", EventLogin: "bob", EventNamespace: defaults.Namespace})
err = alog.EmitAuditEvent(SessionStartEvent, EventFields{SessionEventID: sessionID, EventLogin: "bob", EventNamespace: defaults.Namespace})
c.Assert(err, check.IsNil)
c.Assert(alog.loggers.Len(), check.Equals, 1)
sessionID := "100"
// type "hello" into session "100":
firstMessage := "hello"
err = alog.PostSessionChunk(defaults.Namespace, session.ID(sessionID), bytes.NewBufferString(firstMessage))
@ -256,16 +762,141 @@ func (a *AuditTestSuite) TestAutoClose(c *check.C) {
c.Assert(history[0][SessionEventTimestamp], check.Equals, float64(0))
c.Assert(history[1][SessionEventTimestamp], check.Equals, float64(firstDelay/time.Millisecond))
c.Assert(history[2][SessionEventTimestamp], check.Equals, float64((firstDelay+secondDelay)/time.Millisecond))
}
// TestCloseOutstanding makes sure the logger closed outstanding sessions
func (a *AuditTestSuite) TestCloseOutstanding(c *check.C) {
// TestAutoClose tests scenario with auto closing of inactive sessions
func (a *AuditTestSuite) TestAutoClose(c *check.C) {
// create audit log, write a couple of events into it, close it
fakeClock := clockwork.NewFakeClock()
alog, err := NewAuditLog(AuditLogConfig{
DataDir: a.dataDir,
RecordSessions: true,
Clock: fakeClock,
ServerID: "autoclose1",
})
c.Assert(err, check.IsNil)
sessionID := "100"
// start the session and emit data stream to it
firstMessage := []byte("hello")
err = alog.PostSessionSlice(SessionSlice{
Namespace: defaults.Namespace,
SessionID: sessionID,
Chunks: []*SessionChunk{
// start the seession
&SessionChunk{
Time: fakeClock.Now().UTC().UnixNano(),
EventIndex: 0,
EventType: SessionStartEvent,
Data: marshal(EventFields{EventLogin: "bob"}),
},
// type "hello" into session "100"
&SessionChunk{
Time: fakeClock.Now().UTC().UnixNano(),
EventIndex: 1,
ChunkIndex: 0,
Offset: 0,
EventType: SessionPrintEvent,
Data: firstMessage,
},
},
Version: V2,
})
c.Assert(err, check.IsNil)
c.Assert(alog.loggers.Len(), check.Equals, 1)
// now fake sleep past expiration
firstDelay := defaults.SessionIdlePeriod * 2
fakeClock.Advance(firstDelay)
// logger for idle session should be closed
alog.closeInactiveLoggers()
c.Assert(alog.loggers.Len(), check.Equals, 0)
// send another event to the session
secondMessage := []byte("good day")
err = alog.PostSessionSlice(SessionSlice{
Namespace: defaults.Namespace,
SessionID: sessionID,
Chunks: []*SessionChunk{
// notice how offsets are sent by the client
&SessionChunk{
Time: fakeClock.Now().UTC().UnixNano(),
Delay: int64(firstDelay / time.Millisecond),
EventIndex: 2,
ChunkIndex: 1,
Offset: int64(len(firstMessage)),
EventType: SessionPrintEvent,
Data: secondMessage,
},
},
Version: V2,
})
c.Assert(err, check.IsNil)
c.Assert(alog.loggers.Len(), check.Equals, 1)
// emit next event 17 milliseconds later
thirdMessage := []byte("test")
secondDelay := 17 * time.Millisecond
fakeClock.Advance(secondDelay)
err = alog.PostSessionSlice(SessionSlice{
Namespace: defaults.Namespace,
SessionID: sessionID,
Chunks: []*SessionChunk{
// notice how offsets are sent by the client
&SessionChunk{
Time: fakeClock.Now().UTC().UnixNano(),
EventIndex: 3,
ChunkIndex: 2,
Delay: int64((firstDelay + secondDelay) / time.Millisecond),
Offset: int64(len(firstMessage) + len(secondMessage)),
EventType: SessionPrintEvent,
Data: thirdMessage,
},
// emitting session end event should close the session
&SessionChunk{
Time: fakeClock.Now().UTC().UnixNano(),
EventIndex: 4,
EventType: SessionEndEvent,
Data: marshal(EventFields{EventLogin: "bob"}),
},
},
Version: V2,
})
c.Assert(err, check.IsNil)
// emitting session end event should close the session
c.Assert(alog.loggers.Len(), check.Equals, 0)
// read the session bytes
history, err := alog.GetSessionEvents(defaults.Namespace, session.ID(sessionID), 1)
c.Assert(err, check.IsNil)
c.Assert(history, check.HasLen, 4)
// make sure offsets were properly set (0 for the first event and 5 bytes for hello):
c.Assert(history[0][SessionByteOffset], check.Equals, float64(0))
c.Assert(history[1][SessionByteOffset], check.Equals, float64(len(firstMessage)))
c.Assert(history[2][SessionByteOffset], check.Equals, float64(len(firstMessage)+len(secondMessage)))
// make sure delays are right
c.Assert(history[0][SessionEventTimestamp], check.Equals, float64(0))
c.Assert(history[1][SessionEventTimestamp], check.Equals, float64(firstDelay/time.Millisecond))
c.Assert(history[2][SessionEventTimestamp], check.Equals, float64((firstDelay+secondDelay)/time.Millisecond))
}
// DELETE IN: 2.6.0
// TestCompatCloseOutstanding makes sure the logger working in compatibility mode
// closed outstanding sessions
func (a *AuditTestSuite) TestCompatCloseOutstanding(c *check.C) {
// create audit log, write a couple of events into it, close it
fakeClock := clockwork.NewFakeClock()
alog, err := NewAuditLog(AuditLogConfig{
DataDir: a.dataDir,
RecordSessions: true,
Clock: fakeClock,
ServerID: "outstanding",
})
c.Assert(err, check.IsNil)
@ -277,3 +908,42 @@ func (a *AuditTestSuite) TestCloseOutstanding(c *check.C) {
alog.Close()
c.Assert(alog.loggers.Len(), check.Equals, 0)
}
// TestCloseOutstanding makes sure the logger closed outstanding sessions
func (a *AuditTestSuite) TestCloseOutstanding(c *check.C) {
// create audit log, write a couple of events into it, close it
fakeClock := clockwork.NewFakeClock()
alog, err := NewAuditLog(AuditLogConfig{
DataDir: a.dataDir,
RecordSessions: true,
Clock: fakeClock,
ServerID: "outstanding",
})
c.Assert(err, check.IsNil)
// start the session and emit data stream to it
err = alog.PostSessionSlice(SessionSlice{
Namespace: defaults.Namespace,
SessionID: "100",
Chunks: []*SessionChunk{
&SessionChunk{
EventIndex: 0,
EventType: SessionStartEvent,
Data: marshal(EventFields{EventLogin: "bob"}),
},
},
Version: V2,
})
c.Assert(err, check.IsNil)
c.Assert(alog.loggers.Len(), check.Equals, 1)
alog.Close()
c.Assert(alog.loggers.Len(), check.Equals, 0)
}
func marshal(f EventFields) []byte {
data, err := json.Marshal(f)
if err != nil {
panic(err)
}
return data
}

View file

@ -0,0 +1,315 @@
/*
Copyright 2017 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package events
import (
"bytes"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"
)
// DELETE IN: 2.6.0
// CompatSessionLogger is used only during upgrades from 2.4.0 to 2.5.0
// Should be deleted in 2.6.0 releases
// CompatSessionLoggerConfig sets up parameters for disk session logger
// associated with the session ID
type CompatSessionLoggerConfig struct {
// SessionID is the session id of the logger
SessionID session.ID
// DataDir is data directory for session events files
DataDir string
// Clock is the clock replacement
Clock clockwork.Clock
// RecordSessions controls if sessions are recorded along with audit events.
RecordSessions bool
}
// DELETE IN: 2.6.0
// CompatSessionLogger is used only during upgrades from 2.4.0 to 2.5.0
// Should be deleted in 2.6.0 releases
// NewCompatSessionLogger creates new disk based session logger
func NewCompatSessionLogger(cfg CompatSessionLoggerConfig) (*CompatSessionLogger, error) {
var err error
lastPrintEvent, err := readLastPrintEvent(eventsFileName(cfg.DataDir, cfg.SessionID, 0))
if err != nil {
if !trace.IsNotFound(err) {
return nil, trace.Wrap(err)
}
// no last event is ok
lastPrintEvent = nil
}
indexFile, err := os.OpenFile(filepath.Join(cfg.DataDir, fmt.Sprintf("%v.index", cfg.SessionID.String())), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640)
if err != nil {
return nil, trace.Wrap(err)
}
sessionLogger := &CompatSessionLogger{
Entry: log.WithFields(log.Fields{
trace.Component: teleport.ComponentAuditLog,
trace.ComponentFields: log.Fields{
"sid": cfg.SessionID,
},
}),
indexFile: indexFile,
CompatSessionLoggerConfig: cfg,
lastPrintEvent: lastPrintEvent,
}
return sessionLogger, nil
}
// CompatSessionLogger implements a disk based session logger. The imporant
// property of the disk based logger is that it never fails and can be used as
// a fallback implementation behind more sophisticated loggers.
type CompatSessionLogger struct {
CompatSessionLoggerConfig
*log.Entry
sync.Mutex
indexFile *os.File
eventsFile *os.File
chunksFile *os.File
// lastPrintEvent is the last written session event
lastPrintEvent *printEvent
}
// LogEvent logs an event associated with this session
func (sl *CompatSessionLogger) LogEvent(fields EventFields) error {
if err := sl.openEventsFile(); err != nil {
return trace.Wrap(err)
}
if _, ok := fields[EventTime]; !ok {
fields[EventTime] = sl.Clock.Now().In(time.UTC).Round(time.Millisecond)
}
data, err := json.Marshal(fields)
if err != nil {
return trace.Wrap(err)
}
_, err = fmt.Fprintln(sl.eventsFile, string(data))
if err != nil {
return trace.Wrap(err)
}
return nil
}
// readLastEvent reads last event from the file, it opens
// the file in read only mode and closes it after
func readLastPrintEvent(fileName string) (*printEvent, error) {
f, err := os.Open(fileName)
if err != nil {
return nil, trace.ConvertSystemError(err)
}
defer f.Close()
info, err := f.Stat()
if err != nil {
return nil, trace.ConvertSystemError(err)
}
if info.Size() == 0 {
return nil, trace.NotFound("no events found")
}
bufSize := int64(512)
if info.Size() < bufSize {
bufSize = info.Size()
}
buf := make([]byte, bufSize)
_, err = f.ReadAt(buf, info.Size()-bufSize)
if err != nil {
return nil, trace.ConvertSystemError(err)
}
lines := bytes.Split(buf, []byte("\n"))
if len(lines) == 0 {
return nil, trace.BadParameter("expected some lines, got %q", string(buf))
}
for i := len(lines) - 1; i > 0; i-- {
line := bytes.TrimSpace(lines[i])
if len(line) == 0 {
continue
}
var event printEvent
if err = json.Unmarshal(line, &event); err != nil {
return nil, trace.Wrap(err)
}
if event.Type != SessionPrintEvent {
continue
}
return &event, nil
}
return nil, trace.NotFound("no session print events found")
}
// Close is called when clients close on the requested "session writer".
// We ignore their requests because this writer (file) should be closed only
// when the session logger is closed
func (sl *CompatSessionLogger) Close() error {
sl.Debugf("Close")
return nil
}
// Finalize is called by the session when it's closing. This is where we're
// releasing audit resources associated with the session
func (sl *CompatSessionLogger) Finalize() error {
sl.Lock()
defer sl.Unlock()
auditOpenFiles.Dec()
if sl.indexFile != nil {
sl.indexFile.Close()
}
if sl.chunksFile != nil {
sl.chunksFile.Close()
}
if sl.eventsFile != nil {
sl.eventsFile.Close()
}
return nil
}
// WriteChunk takes a stream of bytes (usually the output from a session terminal)
// and writes it into a "stream file", for future replay of interactive sessions.
func (sl *CompatSessionLogger) WriteChunk(chunk *SessionChunk) (written int, err error) {
sl.Lock()
defer sl.Unlock()
// when session recording is turned off, don't record the session byte stream
if sl.RecordSessions == false {
return len(chunk.Data), nil
}
if err := sl.openChunksFile(); err != nil {
return -1, trace.Wrap(err)
}
if written, err = sl.chunksFile.Write(chunk.Data); err != nil {
return written, trace.Wrap(err)
}
err = sl.writePrintEvent(time.Unix(0, chunk.Time), len(chunk.Data))
return written, trace.Wrap(err)
}
// writePrintEvent logs print event indicating write to the session
func (sl *CompatSessionLogger) writePrintEvent(start time.Time, bytesWritten int) error {
if err := sl.openEventsFile(); err != nil {
return trace.Wrap(err)
}
start = start.In(time.UTC).Round(time.Millisecond)
offset := int64(0)
delayMilliseconds := int64(0)
if sl.lastPrintEvent != nil {
offset = sl.lastPrintEvent.Offset + sl.lastPrintEvent.Bytes
delayMilliseconds = diff(sl.lastPrintEvent.Start, start) + sl.lastPrintEvent.DelayMilliseconds
}
event := printEvent{
Start: start,
Type: SessionPrintEvent,
Bytes: int64(bytesWritten),
DelayMilliseconds: delayMilliseconds,
Offset: offset,
}
bytes, err := json.Marshal(event)
if err != nil {
return trace.Wrap(err)
}
_, err = fmt.Fprintln(sl.eventsFile, string(bytes))
if err != nil {
return trace.Wrap(err)
}
sl.lastPrintEvent = &event
return trace.Wrap(err)
}
func (sl *CompatSessionLogger) openEventsFile() error {
if sl.eventsFile != nil {
return nil
}
eventsFileName := eventsFileName(sl.DataDir, sl.SessionID, 0)
// udpate the index file to write down that new events file has been created
data, err := json.Marshal(indexEntry{
FileName: filepath.Base(eventsFileName),
Type: fileTypeEvents,
Index: 0,
})
if err != nil {
return trace.Wrap(err)
}
_, err = fmt.Fprintf(sl.indexFile, "%v\n", string(data))
if err != nil {
return trace.Wrap(err)
}
// open new events file for writing
sl.eventsFile, err = os.OpenFile(eventsFileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640)
if err != nil {
return trace.Wrap(err)
}
return nil
}
func (sl *CompatSessionLogger) openChunksFile() error {
if sl.chunksFile != nil {
return nil
}
// chunksFileName consists of session id and the first global offset recorded
chunksFileName := chunksFileName(sl.DataDir, sl.SessionID, 0)
// udpate the index file to write down that new chunks file has been created
data, err := json.Marshal(indexEntry{
FileName: filepath.Base(chunksFileName),
Type: fileTypeChunks,
Offset: 0,
})
if err != nil {
return trace.Wrap(err)
}
_, err = fmt.Fprintf(sl.indexFile, "%v\n", string(data))
if err != nil {
return trace.Wrap(err)
}
// open new chunks file for writing
sl.chunksFile, err = os.OpenFile(chunksFileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640)
if err != nil {
return trace.Wrap(err)
}
return nil
}

View file

@ -15,19 +15,77 @@ limitations under the License.
*/
/*
Package events currently implements the audit log using a simple filesystem backend.
"Implements" means it implements events.IAuditLog interface (see events/api.go)
Package events implements the audit log interface events.IAuditLog
using filesystem backend.
Audit logs
----------
Audit logs are events associated with user logins, server access
and session log events like session.start.
Example audit log event:
{"addr.local":"172.10.1.20:3022",
"addr.remote":"172.10.1.254:58866",
"event":"session.start",
"login":"root",
"user":"klizhentas@gmail.com"
}
Session Logs
------------
Session logs are a series of events and recorded SSH interactive session playback.
Example session log event:
{
"time":"2018-01-04T02:12:40.245Z",
"event":"print",
"bytes":936,
"ms":40962,
"offset":16842,
"ei":31,
"ci":29
}
Print event fields
------------------
Print event specifies session output - PTY io recorded by Teleport node or Proxy
based on the configuration.
* "offset" is an offset in bytes from a start of a session
* "ms" is a delay in milliseconds from the last event occured
* "ci" is a chunk index ordering only print events
* "ei" is an event index ordering events from the first one
As in example of print event above, "ei" - is a session event index - 31,
while "ci" is a chunk index - meaning that this event is 29th in a row of print events.
Client streaming session logs
------------------------------
Session related logs are delivered in order defined by clients.
Every event is ordered and has a session-local index, every next event has index incremented.
Client deliveres session events in batches, where every event in the batch
is guaranteed to be in continuous order (e.g. no cases with events
delivered in a single batch to have missing event or chunk index).
Disk File format
----------------
On disk file format is designed to be compatible with NFS filesystems and provides
guarantee that only one auth server writes to the file at a time.
Main Audit Log Format
=====================
The main log files are saved as:
/var/lib/teleport/log/<date>.log
Each session has its own session log stored as two files
/var/lib/teleport/log/<session-id>.session.log
/var/lib/teleport/log/<session-id>.session.bytes
Where:
- .session.log (same events as in the main log, but related to the session)
- .session.bytes (recorded session bytes: PTY IO)
/var/lib/teleport/log/<auth-server-id>/<date>.log
The log file is rotated every 24 hours. The old files must be cleaned
up or archived by an external tool.
@ -45,5 +103,120 @@ Common JSON fields
Examples:
2016-04-25 22:37:29 +0000 UTC,session.start,{"addr.local":"127.0.0.1:3022","addr.remote":"127.0.0.1:35732","login":"root","sid":"4a9d97de-0b36-11e6-a0b3-d8cb8ae5080e","user":"vincent"}
2016-04-25 22:54:31 +0000 UTC,exec,{"addr.local":"127.0.0.1:3022","addr.remote":"127.0.0.1:35949","command":"-bash -c ls /","login":"root","user":"vincent"}
Session log file format
=======================
Each session has its own session log stored as several files:
Index file contains a list of event files and chunks files associated with a session:
/var/lib/teleport/log/sessions/<auth-server-id>/<session-id>.index
The format of the index file contains of two or more lines with pointers to other files:
{"file_name":"<session-id>-<first-event-in-file-index>.events","type":"events","index":<first-event-in-file-index>}
{"file_name":"<session-id>-<first-chunk-in-file-offset>.chunks","type":"chunks","offset":<first-chunk-in-file-offset>}
Files:
/var/lib/teleport/log/<auth-server-id>/<session-id>-<first-event-in-file-index>.events
/var/lib/teleport/log/<auth-server-id>/<session-id>-<first-chunk-in-file-offset>.chunks
Where:
- .events (same events as in the main log, but related to the session)
- .chunks (recorded session bytes: PTY IO)
Examples
~~~~~~~~
**Single auth server**
In the simplest case, single auth server a1 log for a single session id s1
will consist of three files:
/var/lib/teleport/a1/s1.index
With contents:
{"file_name":"s1-0.events","type":"events","index":0}
{"file_name":"s1-0.chunks","type":"chunks","offset":0}
This means that all session events are located in s1-0.events file starting from
the first event with index 0 and all chunks are located in file s1-0.chunks file
with the byte offset from the start - 0.
File with session events /var/lib/teleport/a1/s1-0.events will contain:
{"ei":0,"event":"session.start", ...}
{"ei":1,"event":"resize",...}
{"ei":2,"ci":0, "event":"print","bytes":40,"offset":0}
{"ei":3,"event":"session.end", ...}
File with recorded session /var/lib/teleport/a1/s1-0.chunks will contain 40 bytes
emitted by print event with chunk index 0
**Multiple Auth Servers**
In high availability mode scenario, multiple auth servers will be
deployed behind a load balancer.
Any auth server can go down during session and clients will retry the delivery
to the other auth server.
Both auth servers have mounted /var/lib/teleport/log as a shared NFS folder.
To make sure that only one auth server writes to a file at a time,
each auth server writes to it's own file in a sub folder named
with host UUID of the server.
Client sends the chunks of events related to the session s1 in order,
but load balancer sends first batch of event to the first server a1,
and the second batch of event to the second server a2.
Server a1 will produce the following file:
/var/lib/teleport/a1/s1.index
With contents:
{"file_name":"s1-0.events","type":"events","index":0}
{"file_name":"s1-0.chunks","type":"chunks","offset":0}
Events file /var/lib/teleport/a1/s1-0.events will contain:
{"ei":0,"event":"session.start", ...}
{"ei":1,"event":"resize",...}
{"ei":2,"ci":0, "event":"print","bytes":40,"offset":0}
Events file /var/lib/teleport/a1/s1-0.chunks will contain 40 bytes
emitted by print event with chunk index.
Server a2 will produce the following file:
/var/lib/teleport/a2/s1.index
With contents:
{"file_name":"s1-3.events","type":"events","index":3}
{"file_name":"s1-40.chunks","type":"chunks","offset":40}
Events file /var/lib/teleport/a2/s1-4.events will contain:
{"ei":3,"ci":1, "event":"print","bytes":15,"ms":713,"offset":40}
{"ei":4,"event":"session.end", ...}
Events file /var/lib/teleport/a2/s1-40.chunks will contain 15 bytes emitted
by print event with chunk index 1 and comes after delay of 713 milliseconds.
Offset 40 indicates that the first chunk stored in the file s1-40.chunks
comes at an offset of 40 bytes from the start of the session.
Log Search and Playback
-----------------------
Log search and playback is aware of multiple auth servers, merges
indexes, event streams stored on multiple auth servers.
*/
package events

View file

@ -17,10 +17,10 @@ limitations under the License.
package events
import (
"bytes"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
@ -34,7 +34,7 @@ import (
// SessionLogger is an interface that all session loggers must implement.
type SessionLogger interface {
// LogEvent logs events associated with this session.
LogEvent(fields EventFields)
LogEvent(fields EventFields) error
// Close is called when clients close on the requested "session writer".
// We ignore their requests because this writer (file) should be closed only
@ -56,59 +56,36 @@ type SessionLogger interface {
type DiskSessionLoggerConfig struct {
// SessionID is the session id of the logger
SessionID session.ID
// EventsFileName is the events file name
EventsFileName string
// StreamFileName is the byte stream file name
StreamFileName string
// DataDir is data directory for session events files
DataDir string
// Clock is the clock replacement
Clock clockwork.Clock
// RecordSessions controls if sessions are recorded along with audit events.
RecordSessions bool
// AuditLog is the audit log
AuditLog *AuditLog
}
// NewDiskSessionLogger creates new disk based session logger
func NewDiskSessionLogger(cfg DiskSessionLoggerConfig) (*DiskSessionLogger, error) {
var err error
lastPrintEvent, err := readLastPrintEvent(cfg.EventsFileName)
if err != nil {
if !trace.IsNotFound(err) {
return nil, trace.Wrap(err)
}
// no last event is ok
lastPrintEvent = nil
}
// if session recording is on, create a stream file that stores all the
// bytes of the session
var fstream *os.File
if cfg.RecordSessions {
fstream, err = os.OpenFile(cfg.StreamFileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640)
if err != nil {
return nil, trace.Wrap(err)
}
}
// create a new session file that stores all the audit events that occured
// related to the session
fevents, err := os.OpenFile(cfg.EventsFileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640)
indexFile, err := os.OpenFile(filepath.Join(cfg.DataDir, fmt.Sprintf("%v.index", cfg.SessionID.String())), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640)
if err != nil {
return nil, trace.Wrap(err)
}
sessionLogger := &DiskSessionLogger{
DiskSessionLoggerConfig: cfg,
Entry: log.WithFields(log.Fields{
trace.Component: teleport.ComponentAuditLog,
trace.ComponentFields: log.Fields{
"sid": cfg.SessionID,
},
}),
sid: cfg.SessionID,
streamFile: fstream,
eventsFile: fevents,
clock: cfg.Clock,
lastPrintEvent: lastPrintEvent,
recordSessions: cfg.RecordSessions,
indexFile: indexFile,
lastEventIndex: -1,
lastChunkIndex: -1,
}
return sessionLogger, nil
}
@ -117,86 +94,28 @@ func NewDiskSessionLogger(cfg DiskSessionLoggerConfig) (*DiskSessionLogger, erro
// property of the disk based logger is that it never fails and can be used as
// a fallback implementation behind more sophisticated loggers.
type DiskSessionLogger struct {
DiskSessionLoggerConfig
*log.Entry
sync.Mutex
sid session.ID
// eventsFile stores logged events, just like the main logger, except
// these are all associated with this session
indexFile *os.File
eventsFile *os.File
chunksFile *os.File
// streamFile stores bytes from the session terminal I/O for replaying
streamFile *os.File
// clock provides real of fake clock (for tests)
clock clockwork.Clock
// lastPrintEvent is the last written session event
lastPrintEvent *printEvent
lastEventIndex int64
lastChunkIndex int64
// recordSessions controls if sessions are recorded along with audit events.
recordSessions bool
}
// LogEvent logs an event associated with this session
func (sl *DiskSessionLogger) LogEvent(fields EventFields) {
if _, ok := fields[EventTime]; !ok {
fields[EventTime] = sl.clock.Now().In(time.UTC).Round(time.Millisecond)
}
if sl.eventsFile != nil {
_, err := fmt.Fprintln(sl.eventsFile, eventToLine(fields))
if err != nil {
log.Error(trace.DebugReport(err))
}
}
}
// readLastEvent reads last event from the file, it opens
// the file in read only mode and closes it after
func readLastPrintEvent(fileName string) (*printEvent, error) {
f, err := os.Open(fileName)
if err != nil {
return nil, trace.ConvertSystemError(err)
}
defer f.Close()
info, err := f.Stat()
if err != nil {
return nil, trace.ConvertSystemError(err)
}
if info.Size() == 0 {
return nil, trace.NotFound("no events found")
}
bufSize := int64(512)
if info.Size() < bufSize {
bufSize = info.Size()
}
buf := make([]byte, bufSize)
_, err = f.ReadAt(buf, info.Size()-bufSize)
if err != nil {
return nil, trace.ConvertSystemError(err)
}
lines := bytes.Split(buf, []byte("\n"))
if len(lines) == 0 {
return nil, trace.BadParameter("expected some lines, got %q", string(buf))
}
for i := len(lines) - 1; i > 0; i-- {
line := bytes.TrimSpace(lines[i])
if len(line) == 0 {
continue
}
var event printEvent
if err = json.Unmarshal(line, &event); err != nil {
return nil, trace.Wrap(err)
}
if event.Type != SessionPrintEvent {
continue
}
return &event, nil
}
return nil, trace.NotFound("no session print events found")
func (sl *DiskSessionLogger) LogEvent(fields EventFields) error {
panic("does not work")
}
// Close is called when clients close on the requested "session writer".
@ -213,17 +132,98 @@ func (sl *DiskSessionLogger) Finalize() error {
sl.Lock()
defer sl.Unlock()
return sl.finalize()
}
func (sl *DiskSessionLogger) finalize() error {
auditOpenFiles.Dec()
if sl.streamFile != nil {
sl.streamFile.Close()
sl.streamFile = nil
}
if sl.eventsFile != nil {
sl.eventsFile.Close()
sl.eventsFile = nil
if sl.indexFile != nil {
sl.indexFile.Close()
}
if sl.chunksFile != nil {
sl.chunksFile.Close()
}
if sl.eventsFile != nil {
sl.eventsFile.Close()
}
return nil
}
// eventsFileName consists of session id and the first global event index recorded there
func eventsFileName(dataDir string, sessionID session.ID, eventIndex int64) string {
return filepath.Join(dataDir, fmt.Sprintf("%v-%v.events", sessionID.String(), eventIndex))
}
// chunksFileName consists of session id and the first global offset recorded
func chunksFileName(dataDir string, sessionID session.ID, offset int64) string {
return filepath.Join(dataDir, fmt.Sprintf("%v-%v.chunks", sessionID.String(), offset))
}
func (sl *DiskSessionLogger) openEventsFile(eventIndex int64) error {
if sl.eventsFile != nil {
err := sl.eventsFile.Close()
if err != nil {
sl.Warningf("Failed to close file: %v", trace.DebugReport(err))
}
}
eventsFileName := eventsFileName(sl.DataDir, sl.SessionID, eventIndex)
// udpate the index file to write down that new events file has been created
data, err := json.Marshal(indexEntry{
FileName: filepath.Base(eventsFileName),
Type: fileTypeEvents,
Index: eventIndex,
})
if err != nil {
return trace.Wrap(err)
}
_, err = fmt.Fprintf(sl.indexFile, "%v\n", string(data))
if err != nil {
return trace.Wrap(err)
}
// open new events file for writing
sl.eventsFile, err = os.OpenFile(eventsFileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640)
if err != nil {
return trace.Wrap(err)
}
return nil
}
func (sl *DiskSessionLogger) openChunksFile(offset int64) error {
if sl.chunksFile != nil {
err := sl.chunksFile.Close()
if err != nil {
sl.Warningf("Failed to close file: %v", trace.DebugReport(err))
}
}
chunksFileName := chunksFileName(sl.DataDir, sl.SessionID, offset)
// udpate the index file to write down that new chunks file has been created
data, err := json.Marshal(indexEntry{
FileName: filepath.Base(chunksFileName),
Type: fileTypeChunks,
Offset: offset,
})
if err != nil {
return trace.Wrap(err)
}
_, err = fmt.Fprintf(sl.indexFile, "%v\n", string(data))
if err != nil {
return trace.Wrap(err)
}
// open new chunks file for writing
sl.chunksFile, err = os.OpenFile(chunksFileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640)
if err != nil {
return trace.Wrap(err)
}
return nil
}
@ -233,49 +233,73 @@ func (sl *DiskSessionLogger) WriteChunk(chunk *SessionChunk) (written int, err e
sl.Lock()
defer sl.Unlock()
// when session recording is turned off, don't record the session byte stream
if sl.recordSessions == false {
// this section enforces the following invariant:
// a single events file only contains successive events
if sl.lastEventIndex == -1 || chunk.EventIndex-1 != sl.lastEventIndex {
if err := sl.openEventsFile(chunk.EventIndex); err != nil {
return -1, trace.Wrap(err)
}
}
sl.lastEventIndex = chunk.EventIndex
if chunk.EventType != SessionPrintEvent {
if chunk.EventType == SessionEndEvent {
defer sl.closeLogger()
}
var fields EventFields
err := json.Unmarshal(chunk.Data, &fields)
if err != nil {
return -1, trace.Wrap(err)
}
fields[EventIndex] = chunk.EventIndex
fields[EventTime] = sl.Clock.Now().In(time.UTC).Round(time.Millisecond)
fields[EventType] = chunk.EventType
data, err := json.Marshal(fields)
if err != nil {
return -1, trace.Wrap(err)
}
if err := sl.AuditLog.emitAuditEvent(chunk.EventType, fields); err != nil {
return -1, trace.Wrap(err)
}
return fmt.Fprintln(sl.eventsFile, string(data))
}
if !sl.RecordSessions {
return len(chunk.Data), nil
}
if sl.streamFile == nil || sl.eventsFile == nil {
return 0, trace.BadParameter("session %v: attempt to write to a closed file", sl.sid)
}
if written, err = sl.streamFile.Write(chunk.Data); err != nil {
return written, trace.Wrap(err)
}
err = sl.writePrintEvent(time.Unix(0, chunk.Time), len(chunk.Data))
return written, trace.Wrap(err)
}
// writePrintEvent logs print event indicating write to the session
func (sl *DiskSessionLogger) writePrintEvent(start time.Time, bytesWritten int) error {
start = start.In(time.UTC).Round(time.Millisecond)
offset := int64(0)
delayMilliseconds := int64(0)
if sl.lastPrintEvent != nil {
offset = sl.lastPrintEvent.Offset + sl.lastPrintEvent.Bytes
delayMilliseconds = diff(sl.lastPrintEvent.Start, start) + sl.lastPrintEvent.DelayMilliseconds
eventStart := time.Unix(0, chunk.Time).In(time.UTC).Round(time.Millisecond)
// this section enforces the following invariant:
// a single chunks file only contains successive chunks
if sl.lastChunkIndex == -1 || chunk.ChunkIndex-1 != sl.lastChunkIndex {
if err := sl.openChunksFile(chunk.Offset); err != nil {
return -1, trace.Wrap(err)
}
}
sl.lastChunkIndex = chunk.ChunkIndex
event := printEvent{
Start: start,
Start: eventStart,
Type: SessionPrintEvent,
Bytes: int64(bytesWritten),
DelayMilliseconds: delayMilliseconds,
Offset: offset,
Bytes: int64(len(chunk.Data)),
DelayMilliseconds: chunk.Delay,
Offset: chunk.Offset,
EventIndex: chunk.EventIndex,
ChunkIndex: chunk.ChunkIndex,
}
bytes, err := json.Marshal(event)
if err != nil {
return trace.Wrap(err)
return -1, trace.Wrap(err)
}
_, err = fmt.Fprintln(sl.eventsFile, string(bytes))
if err != nil {
return trace.Wrap(err)
return -1, trace.Wrap(err)
}
return sl.chunksFile.Write(chunk.Data)
}
func (sl *DiskSessionLogger) closeLogger() {
sl.AuditLog.removeLogger(sl.SessionID.String())
if err := sl.finalize(); err != nil {
log.Error(err)
}
sl.lastPrintEvent = &event
return trace.Wrap(err)
}
func diff(before, after time.Time) int64 {
@ -286,6 +310,19 @@ func diff(before, after time.Time) int64 {
return d
}
const (
fileTypeChunks = "chunks"
fileTypeEvents = "events"
)
type indexEntry struct {
FileName string `json:"file_name"`
Type string `json:"type"`
Index int64 `json:"index"`
Offset int64 `json:"offset,"`
authServer string
}
type printEvent struct {
// Start is event start
Start time.Time `json:"time"`
@ -297,4 +334,8 @@ type printEvent struct {
DelayMilliseconds int64 `json:"ms"`
// Offset int64 is the offset in bytes in the session file
Offset int64 `json:"offset"`
// EventIndex is the global event index
EventIndex int64 `json:"ei"`
// ChunkIndex is the global chunk index
ChunkIndex int64 `json:"ci"`
}

View file

@ -40,9 +40,14 @@ const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
// SessionSlice is a slice of submitted chunks
type SessionSlice struct {
Namespace string `protobuf:"bytes,1,opt,name=Namespace,json=namespace,proto3" json:"Namespace,omitempty"`
SessionID string `protobuf:"bytes,2,opt,name=SessionID,json=sessionID,proto3" json:"SessionID,omitempty"`
Chunks []*SessionChunk `protobuf:"bytes,3,rep,name=Chunks,json=chunks" json:"Chunks,omitempty"`
// Namespace is a session namespace
Namespace string `protobuf:"bytes,1,opt,name=Namespace,json=namespace,proto3" json:"Namespace,omitempty"`
// SessionID is a session ID associated with this chunk
SessionID string `protobuf:"bytes,2,opt,name=SessionID,json=sessionID,proto3" json:"SessionID,omitempty"`
// Chunks is a list of submitted session chunks
Chunks []*SessionChunk `protobuf:"bytes,3,rep,name=Chunks,json=chunks" json:"Chunks,omitempty"`
// Version specifies session slice version
Version int64 `protobuf:"varint,4,opt,name=Version,json=version,proto3" json:"Version,omitempty"`
}
func (m *SessionSlice) Reset() { *m = SessionSlice{} }
@ -59,10 +64,20 @@ func (m *SessionSlice) GetChunks() []*SessionChunk {
// SessionChunk is a chunk to be posted in the context of the session
type SessionChunk struct {
// Time is the occurrence of this event
// Time is the occurence of this event
Time int64 `protobuf:"varint,2,opt,name=Time,json=time,proto3" json:"Time,omitempty"`
// Data is captured data
// Data is captured data, contains event fields in case of event, session data otherwise
Data []byte `protobuf:"bytes,3,opt,name=Data,json=data,proto3" json:"Data,omitempty"`
// EventType is event type
EventType string `protobuf:"bytes,4,opt,name=EventType,json=eventType,proto3" json:"EventType,omitempty"`
// EventIndex is the event global index
EventIndex int64 `protobuf:"varint,5,opt,name=EventIndex,json=eventIndex,proto3" json:"EventIndex,omitempty"`
// Index is the autoincremented chunk index
ChunkIndex int64 `protobuf:"varint,6,opt,name=ChunkIndex,json=chunkIndex,proto3" json:"ChunkIndex,omitempty"`
// Offset is an offset from the previous chunk in bytes
Offset int64 `protobuf:"varint,7,opt,name=Offset,json=offset,proto3" json:"Offset,omitempty"`
// Delay is a delay from the previous event in milliseconds
Delay int64 `protobuf:"varint,8,opt,name=Delay,json=delay,proto3" json:"Delay,omitempty"`
}
func (m *SessionChunk) Reset() { *m = SessionChunk{} }
@ -220,6 +235,11 @@ func (m *SessionSlice) MarshalTo(data []byte) (int, error) {
i += n
}
}
if m.Version != 0 {
data[i] = 0x20
i++
i = encodeVarintSlice(data, i, uint64(m.Version))
}
return i, nil
}
@ -249,6 +269,32 @@ func (m *SessionChunk) MarshalTo(data []byte) (int, error) {
i = encodeVarintSlice(data, i, uint64(len(m.Data)))
i += copy(data[i:], m.Data)
}
if len(m.EventType) > 0 {
data[i] = 0x22
i++
i = encodeVarintSlice(data, i, uint64(len(m.EventType)))
i += copy(data[i:], m.EventType)
}
if m.EventIndex != 0 {
data[i] = 0x28
i++
i = encodeVarintSlice(data, i, uint64(m.EventIndex))
}
if m.ChunkIndex != 0 {
data[i] = 0x30
i++
i = encodeVarintSlice(data, i, uint64(m.ChunkIndex))
}
if m.Offset != 0 {
data[i] = 0x38
i++
i = encodeVarintSlice(data, i, uint64(m.Offset))
}
if m.Delay != 0 {
data[i] = 0x40
i++
i = encodeVarintSlice(data, i, uint64(m.Delay))
}
return i, nil
}
@ -296,6 +342,9 @@ func (m *SessionSlice) Size() (n int) {
n += 1 + l + sovSlice(uint64(l))
}
}
if m.Version != 0 {
n += 1 + sovSlice(uint64(m.Version))
}
return n
}
@ -309,6 +358,22 @@ func (m *SessionChunk) Size() (n int) {
if l > 0 {
n += 1 + l + sovSlice(uint64(l))
}
l = len(m.EventType)
if l > 0 {
n += 1 + l + sovSlice(uint64(l))
}
if m.EventIndex != 0 {
n += 1 + sovSlice(uint64(m.EventIndex))
}
if m.ChunkIndex != 0 {
n += 1 + sovSlice(uint64(m.ChunkIndex))
}
if m.Offset != 0 {
n += 1 + sovSlice(uint64(m.Offset))
}
if m.Delay != 0 {
n += 1 + sovSlice(uint64(m.Delay))
}
return n
}
@ -443,6 +508,25 @@ func (m *SessionSlice) Unmarshal(data []byte) error {
return err
}
iNdEx = postIndex
case 4:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Version", wireType)
}
m.Version = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSlice
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.Version |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipSlice(data[iNdEx:])
@ -543,6 +627,111 @@ func (m *SessionChunk) Unmarshal(data []byte) error {
m.Data = []byte{}
}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field EventType", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSlice
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthSlice
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.EventType = string(data[iNdEx:postIndex])
iNdEx = postIndex
case 5:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field EventIndex", wireType)
}
m.EventIndex = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSlice
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.EventIndex |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 6:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field ChunkIndex", wireType)
}
m.ChunkIndex = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSlice
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.ChunkIndex |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 7:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Offset", wireType)
}
m.Offset = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSlice
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.Offset |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 8:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Delay", wireType)
}
m.Delay = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSlice
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.Delay |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipSlice(data[iNdEx:])
@ -672,25 +861,31 @@ var (
func init() { proto.RegisterFile("slice.proto", fileDescriptorSlice) }
var fileDescriptorSlice = []byte{
// 316 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x5c, 0x90, 0x41, 0x4b, 0xc3, 0x30,
0x18, 0x86, 0x17, 0x3b, 0x8a, 0xcb, 0x76, 0x90, 0x20, 0x52, 0xa6, 0x94, 0xb1, 0x53, 0x0f, 0x9a,
0xc2, 0x04, 0xef, 0xea, 0x14, 0x04, 0xf1, 0xd0, 0x79, 0xf2, 0x22, 0x69, 0xf7, 0xd9, 0x05, 0xd7,
0x24, 0x34, 0x5f, 0xd5, 0xfe, 0x13, 0x7f, 0x92, 0x47, 0x7f, 0x82, 0xcc, 0x3f, 0x22, 0x4d, 0x3b,
0x41, 0x6f, 0xf9, 0xde, 0xe7, 0x4d, 0xde, 0x7c, 0x2f, 0x1d, 0xda, 0xb5, 0xcc, 0x80, 0x9b, 0x52,
0xa3, 0x66, 0x3e, 0xbc, 0x80, 0x42, 0x3b, 0x7e, 0xc8, 0x25, 0xae, 0xaa, 0x94, 0x67, 0xba, 0x88,
0xf3, 0xd2, 0x64, 0x27, 0x90, 0x69, 0x5b, 0x5b, 0x84, 0x6e, 0xcc, 0x05, 0xc2, 0xab, 0xa8, 0x63,
0x5c, 0xc9, 0x72, 0xf9, 0x68, 0x44, 0x89, 0x75, 0x9c, 0x6b, 0x9d, 0xaf, 0x41, 0x18, 0x69, 0xbb,
0x63, 0x2c, 0x8c, 0x8c, 0x85, 0x52, 0x1a, 0x05, 0x4a, 0xad, 0x6c, 0x9b, 0x31, 0x3e, 0xec, 0xa8,
0x9b, 0xd2, 0xea, 0x29, 0x86, 0xc2, 0x60, 0xdd, 0xc2, 0xe9, 0x1b, 0x1d, 0x2d, 0xc0, 0x5a, 0xa9,
0xd5, 0xa2, 0xf9, 0x16, 0x3b, 0xa2, 0x83, 0x3b, 0x51, 0x80, 0x35, 0x22, 0x83, 0x80, 0x4c, 0x48,
0x34, 0x48, 0x06, 0x6a, 0x2b, 0x34, 0xb4, 0x73, 0xdf, 0xcc, 0x83, 0x9d, 0x96, 0xda, 0xad, 0xc0,
0x8e, 0xa9, 0x7f, 0xb9, 0xaa, 0xd4, 0xb3, 0x0d, 0xbc, 0x89, 0x17, 0x0d, 0x67, 0xfb, 0xbc, 0xdd,
0x8e, 0x77, 0x77, 0x1c, 0x4c, 0xfc, 0xcc, 0x79, 0xa6, 0x67, 0xbf, 0xc9, 0x4e, 0x67, 0x8c, 0xf6,
0xef, 0x65, 0x01, 0xee, 0x59, 0x2f, 0xe9, 0xa3, 0x2c, 0xa0, 0xd1, 0xe6, 0x02, 0x45, 0xe0, 0x4d,
0x48, 0x34, 0x4a, 0xfa, 0x4b, 0x81, 0x62, 0x96, 0xd0, 0xdd, 0xf3, 0x6a, 0x29, 0xf1, 0x56, 0xe7,
0xec, 0x9a, 0xb2, 0x45, 0x95, 0x16, 0x12, 0xff, 0xec, 0xf0, 0x3f, 0xd7, 0xa9, 0xe3, 0x03, 0xde,
0xf6, 0xc0, 0xb7, 0x3d, 0xf0, 0xab, 0xa6, 0x87, 0x69, 0x2f, 0x22, 0x17, 0x7b, 0x1f, 0x9b, 0x90,
0x7c, 0x6e, 0x42, 0xf2, 0xb5, 0x09, 0xc9, 0xfb, 0x77, 0xd8, 0x4b, 0x7d, 0xe7, 0x3a, 0xfd, 0x09,
0x00, 0x00, 0xff, 0xff, 0x21, 0xc6, 0xeb, 0xd4, 0xae, 0x01, 0x00, 0x00,
// 405 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x5c, 0x91, 0xc1, 0x8e, 0xd3, 0x30,
0x10, 0x86, 0xd7, 0xa4, 0x4d, 0x37, 0xde, 0x3d, 0x20, 0x6b, 0xb5, 0xb2, 0x0a, 0x8a, 0xaa, 0x9e,
0x72, 0x80, 0x44, 0x5a, 0x9e, 0x00, 0xe8, 0x22, 0xad, 0x84, 0x40, 0x4a, 0x57, 0x1c, 0xb8, 0x20,
0x37, 0x99, 0xa6, 0x16, 0x8d, 0x6d, 0xc5, 0x93, 0x85, 0xbc, 0x06, 0x27, 0x9e, 0x87, 0x13, 0x47,
0x1e, 0x01, 0x95, 0x17, 0x41, 0xb6, 0xd3, 0x45, 0xec, 0x2d, 0xff, 0xf7, 0x8d, 0x3d, 0x13, 0x0f,
0x3d, 0xb3, 0x7b, 0x59, 0x41, 0x6e, 0x3a, 0x8d, 0x9a, 0xc5, 0x70, 0x07, 0x0a, 0xed, 0xfc, 0x63,
0x23, 0x71, 0xd7, 0x6f, 0xf2, 0x4a, 0xb7, 0x45, 0xd3, 0x99, 0xea, 0x39, 0x54, 0xda, 0x0e, 0x16,
0x61, 0x8c, 0x8d, 0x40, 0xf8, 0x22, 0x86, 0x02, 0x77, 0xb2, 0xab, 0x3f, 0x19, 0xd1, 0xe1, 0x50,
0x34, 0x5a, 0x37, 0x7b, 0x10, 0x46, 0xda, 0xf1, 0xb3, 0x10, 0x46, 0x16, 0x42, 0x29, 0x8d, 0x02,
0xa5, 0x56, 0x36, 0xf4, 0x98, 0x3f, 0x19, 0xad, 0x4f, 0x9b, 0x7e, 0x5b, 0x40, 0x6b, 0x70, 0x08,
0x72, 0xf9, 0x8d, 0xd0, 0xf3, 0x35, 0x58, 0x2b, 0xb5, 0x5a, 0xbb, 0xb9, 0xd8, 0x53, 0x9a, 0xbc,
0x13, 0x2d, 0x58, 0x23, 0x2a, 0xe0, 0x64, 0x41, 0xb2, 0xa4, 0x4c, 0xd4, 0x11, 0x38, 0x3b, 0x56,
0xdf, 0xac, 0xf8, 0xa3, 0x60, 0xed, 0x11, 0xb0, 0x67, 0x34, 0x7e, 0xbd, 0xeb, 0xd5, 0x67, 0xcb,
0xa3, 0x45, 0x94, 0x9d, 0x5d, 0x5d, 0xe4, 0xe1, 0xf7, 0xf2, 0xf1, 0x8c, 0x97, 0x65, 0x5c, 0xf9,
0x1a, 0xc6, 0xe9, 0xec, 0x03, 0x74, 0x8e, 0xf3, 0xc9, 0x82, 0x64, 0x51, 0x39, 0xbb, 0x0b, 0x71,
0xf9, 0xe3, 0xdf, 0x50, 0xfe, 0x08, 0x63, 0x74, 0x72, 0x2b, 0x5b, 0xf0, 0x1d, 0xa3, 0x72, 0x82,
0xb2, 0x05, 0xc7, 0x56, 0x02, 0x05, 0x8f, 0x16, 0x24, 0x3b, 0x2f, 0x27, 0xb5, 0x40, 0xe1, 0xc6,
0xbb, 0x76, 0x1d, 0x6f, 0x07, 0x03, 0xfe, 0xd2, 0xa4, 0x4c, 0xe0, 0x08, 0x58, 0x4a, 0xa9, 0xb7,
0x37, 0xaa, 0x86, 0xaf, 0x7c, 0xea, 0xef, 0xa2, 0x70, 0x4f, 0x9c, 0xf7, 0xed, 0x82, 0x8f, 0x83,
0xaf, 0xee, 0x09, 0xbb, 0xa4, 0xf1, 0xfb, 0xed, 0xd6, 0x02, 0xf2, 0x99, 0x77, 0xb1, 0xf6, 0x89,
0x5d, 0xd0, 0xe9, 0x0a, 0xf6, 0x62, 0xe0, 0xa7, 0x1e, 0x4f, 0x6b, 0x17, 0xae, 0x4a, 0x7a, 0xfa,
0xb2, 0xaf, 0x25, 0xbe, 0xd5, 0x0d, 0x7b, 0x43, 0xd9, 0xba, 0xdf, 0xb4, 0x12, 0xff, 0x7b, 0xea,
0x87, 0xcf, 0xe3, 0xe9, 0xfc, 0x32, 0x0f, 0xfb, 0xca, 0x8f, 0xfb, 0xca, 0xaf, 0xdd, 0xbe, 0x96,
0x27, 0x19, 0x79, 0xf5, 0xf8, 0xe7, 0x21, 0x25, 0xbf, 0x0e, 0x29, 0xf9, 0x7d, 0x48, 0xc9, 0xf7,
0x3f, 0xe9, 0xc9, 0x26, 0xf6, 0x55, 0x2f, 0xfe, 0x06, 0x00, 0x00, 0xff, 0xff, 0xa6, 0xf1, 0x9c,
0x76, 0x56, 0x02, 0x00, 0x00,
}

View file

@ -6,17 +6,32 @@ import "google/protobuf/empty.proto";
// SessionSlice is a slice of submitted chunks
message SessionSlice {
// Namespace is a session namespace
string Namespace = 1;
// SessionID is a session ID associated with this chunk
string SessionID = 2;
// Chunks is a list of submitted session chunks
repeated SessionChunk Chunks = 3;
// Version specifies session slice version
int64 Version = 4;
}
// SessionChunk is a chunk to be posted in the context of the session
message SessionChunk {
// Time is the occurence of this event
int64 Time = 2;
// Data is captured data
// Data is captured data, contains event fields in case of event, session data otherwise
bytes Data = 3;
// EventType is event type
string EventType = 4;
// EventIndex is the event global index
int64 EventIndex = 5;
// Index is the autoincremented chunk index
int64 ChunkIndex = 6;
// Offset is an offset from the previous chunk in bytes
int64 Offset = 7;
// Delay is a delay from the previous event in milliseconds
int64 Delay = 8;
}
service AuditLog {

View file

@ -25,7 +25,6 @@ import (
"fmt"
"io"
"net"
"runtime/debug"
"sync"
"time"
@ -204,7 +203,6 @@ func (a *Agent) getState() string {
// Close signals to close all connections and operations
func (a *Agent) Close() error {
a.Info(string(debug.Stack()))
a.cancel()
return nil
}

View file

@ -390,6 +390,7 @@ func (process *TeleportProcess) initAuthService(authority sshca.Authority) error
auditConfig := events.AuditLogConfig{
DataDir: filepath.Join(cfg.DataDir, "log"),
RecordSessions: recordSessions,
ServerID: cfg.HostUUID,
}
if runtime.GOOS == teleport.LinuxOS {
// if the user member of adm linux group,
@ -874,6 +875,7 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
regular.SetCiphers(cfg.Ciphers),
regular.SetKEXAlgorithms(cfg.KEXAlgorithms),
regular.SetMACAlgorithms(cfg.MACAlgorithms),
regular.SetNamespace(defaults.Namespace),
)
if err != nil {
return trace.Wrap(err)

View file

@ -294,6 +294,15 @@ func New(addr utils.NetAddr,
}
}
// TODO(klizhentas): replace function arguments with struct
if s.alog == nil {
return nil, trace.BadParameter("setup valid AuditLog parameter using SetAuditLog")
}
if s.namespace == "" {
return nil, trace.BadParameter("setup valid namespace parameter using SetNamespace")
}
var component string
if s.proxyMode {
component = teleport.ComponentProxy

View file

@ -473,6 +473,8 @@ func (s *SrvSuite) TestProxyReverseTunnel(c *C) {
utils.NetAddr{},
SetProxyMode(reverseTunnelServer),
SetSessionServer(s.proxyClient),
SetAuditLog(s.nodeClient),
SetNamespace(defaults.Namespace),
)
c.Assert(err, IsNil)
c.Assert(proxy.Start(), IsNil)
@ -552,7 +554,10 @@ func (s *SrvSuite) TestProxyReverseTunnel(c *C) {
},
),
SetSessionServer(s.nodeClient),
SetAuditLog(s.nodeClient),
SetNamespace(defaults.Namespace),
)
c.Assert(err, IsNil)
srv2.uuid = bobAddr
c.Assert(err, IsNil)
c.Assert(srv2.Start(), IsNil)
@ -638,6 +643,8 @@ func (s *SrvSuite) TestProxyRoundRobin(c *C) {
utils.NetAddr{},
SetProxyMode(reverseTunnelServer),
SetSessionServer(s.proxyClient),
SetAuditLog(s.nodeClient),
SetNamespace(defaults.Namespace),
)
c.Assert(err, IsNil)
c.Assert(proxy.Start(), IsNil)
@ -734,6 +741,8 @@ func (s *SrvSuite) TestProxyDirectAccess(c *C) {
utils.NetAddr{},
SetProxyMode(reverseTunnelServer),
SetSessionServer(s.proxyClient),
SetAuditLog(s.nodeClient),
SetNamespace(defaults.Namespace),
)
c.Assert(err, IsNil)
c.Assert(proxy.Start(), IsNil)
@ -835,6 +844,8 @@ func (s *SrvSuite) TestLimiter(c *C) {
SetLimiter(limiter),
SetShell("/bin/sh"),
SetSessionServer(s.nodeClient),
SetAuditLog(s.nodeClient),
SetNamespace(defaults.Namespace),
)
c.Assert(err, IsNil)
c.Assert(srv.Start(), IsNil)

View file

@ -89,7 +89,7 @@ func (r *SessionRegistry) Close() {
func (s *SessionRegistry) OpenSession(ch ssh.Channel, req *ssh.Request, ctx *ServerContext) error {
if ctx.session != nil {
// emit "joined session" event:
s.srv.EmitAuditEvent(events.SessionJoinEvent, events.EventFields{
ctx.session.recorder.alog.EmitAuditEvent(events.SessionJoinEvent, events.EventFields{
events.SessionEventID: string(ctx.session.id),
events.EventNamespace: s.srv.GetNamespace(),
events.EventLogin: ctx.Identity.Login,
@ -137,7 +137,7 @@ func (s *SessionRegistry) leaveSession(party *party) error {
}
// emit "session leave" event (party left the session)
s.srv.EmitAuditEvent(events.SessionLeaveEvent, events.EventFields{
sess.recorder.alog.EmitAuditEvent(events.SessionLeaveEvent, events.EventFields{
events.SessionEventID: string(sess.id),
events.EventUser: party.user,
events.SessionServerID: party.serverID,
@ -165,18 +165,17 @@ func (s *SessionRegistry) leaveSession(party *party) error {
delete(s.sessions, sess.id)
s.Unlock()
// close recorder to free up associated resources
// and flush data
if sess.recorder != nil {
sess.recorder.Close()
}
// send an event indicating that this session has ended
s.srv.EmitAuditEvent(events.SessionEndEvent, events.EventFields{
sess.recorder.alog.EmitAuditEvent(events.SessionEndEvent, events.EventFields{
events.SessionEventID: string(sess.id),
events.EventUser: party.user,
events.EventNamespace: s.srv.GetNamespace(),
})
// close recorder to free up associated resources
// and flush data
sess.recorder.Close()
if err := sess.Close(); err != nil {
log.Error(err)
}
@ -220,7 +219,7 @@ func (s *SessionRegistry) NotifyWinChange(params rsession.TerminalParams, ctx *S
}
sid := ctx.session.id
// report this to the event/audit log:
s.srv.EmitAuditEvent(events.ResizeEvent, events.EventFields{
ctx.session.recorder.alog.EmitAuditEvent(events.ResizeEvent, events.EventFields{
events.EventNamespace: s.srv.GetNamespace(),
events.SessionEventID: sid,
events.EventLogin: ctx.Identity.Login,
@ -582,8 +581,17 @@ func (s *session) start(ch ssh.Channel, ctx *ServerContext) error {
params := s.term.GetTerminalParams()
// start recording this session
auditLog := s.registry.srv.GetAuditLog()
var err error
s.recorder, err = newSessionRecorder(auditLog, ctx.srv.GetNamespace(), s.id)
if err != nil {
return trace.Wrap(err)
}
s.writer.addWriter("session-recorder", s.recorder, true)
// emit "new session created" event:
s.registry.srv.EmitAuditEvent(events.SessionStartEvent, events.EventFields{
s.recorder.alog.EmitAuditEvent(events.SessionStartEvent, events.EventFields{
events.EventNamespace: ctx.srv.GetNamespace(),
events.SessionEventID: string(s.id),
events.SessionServerID: ctx.srv.ID(),
@ -594,17 +602,6 @@ func (s *session) start(ch ssh.Channel, ctx *ServerContext) error {
events.TerminalSize: params.Serialize(),
})
// start recording this session
auditLog := s.registry.srv.GetAuditLog()
if auditLog != nil {
var err error
s.recorder, err = newSessionRecorder(auditLog, ctx.srv.GetNamespace(), s.id)
if err != nil {
return trace.Wrap(err)
}
s.writer.addWriter("session-recorder", s.recorder, true)
}
// start asynchronous loop of synchronizing session state with
// the session server (terminal size and activity)
go s.pollAndSync()

View file

@ -19,6 +19,7 @@ package state
import (
"context"
"encoding/json"
"io"
"io/ioutil"
"time"
@ -205,6 +206,8 @@ type CachingAuditLog struct {
throttleStart time.Time
waitCtx context.Context
waitCtxCancel context.CancelFunc
lastChunk *events.SessionChunk
eventIndex int64
}
func (ll *CachingAuditLog) add(chunks []*events.SessionChunk) {
@ -298,6 +301,14 @@ type flushOpts struct {
noRetry bool
}
func diff(before, after time.Time) int64 {
d := int64(after.Sub(before) / time.Millisecond)
if d < 0 {
return 0
}
return d
}
func (ll *CachingAuditLog) flush(opts flushOpts) {
if len(ll.chunks) == 0 {
return
@ -308,10 +319,24 @@ func (ll *CachingAuditLog) flush(opts flushOpts) {
}
}
chunks := ll.reset()
for i := range chunks {
chunk := chunks[i]
chunk.EventIndex = ll.eventIndex
ll.eventIndex += 1
if chunk.EventType == events.SessionPrintEvent {
if ll.lastChunk != nil {
chunk.Offset = ll.lastChunk.Offset + int64(len(ll.lastChunk.Data))
chunk.Delay = diff(time.Unix(0, ll.lastChunk.Time), time.Unix(0, chunk.Time)) + ll.lastChunk.Delay
chunk.ChunkIndex = ll.lastChunk.ChunkIndex + 1
}
ll.lastChunk = chunk
}
}
slice := events.SessionSlice{
Namespace: ll.Namespace,
SessionID: ll.SessionID,
Chunks: chunks,
Version: events.V2,
}
err := ll.postSlice(slice)
if err == nil {
@ -404,7 +429,18 @@ func (ll *CachingAuditLog) Close() error {
}
func (ll *CachingAuditLog) EmitAuditEvent(eventType string, fields events.EventFields) error {
return ll.Server.EmitAuditEvent(eventType, fields)
data, err := json.Marshal(fields)
if err != nil {
return trace.Wrap(err)
}
chunks := []*events.SessionChunk{
{
EventType: eventType,
Data: data,
Time: time.Now().UTC().UnixNano(),
},
}
return ll.post(chunks)
}
func (ll *CachingAuditLog) PostSessionChunk(namespace string, sid session.ID, reader io.Reader) error {
@ -414,14 +450,18 @@ func (ll *CachingAuditLog) PostSessionChunk(namespace string, sid session.ID, re
}
chunks := []*events.SessionChunk{
{
Data: data,
Time: time.Now().UTC().UnixNano(),
EventType: events.SessionPrintEvent,
Data: data,
Time: time.Now().UTC().UnixNano(),
},
}
return ll.post(chunks)
}
func (ll *CachingAuditLog) PostSessionSlice(slice events.SessionSlice) error {
for i := range slice.Chunks {
slice.Chunks[i].EventType = events.SessionPrintEvent
}
return ll.post(slice.Chunks)
}

View file

@ -60,6 +60,7 @@ func (s *CacheLogSuite) newSlice(data string) *events.SessionSlice {
Namespace: "ns",
SessionID: "s1",
Chunks: []*events.SessionChunk{chunk},
Version: events.V2,
}
}

View file

@ -216,6 +216,7 @@ func (s *WebSuite) SetUpTest(c *C) {
regular.SetProxyMode(revTunServer),
regular.SetSessionServer(s.proxyClient),
regular.SetAuditLog(s.proxyClient),
regular.SetNamespace(defaults.Namespace),
)
c.Assert(err, IsNil)