lib/events: remove more old code

This removes support for the pre-5.1.0 streaming directory, and
removes the unused Recorder type.
This commit is contained in:
Zac Bergquist 2022-03-23 13:01:55 -06:00 committed by Zac Bergquist
parent 62f687bef7
commit 3dc33ccc32
5 changed files with 28 additions and 231 deletions

View file

@ -566,12 +566,6 @@ type SessionMetadataSetter interface {
SetClusterName(string)
}
// SetCode is a shortcut that sets code for the audit event
func SetCode(event apievents.AuditEvent, code string) apievents.AuditEvent {
event.SetCode(code)
return event
}
// Streamer creates and resumes event streams for session IDs
type Streamer interface {
// CreateAuditStream creates event stream

View file

@ -17,10 +17,12 @@ limitations under the License.
package events
import (
"context"
"encoding/json"
"sync"
"time"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/utils"
@ -68,6 +70,7 @@ func (s *ForwarderConfig) CheckAndSetDefaults() error {
}
// NewForwarder returns a new instance of session forwarder
// TODO(zmb3): this is not used outside of tests - remove it
func NewForwarder(cfg ForwarderConfig) (*Forwarder, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
@ -117,6 +120,12 @@ func (l *Forwarder) Close() error {
return l.sessionLogger.Finalize()
}
// EmitAuditEvent is not implemented
// Events are forwarded to the auth server and is then emitted from there
func (r *Forwarder) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent) error {
return trace.NotImplemented("not implemented")
}
// EmitAuditEventLegacy emits audit event
func (l *Forwarder) EmitAuditEventLegacy(event Event, fields EventFields) error {
err := UpdateEventFields(event, fields, l.Clock, l.UID)

View file

@ -1,198 +0,0 @@
/*
Copyright 2015-2020 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package events
import (
"context"
"io"
"time"
"github.com/gravitational/teleport"
apidefaults "github.com/gravitational/teleport/api/defaults"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/trace"
"github.com/sirupsen/logrus"
)
// SessionRecorder implements io.Writer to be plugged into the multi-writer
// associated with every session. It forwards session stream to the audit log
type SessionRecorder interface {
io.Writer
apievents.Emitter
Close(ctx context.Context) error
}
// DiscardRecorder discards all writes
type DiscardRecorder struct {
DiscardAuditLog
}
// Write acks all writes but discards them
func (*DiscardRecorder) Write(b []byte) (int, error) {
return len(b), nil
}
// Close does nothing and always succeeds
func (*DiscardRecorder) Close() error {
return nil
}
// GetAuditLog returns audit log associated with this recorder
func (d *DiscardRecorder) GetAuditLog() IAuditLog {
return &d.DiscardAuditLog
}
// ForwardRecorder implements io.Writer to be plugged into the multi-writer
// associated with every session. It forwards session stream to the audit log
type ForwardRecorder struct {
// ForwardRecorderConfig specifies session recorder configuration
ForwardRecorderConfig
// Entry holds the structured logger
*logrus.Entry
// AuditLog is the audit log to store session chunks
AuditLog IAuditLog
}
// ForwardRecorderConfig specifies config for session recording
type ForwardRecorderConfig struct {
// DataDir is a data directory to record
DataDir string
// SessionID defines the session to record.
SessionID session.ID
// Namespace is the session namespace.
Namespace string
// RecordSessions stores info on whether to record sessions
RecordSessions bool
// Component is a component used for logging
Component string
// ForwardTo is external audit log where events will be forwarded
ForwardTo IAuditLog
}
func (cfg *ForwardRecorderConfig) CheckAndSetDefaults() error {
if cfg.DataDir == "" {
return trace.BadParameter("missing parameter DataDir")
}
if cfg.SessionID.IsZero() {
return trace.BadParameter("missing parameter DataDir")
}
if cfg.Namespace == "" {
cfg.Namespace = apidefaults.Namespace
}
if cfg.ForwardTo == nil {
cfg.ForwardTo = &DiscardAuditLog{}
}
return nil
}
// NewForwardRecorder returns a new instance of session recorder
func NewForwardRecorder(cfg ForwardRecorderConfig) (*ForwardRecorder, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
// Always write sessions to local disk first, then forward them to the Auth
// Server later.
auditLog, err := NewForwarder(ForwarderConfig{
SessionID: cfg.SessionID,
ServerID: teleport.ComponentUpload,
DataDir: cfg.DataDir,
RecordSessions: cfg.RecordSessions,
Namespace: cfg.Namespace,
IAuditLog: cfg.ForwardTo,
})
if err != nil {
return nil, trace.Wrap(err)
}
sr := &ForwardRecorder{
ForwardRecorderConfig: cfg,
Entry: logrus.WithFields(logrus.Fields{
trace.Component: cfg.Component,
}),
AuditLog: auditLog,
}
return sr, nil
}
// GetAuditLog returns audit log associated with this recorder
func (r *ForwardRecorder) GetAuditLog() IAuditLog {
return r.AuditLog
}
// EmitAuditEvent is not implemented
// Events are forwarded to the auth server and is then emitted from there
func (r *Forwarder) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent) error {
return trace.NotImplemented("not implemented")
}
// Write takes a chunk and writes it into the audit log
func (r *ForwardRecorder) Write(data []byte) (int, error) {
// we are copying buffer to prevent data corruption:
// io.Copy allocates single buffer and calls multiple writes in a loop
// our PostSessionSlice is async and sends reader wrapping buffer
// to the channel. This can lead to cases when the buffer is re-used
// and data is corrupted unless we copy the data buffer in the first place
dataCopy := make([]byte, len(data))
copy(dataCopy, data)
// post the chunk of bytes to the audit log:
chunk := &SessionChunk{
EventType: SessionPrintEvent,
Data: dataCopy,
Time: time.Now().UTC().UnixNano(),
}
if err := r.AuditLog.PostSessionSlice(SessionSlice{
Namespace: r.Namespace,
SessionID: string(r.SessionID),
Chunks: []*SessionChunk{chunk},
}); err != nil {
r.Error(trace.DebugReport(err))
}
return len(data), nil
}
// Close closes audit log session recorder
func (r *ForwardRecorder) Close() error {
var errors []error
err := r.AuditLog.Close()
errors = append(errors, err)
// wait until all events from recorder get flushed, it is important
// to do so before we send SessionEndEvent to advise the audit log
// to release resources associated with this session.
// not doing so will not result in memory leak, but could result
// in missing playback events
context, cancel := context.WithTimeout(context.TODO(), defaults.ReadHeadersTimeout)
defer cancel() // releases resources if slowOperation completes before timeout elapses
err = r.AuditLog.WaitForDelivery(context)
if err != nil {
errors = append(errors, err)
r.Warnf("Timeout waiting for session to flush events: %v", trace.DebugReport(err))
}
return trace.NewAggregate(errors...)
}

View file

@ -2098,38 +2098,30 @@ func (process *TeleportProcess) initUploaderService(streamer events.Streamer, au
if err != nil {
return trace.Wrap(err)
}
// prepare dirs for uploader
streamingDir := []string{process.Config.DataDir, teleport.LogsDir, teleport.ComponentUpload, events.StreamingLogsDir, apidefaults.Namespace}
paths := [][]string{
// DELETE IN (5.1.0)
// this directory will no longer be used after migration to 5.1.0
{process.Config.DataDir, teleport.LogsDir, teleport.ComponentUpload, events.SessionLogsDir, apidefaults.Namespace},
// This directory will remain to be used after migration to 5.1.0
streamingDir,
}
for _, path := range paths {
for i := 1; i < len(path); i++ {
dir := filepath.Join(path[:i+1]...)
log.Infof("Creating directory %v.", dir)
err := os.Mkdir(dir, 0755)
err = trace.ConvertSystemError(err)
if err != nil {
if !trace.IsAlreadyExists(err) {
return trace.Wrap(err)
}
// prepare dir for uploader
path := []string{process.Config.DataDir, teleport.LogsDir, teleport.ComponentUpload, events.StreamingLogsDir, apidefaults.Namespace}
for i := 1; i < len(path); i++ {
dir := filepath.Join(path[:i+1]...)
log.Infof("Creating directory %v.", dir)
err := os.Mkdir(dir, 0755)
err = trace.ConvertSystemError(err)
if err != nil {
if !trace.IsAlreadyExists(err) {
return trace.Wrap(err)
}
if uid != nil && gid != nil {
log.Infof("Setting directory %v owner to %v:%v.", dir, *uid, *gid)
err := os.Chown(dir, *uid, *gid)
if err != nil {
return trace.ConvertSystemError(err)
}
}
if uid != nil && gid != nil {
log.Infof("Setting directory %v owner to %v:%v.", dir, *uid, *gid)
err := os.Chown(dir, *uid, *gid)
if err != nil {
return trace.ConvertSystemError(err)
}
}
}
fileUploader, err := filesessions.NewUploader(filesessions.UploaderConfig{
ScanDir: filepath.Join(streamingDir...),
ScanDir: filepath.Join(path...),
Streamer: streamer,
AuditLog: auditLog,
EventsC: process.Config.UploadEventsC,

View file

@ -723,7 +723,7 @@ func (s *session) PID() int {
return s.term.PID()
}
// Recorder returns a events.SessionRecorder which can be used to emit events
// Recorder returns a StreamWriter which can be used to emit events
// to a session as well as the audit log.
func (s *session) Recorder() events.StreamWriter {
s.mu.RLock()