teleport/lib/backend/buffer.go
Michael Wilson e93d165ed1
Redundant event prefix check before event watcher is created. (#32748)
* Revert "Error when redundant prefixes are detected in events. (#32652)"

This reverts commit 84dbc45e1d.

* Error when redundant prefixes are detected in events.

When creating a new events watcher, redundant prefixes will be detected
and produce an error. This should prevent developer mistakes where watched
prefixes overlap, causing subsets of events not to be parsed. This has been
verified manually.

* Fix buffer_test.go

* Update lib/services/local/events.go

Co-authored-by: Edoardo Spadolini <edoardo.spadolini@goteleport.com>

---------

Co-authored-by: Edoardo Spadolini <edoardo.spadolini@goteleport.com>
2023-09-29 01:22:08 +00:00

502 lines
12 KiB
Go

/*
Copyright 2018-2019 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package backend
import (
"bytes"
"context"
"fmt"
"sort"
"sync"
"time"
radix "github.com/armon/go-radix"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/types"
)
// bufferConfig holds the tunable parameters of a CircularBuffer.
type bufferConfig struct {
	// gracePeriod is how long a watcher with a backlog is tolerated
	// before it is closed.
	gracePeriod time.Duration
	// capacity is the default event queue size for watchers.
	capacity int
	// clock is the time source; replaceable in tests.
	clock clockwork.Clock
}

// BufferOption is a functional option that mutates bufferConfig.
type BufferOption func(*bufferConfig)
// BufferCapacity sets the event capacity of the circular buffer.
// Non-positive values are ignored and the default capacity is kept.
func BufferCapacity(c int) BufferOption {
	return func(cfg *bufferConfig) {
		if c <= 0 {
			return
		}
		cfg.capacity = c
	}
}
// BacklogGracePeriod sets the amount of time a watcher with a backlog will be tolerated.
// Non-positive durations are ignored and the default grace period is kept.
func BacklogGracePeriod(d time.Duration) BufferOption {
	return func(cfg *bufferConfig) {
		if d <= 0 {
			return
		}
		cfg.gracePeriod = d
	}
}
// BufferClock sets a custom clock for the buffer (used in tests).
// A nil clock is ignored and the default real clock is kept.
func BufferClock(c clockwork.Clock) BufferOption {
	return func(cfg *bufferConfig) {
		if c == nil {
			return
		}
		cfg.clock = c
	}
}
// CircularBuffer implements in-memory circular buffer
// of predefined size, that is capable of fan-out of the backend events.
type CircularBuffer struct {
	// Mutex guards all fields below.
	sync.Mutex
	// Entry is the embedded structured logger.
	*log.Entry
	// cfg holds capacity, grace period and clock settings.
	cfg bufferConfig
	// init is set by SetInit; closed is set by Close.
	init, closed bool
	// watchers is a prefix tree of registered watchers.
	watchers *watcherTree
}
// NewCircularBuffer returns a new uninitialized instance of circular buffer.
func NewCircularBuffer(opts ...BufferOption) *CircularBuffer {
	// start from defaults, then let each option override
	cfg := bufferConfig{
		gracePeriod: DefaultBacklogGracePeriod,
		capacity:    DefaultBufferCapacity,
		clock:       clockwork.NewRealClock(),
	}
	for _, applyOption := range opts {
		applyOption(&cfg)
	}

	buf := &CircularBuffer{
		Entry: log.WithFields(log.Fields{
			trace.Component: teleport.ComponentBuffer,
		}),
		cfg:      cfg,
		watchers: newWatcherTree(),
	}
	return buf
}
// Clear clears all events from the queue and closes all active watchers,
// but does not modify init state.
func (c *CircularBuffer) Clear() {
	c.Lock()
	defer c.Unlock()
	// clear requires the lock to be held
	c.clear()
}
// Reset is equivalent to Clear except that is also sets the buffer into
// an uninitialized state. This method should only be used when resetting
// after a broken event stream. If only closure of watchers is desired,
// use Clear instead.
func (c *CircularBuffer) Reset() {
	c.Lock()
	defer c.Unlock()
	c.clear()
	// a subsequent SetInit is required before Emit may be called again
	c.init = false
}
// clear closes every registered watcher and replaces the watcher tree
// with an empty one. Callers must hold c.Mutex.
func (c *CircularBuffer) clear() {
	// could close multiple times
	c.watchers.walk(func(w *BufferWatcher) {
		w.closeWatcher()
	})
	c.watchers = newWatcherTree()
}
// SetInit puts the buffer into an initialized state if it isn't already. Any watchers already queued
// will be sent init events, and watchers added after this call will have their init events sent immediately.
// This function must be called *after* establishing a healthy parent event stream in order to preserve
// correct cache behavior.
func (c *CircularBuffer) SetInit() {
	c.Lock()
	defer c.Unlock()
	if c.init {
		// already initialized; init events were sent previously
		return
	}

	// collect watchers that fail to accept the init event; they cannot
	// be removed while the tree is being walked
	var watchersToDelete []*BufferWatcher
	c.watchers.walk(func(watcher *BufferWatcher) {
		if ok := watcher.init(); !ok {
			watchersToDelete = append(watchersToDelete, watcher)
		}
	})

	for _, watcher := range watchersToDelete {
		c.Warningf("Closing %v, failed to send init event.", watcher)
		watcher.closeWatcher()
		c.watchers.rm(watcher)
	}

	c.init = true
}
// Close closes circular buffer and all watchers
func (c *CircularBuffer) Close() error {
	c.Lock()
	defer c.Unlock()
	c.clear()
	// closed makes subsequent Emit/NewWatcher calls fail fast
	c.closed = true
	// note that we do not modify init state here. this is because
	// calls to Close are allowed to happen concurrently with calls
	// to Emit().
	return nil
}
// Emit emits events to currently registered watchers and stores them to
// the buffer. Panics if called before SetInit(), and returns false if called
// after Close().
func (c *CircularBuffer) Emit(events ...Event) (ok bool) {
	c.Lock()
	defer c.Unlock()

	if c.closed {
		return false
	}

	// fan each event out to matching watchers in order
	for _, event := range events {
		c.emit(event)
	}
	return true
}
// emit fans a single event out to watchers. Callers must hold c.Mutex.
// Panics if the buffer has not been initialized via SetInit.
func (c *CircularBuffer) emit(r Event) {
	if !c.init {
		panic("push called on uninitialized buffer instance")
	}
	c.fanOutEvent(r)
}
// fanOutEvent delivers the event to every watcher whose prefix matches
// the event's key, removing watchers whose backlog grace period expired.
// Callers must hold c.Mutex.
func (c *CircularBuffer) fanOutEvent(r Event) {
	// watchers that rejected the event cannot be removed mid-walk;
	// collect them and remove after the walk completes
	var watchersToDelete []*BufferWatcher
	c.watchers.walkPath(string(r.Item.Key), func(watcher *BufferWatcher) {
		if watcher.MetricComponent != "" {
			// report current queue depth for instrumented watchers
			watcherQueues.WithLabelValues(watcher.MetricComponent).Set(float64(len(watcher.eventsC)))
		}
		if !watcher.emit(r) {
			watchersToDelete = append(watchersToDelete, watcher)
		}
	})

	for _, watcher := range watchersToDelete {
		c.Warningf("Closing %v, buffer overflow at %v (backlog=%v).", watcher, len(watcher.eventsC), watcher.backlogLen())
		watcher.closeWatcher()
		c.watchers.rm(watcher)
	}
}
// RemoveRedundantPrefixes will remove redundant prefixes from the given prefix list.
// A prefix is redundant when a shorter prefix in the list already covers it
// (e.g. "/a" makes "/a/b" redundant). The returned slice contains only the
// minimal covering set, sorted lexicographically.
//
// NOTE: the input slice is sorted and compacted in place; callers must not
// rely on its original order or contents after this call.
func RemoveRedundantPrefixes(prefixes [][]byte) [][]byte {
	if len(prefixes) == 0 {
		return prefixes
	}
	// sort lexicographically so that any covering prefix sorts
	// immediately before the keys it makes redundant
	sort.Slice(prefixes, func(i, j int) bool {
		return bytes.Compare(prefixes[i], prefixes[j]) < 0
	})
	// j indexes the last kept (non-redundant) prefix
	j := 0
	for i := 1; i < len(prefixes); i++ {
		// skip keys covered by the last kept prefix
		if bytes.HasPrefix(prefixes[i], prefixes[j]) {
			continue
		}
		j++
		// compact the next non-redundant key into position j
		prefixes[j] = prefixes[i]
	}
	return prefixes[:j+1]
}
// NewWatcher adds a new watcher to the events buffer
func (c *CircularBuffer) NewWatcher(ctx context.Context, watch Watch) (Watcher, error) {
	c.Lock()
	defer c.Unlock()

	if c.closed {
		return nil, trace.Errorf("cannot register watcher, buffer is closed")
	}

	// fall back to the buffer's configured capacity when the caller
	// did not request a specific queue size
	if watch.QueueSize == 0 {
		watch.QueueSize = c.cfg.capacity
	}

	if len(watch.Prefixes) == 0 {
		// if watcher has no prefixes, assume it will match anything
		// starting from the separator (what includes all keys in backend invariant, see Keys function)
		watch.Prefixes = append(watch.Prefixes, []byte{Separator})
	} else {
		// if watcher's prefixes are redundant, keep only shorter prefixes
		// to avoid double fan out
		watch.Prefixes = RemoveRedundantPrefixes(watch.Prefixes)
	}

	closeCtx, cancel := context.WithCancel(ctx)

	w := &BufferWatcher{
		buffer:   c,
		Watch:    watch,
		eventsC:  make(chan Event, watch.QueueSize),
		ctx:      closeCtx,
		cancel:   cancel,
		capacity: watch.QueueSize,
	}

	c.Debugf("Add %v.", w)
	// if the buffer is already initialized, the init event must be
	// delivered immediately; a watcher that cannot accept it is rejected
	if c.init {
		if ok := w.init(); !ok {
			c.Warningf("Closing %v, failed to send init event.", w)
			return nil, trace.BadParameter("failed to send init event")
		}
	}
	c.watchers.add(w)
	return w, nil
}
// removeWatcherWithLock acquires the buffer lock and detaches the given
// watcher from the watcher tree.
func (c *CircularBuffer) removeWatcherWithLock(watcher *BufferWatcher) {
	c.Lock()
	defer c.Unlock()

	if watcher == nil {
		c.Warningf("Internal logic error: %v.", trace.DebugReport(trace.BadParameter("empty watcher")))
		return
	}
	c.Debugf("Removing watcher %v (%p) via external close.", watcher.Name, watcher)
	if !c.watchers.rm(watcher) {
		c.Debugf("Could not find watcher %v.", watcher.Name)
	}
}
// BufferWatcher is a watcher connected to the
// buffer and receiving fan-out events from the watcher
type BufferWatcher struct {
	// buffer is the parent buffer this watcher is registered with.
	buffer *CircularBuffer
	// Watch holds the watch parameters (prefixes, queue size, etc).
	Watch
	// eventsC is the primary event delivery channel.
	eventsC chan Event

	// bmu guards backlog and backlogSince.
	bmu sync.Mutex
	// backlog holds events that did not fit into eventsC.
	backlog []Event
	// backlogSince records when the current backlog started.
	backlogSince time.Time

	// ctx is closed when the watcher is closed.
	ctx context.Context
	// cancel closes ctx.
	cancel context.CancelFunc

	// initOnce ensures the init event is sent at most once;
	// initOk records whether it was delivered.
	initOnce sync.Once
	initOk bool
	// capacity is the size of eventsC.
	capacity int
}
// String returns user-friendly representation
// of the buffer watcher
func (w *BufferWatcher) String() string {
	joinedPrefixes := string(bytes.Join(w.Prefixes, []byte(", ")))
	return fmt.Sprintf("Watcher(name=%v, prefixes=%v, capacity=%v, size=%v)", w.Name, joinedPrefixes, w.capacity, len(w.eventsC))
}
// Events returns events channel. This method performs internal work and should be re-called after each event
// is received, rather than having its output cached.
func (w *BufferWatcher) Events() <-chan Event {
	w.bmu.Lock()
	defer w.bmu.Unlock()
	// it is possible that the channel has been drained, but events exist in
	// the backlog, so make sure to flush the backlog. we can ignore the result
	// of the flush here since we don't actually care if the backlog was fully
	// flushed, only that the event channel is non-empty if a backlog does exist.
	w.flushBacklog()
	return w.eventsC
}
// Done channel is closed when watcher is closed
func (w *BufferWatcher) Done() <-chan struct{} {
	// ctx is canceled by closeWatcher
	return w.ctx.Done()
}
// flushBacklog attempts to push any backlogged events into the
// event channel. returns true if backlog is empty.
// Callers must hold w.bmu.
func (w *BufferWatcher) flushBacklog() (ok bool) {
	for idx := 0; idx < len(w.backlog); idx++ {
		select {
		case w.eventsC <- w.backlog[idx]:
		default:
			// channel filled up; keep the unsent tail of the backlog
			w.backlog = w.backlog[idx:]
			return false
		}
	}
	// backlog fully drained
	w.backlogSince = time.Time{}
	w.backlog = nil
	return true
}
// emit attempts to emit an event. Returns false if the watcher has
// exceeded the backlog grace period.
func (w *BufferWatcher) emit(e Event) (ok bool) {
	w.bmu.Lock()
	defer w.bmu.Unlock()

	if !w.flushBacklog() {
		// a backlog already exists and could not be fully drained
		if w.buffer.cfg.clock.Now().After(w.backlogSince.Add(w.buffer.cfg.gracePeriod)) {
			// backlog has existed for longer than grace period,
			// this watcher needs to be removed.
			return false
		}
		// backlog exists, but we are still within grace period.
		w.backlog = append(w.backlog, e)
		return true
	}

	select {
	case w.eventsC <- e:
	default:
		// primary event buffer is full; start backlog.
		w.backlog = append(w.backlog, e)
		w.backlogSince = w.buffer.cfg.clock.Now()
	}
	return true
}
// backlogLen returns the current number of backlogged events,
// taking the backlog lock.
func (w *BufferWatcher) backlogLen() int {
	w.bmu.Lock()
	defer w.bmu.Unlock()
	return len(w.backlog)
}
// init transmits the OpInit event. safe to double-call.
// Returns false if the event channel was full and the init
// event could not be delivered.
func (w *BufferWatcher) init() (ok bool) {
	w.initOnce.Do(func() {
		// non-blocking send; a full channel means the watcher
		// cannot accept its init event
		select {
		case w.eventsC <- Event{Type: types.OpInit}:
			w.initOk = true
		default:
			w.initOk = false
		}
	})
	// initOk is sticky: repeated calls return the first attempt's result
	return w.initOk
}
// Close closes the watcher, could
// be called multiple times, removes the watcher
// from the buffer queue
func (w *BufferWatcher) Close() error {
	// remove asynchronously to avoid deadlocking against the buffer lock
	w.closeAndRemove(removeAsync)
	return nil
}
// closeWatcher cancels the watcher context, which closes the Done channel.
// It does not remove the watcher from the buffer's tree.
func (w *BufferWatcher) closeWatcher() {
	w.cancel()
}
// Flags passed to closeAndRemove selecting whether the watcher is removed
// from the buffer synchronously (tests) or asynchronously (production).
const (
	removeSync = true
	removeAsync = false
)
// closeAndRemove closes the watcher, could
// be called multiple times, removes the watcher
// from the buffer queue synchronously (used in tests)
// or asynchronously, used in prod, to avoid potential deadlocks
func (w *BufferWatcher) closeAndRemove(sync bool) {
	w.closeWatcher()
	if sync {
		w.buffer.removeWatcherWithLock(w)
	} else {
		// spawn the removal so a caller already holding the buffer
		// lock does not deadlock
		go w.buffer.removeWatcherWithLock(w)
	}
}
// newWatcherTree returns an empty watcher tree backed by a radix tree.
func newWatcherTree() *watcherTree {
	return &watcherTree{
		Tree: radix.New(),
	}
}
// watcherTree maps string prefixes to slices of *BufferWatcher
// via an embedded radix tree.
type watcherTree struct {
	*radix.Tree
}
// add adds buffer watcher to the tree
// The watcher is registered once under each of its prefixes.
func (t *watcherTree) add(w *BufferWatcher) {
	for _, rawPrefix := range w.Prefixes {
		key := string(rawPrefix)
		var watchers []*BufferWatcher
		if existing, ok := t.Tree.Get(key); ok {
			watchers = existing.([]*BufferWatcher)
		}
		t.Tree.Insert(key, append(watchers, w))
	}
}
// rm removes the buffer watcher from the prefix tree
// Returns true if the watcher was found under at least one prefix.
func (t *watcherTree) rm(w *BufferWatcher) bool {
	if w == nil {
		return false
	}
	var found bool
	// the watcher was registered once per prefix; remove it from
	// every prefix bucket it appears in
	for _, p := range w.Prefixes {
		prefix := string(p)
		val, ok := t.Tree.Get(prefix)
		if !ok {
			continue
		}
		buffers := val.([]*BufferWatcher)
		prevLen := len(buffers)
		for i := range buffers {
			if buffers[i] == w {
				buffers = append(buffers[:i], buffers[i+1:]...)
				found = true
				break
			}
		}
		if len(buffers) == 0 {
			// last watcher for this prefix; drop the node entirely
			t.Tree.Delete(prefix)
		} else if len(buffers) != prevLen {
			// re-insert the shortened slice
			t.Tree.Insert(prefix, buffers)
		}
	}
	return found
}
// walkFn is a callback executed for every matching watcher
type walkFn func(w *BufferWatcher)
// walkPath walks the tree above the longest matching prefix
// and calls fn callback for every buffer watcher
func (t *watcherTree) walkPath(key string, fn walkFn) {
	t.Tree.WalkPath(key, func(_ string, val interface{}) bool {
		for _, watcher := range val.([]*BufferWatcher) {
			fn(watcher)
		}
		// returning false continues the walk over all matching nodes
		return false
	})
}
// walk calls fn for every matching leaf of the tree
func (t *watcherTree) walk(fn walkFn) {
	t.Tree.Walk(func(_ string, val interface{}) bool {
		for _, watcher := range val.([]*BufferWatcher) {
			fn(watcher)
		}
		// returning false visits every node in the tree
		return false
	})
}