Run session migrations in the background.

Large directories with on-disk recordings
take a long time to migrate, so this patch
makes the operation asynchronous.

Do not use modification time for audit log
search; parse the file name instead.
This commit is contained in:
Sasha Klizhentas 2018-03-15 17:51:19 -07:00
parent dd496cab23
commit 1bcf8ae010
2 changed files with 86 additions and 26 deletions

View file

@ -56,6 +56,9 @@ const (
// LogfileExt defines the ending of the daily event log file
LogfileExt = ".log"
// sessionsMigratedEvent is a sessions migration event used internally
sessionsMigratedEvent = "sessions.migrated"
)
var (
@ -136,6 +139,17 @@ type AuditLogConfig struct {
// ExternalLog is a pluggable external log service
ExternalLog IAuditLog
// EventsC is an events channel for testing purposes, not used if nil
EventsC chan *AuditLogEvent
}
// AuditLogEvent is an internal audit log event. Pointers to values of this
// type are sent on the AuditLogConfig.EventsC channel, which exists for
// testing purposes.
type AuditLogEvent struct {
// Type is an event type, for example sessionsMigratedEvent
Type string
// Error is an event error
// NOTE(review): no emitter visible here sets this field — presumably nil on success; confirm
Error error
}
// CheckAndSetDefaults checks and sets defaults
@ -199,7 +213,7 @@ func NewAuditLog(cfg AuditLogConfig) (*AuditLog, error) {
return nil, trace.Wrap(err)
}
// create a directory for session logs:
sessionDir := filepath.Join(cfg.DataDir, cfg.ServerID, SessionLogsDir)
sessionDir := filepath.Join(cfg.DataDir, cfg.ServerID, SessionLogsDir, defaults.Namespace)
if err := os.MkdirAll(sessionDir, *cfg.DirMask); err != nil {
return nil, trace.ConvertSystemError(err)
}
@ -221,6 +235,7 @@ func NewAuditLog(cfg AuditLogConfig) (*AuditLog, error) {
return nil, trace.ConvertSystemError(err)
}
}
go al.backgroundMigrateSessions()
go al.periodicCloseInactiveLoggers()
go al.periodicCleanupPlaybacks()
return al, nil
@ -801,8 +816,19 @@ func (l *AuditLog) matchingFiles(fromUTC, toUTC time.Time) ([]eventFile, error)
if fi.IsDir() || filepath.Ext(fi.Name()) != LogfileExt {
continue
}
fd := fi.ModTime().UTC()
if fd.After(fromUTC) && fd.Before(toUTC) {
base := strings.TrimSuffix(fi.Name(), filepath.Ext(fi.Name()))
fd, err := time.Parse(defaults.AuditLogTimeFormat, base)
if err != nil {
l.Warningf("Failed to parse audit log file %q format: %v", base, err)
continue
}
// File rounding in current logs is non-deterministic,
// as the Round function used in rotateLog can round to either
// the lower or the higher period. That's why this has to check
// both periods.
// The previous logic used modification time, which was flaky
// as it could be changed by migrations or by simply moving files
if fd.After(fromUTC.Add(-1*l.RotationPeriod)) && fd.Before(toUTC.Add(l.RotationPeriod)) {
eventFile := eventFile{
FileInfo: fi,
path: filepath.Join(l.DataDir, serverID, fi.Name()),
@ -826,18 +852,34 @@ func (l *AuditLog) moveAuditLogFile(fileName string) error {
return nil
}
// emitEvent publishes e on the EventsC channel, which exists for test purposes.
// It does nothing when EventsC is nil. The send never blocks: if the channel
// cannot accept the event immediately, the event is dropped and a warning
// is logged instead.
func (l *AuditLog) emitEvent(e AuditLogEvent) {
	if l.EventsC != nil {
		select {
		case l.EventsC <- &e:
			// delivered
		default:
			l.Warningf("Blocked on the events channel.")
		}
	}
}
// migrateSessionsDir migrates session directory session by session
func (l *AuditLog) migrateSessionsDir() error {
sessionDir := filepath.Join(l.DataDir, SessionLogsDir)
_, err := utils.StatDir(sessionDir)
if err != nil {
if !trace.IsNotFound(err) {
return trace.Wrap(err)
}
l.Debugf("No V1 sessions directory, nothing to migrate.")
}
targetDir := filepath.Join(l.DataDir, l.ServerID, SessionLogsDir)
_, err := utils.StatDir(targetDir)
if err == nil {
return nil
}
if !trace.IsNotFound(err) {
return trace.Wrap(err)
}
// transform the recorded files to the new index format
recordingsDir := filepath.Join(l.DataDir, SessionLogsDir, defaults.Namespace)
targetRecordingsDir := filepath.Join(targetDir, defaults.Namespace)
fileInfos, err := listDir(recordingsDir)
if err != nil {
// source directory does not exist, means nothing to migrate
@ -865,20 +907,20 @@ func (l *AuditLog) migrateSessionsDir() error {
}
sessionID := parts[0]
sourceEventsFile := filepath.Join(recordingsDir, fmt.Sprintf("%v.session.log", sessionID))
targetEventsFile := filepath.Join(recordingsDir, fmt.Sprintf("%v-0.events.gz", sessionID))
targetEventsFile := filepath.Join(targetRecordingsDir, fmt.Sprintf("%v-0.events.gz", sessionID))
l.Debugf("Migrating, session ID %v. Compressed %v to %v", sessionID, sourceEventsFile, targetEventsFile)
err := moveAndGzipFile(sourceEventsFile, targetEventsFile)
if err != nil {
return trace.Wrap(err)
}
sourceChunksFile := filepath.Join(recordingsDir, fmt.Sprintf("%v.session.bytes", sessionID))
targetChunksFile := filepath.Join(recordingsDir, fmt.Sprintf("%v-0.chunks.gz", sessionID))
targetChunksFile := filepath.Join(targetRecordingsDir, fmt.Sprintf("%v-0.chunks.gz", sessionID))
l.Debugf("Migrating session ID %v. Compressed %v to %v", sessionID, sourceChunksFile, targetChunksFile)
err = moveAndGzipFile(sourceChunksFile, targetChunksFile)
if err != nil {
return trace.Wrap(err)
}
indexFileName := filepath.Join(recordingsDir, fmt.Sprintf("%v.index", sessionID))
indexFileName := filepath.Join(targetRecordingsDir, fmt.Sprintf("%v.index", sessionID))
eventsData, err := json.Marshal(indexEntry{
FileName: filepath.Base(targetEventsFile),
@ -911,10 +953,10 @@ func (l *AuditLog) migrateSessionsDir() error {
}
l.Debugf("Migrating session ID %v. Wrote index file %v.", sessionID, indexFileName)
}
l.Infof("Moving sessions folder from %v to %v", sessionDir, targetDir)
if err := os.Rename(sessionDir, targetDir); err != nil {
return trace.ConvertSystemError(err)
}
l.Info("Sessions migrations completed.")
l.emitEvent(AuditLogEvent{
Type: sessionsMigratedEvent,
})
return nil
}
@ -953,13 +995,19 @@ func listDir(dir string) ([]os.FileInfo, error) {
return entries, nil
}
// DELETE IN: 2.6.0
// backgroundMigrateSessions runs the sessions directory migration and logs
// the failure, if any. NewAuditLog starts it in its own goroutine because
// session migrations can take a long time.
func (l *AuditLog) backgroundMigrateSessions() {
	err := l.migrateSessionsDir()
	if err == nil {
		return
	}
	// this function has no return value, so the error is reported in the log
	l.Errorf("Failed to migrate sessions directory: %v", trace.DebugReport(err))
}
// DELETE IN: 2.6.0
// runMigrations runs migrations for audit logs format
func (l *AuditLog) runMigrations() error {
// migrate sessions directory
if err := l.migrateSessionsDir(); err != nil {
return trace.Wrap(err)
}
// move audit log files to the auth server id
fileInfos, err := listDir(l.DataDir)
if err != nil {
@ -1167,7 +1215,8 @@ func (l *AuditLog) findInFile(fn string, query url.Values, total *int, limit int
// in the query:
var ef EventFields
if err = json.Unmarshal(scanner.Bytes(), &ef); err != nil {
l.Warnf("Invalid JSON in %s line %d.", fn, lineNo)
l.Warnf("invalid JSON in %s line %d", fn, lineNo)
continue
}
for i := range eventFilter {
if ef.GetString(EventType) == eventFilter[i] {

View file

@ -138,7 +138,7 @@ func (a *AuditTestSuite) TestCompatComplexLogging(c *check.C) {
// try searching (in the future)
query := fmt.Sprintf("%s=%s", EventType, SessionStartEvent)
found, err := alog.SearchEvents(now.Add(time.Hour), now.Add(time.Hour), query, 0)
found, err := alog.SearchEvents(now.Add(48*time.Hour), now.Add(72*time.Hour), query, 0)
c.Assert(err, check.IsNil)
c.Assert(len(found), check.Equals, 0)
@ -516,7 +516,7 @@ func (a *AuditTestSuite) TestSearchTwoAuthServers(c *check.C) {
// search events, start time is in the future
query := fmt.Sprintf("%s=%s", EventType, SessionStartEvent)
found, err := a.SearchEvents(startTime.Add(time.Hour), startTime.Add(time.Hour), query, 0)
found, err := a.SearchEvents(startTime.Add(48*time.Hour), startTime.Add(72*time.Hour), query, 0)
c.Assert(err, check.IsNil)
c.Assert(len(found), check.Equals, 0, comment)
@ -695,10 +695,21 @@ func (a *AuditTestSuite) TestMigrationsToV2(c *check.C) {
c.Assert(err, check.IsNil)
}
// create audit log with session recording disabled
alog, err := a.makeLog(c, a.dataDir, false)
eventsC := make(chan *AuditLogEvent, 100)
alog, err := NewAuditLog(AuditLogConfig{
DataDir: a.dataDir,
RecordSessions: false,
ServerID: "server1",
EventsC: eventsC,
})
c.Assert(err, check.IsNil)
select {
case <-eventsC:
case <-time.After(time.Second):
c.Fatalf("Failed to wait for sessions migrations to complete.")
}
// sessions have been migrated
sid := "74a5fc73-e02c-11e7-aee2-0242ac0a0101"
events, err := alog.GetSessionEvents(defaults.Namespace, session.ID(sid), 0, true)