active node inventory cleanup

This commit is contained in:
Forrest Marshall 2022-02-02 09:01:19 -08:00 committed by Forrest
parent 92dbdab703
commit ba317929d4
2 changed files with 231 additions and 6 deletions

137
lib/cache/cache.go vendored
View file

@ -31,6 +31,7 @@ import (
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/services/local"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/teleport/lib/utils/interval"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
@ -42,9 +43,11 @@ func tombstoneKey() []byte {
return backend.Key("cache", teleport.Version, "tombstone", "ok")
}
const cacheTargetAuth string = "auth"
// ForAuth sets up watch configuration for the auth server
func ForAuth(cfg Config) Config {
cfg.target = "auth"
cfg.target = cacheTargetAuth
cfg.Watches = []types.WatchKind{
{Kind: types.KindCertAuthority, LoadSecrets: true},
{Kind: types.KindClusterName},
@ -520,6 +523,10 @@ type Config struct {
// CacheInitTimeout is the maximum amount of time that cache.New
// will block, waiting for initialization (default=20s).
CacheInitTimeout time.Duration
// RelativeExpiryCheckInterval determines how often the cache performs special
// "relative expiration" checks which are used to compensate for real backends
// that suffer from overly lazy ttl'ing of resources.
RelativeExpiryCheckInterval time.Duration
// EventsC is a channel for event notifications,
// used in tests
EventsC chan Event
@ -561,6 +568,12 @@ func (c *Config) CheckAndSetDefaults() error {
if c.CacheInitTimeout == 0 {
c.CacheInitTimeout = time.Second * 20
}
if c.RelativeExpiryCheckInterval == 0 {
// TODO(fspmarshall): change this to 1/2 offline threshold once that becomes
// a configurable value. This will likely be a dynamic configuration, and
// therefore require lazy initialization after the cache has become healthy.
c.RelativeExpiryCheckInterval = apidefaults.ServerAnnounceTTL / 2
}
if c.Component == "" {
c.Component = teleport.ComponentCache
}
@ -589,6 +602,9 @@ const (
// Reloading is emitted when an error occurred watching events
// and the cache is waiting to create a new watcher
Reloading = "reloading_cache"
// RelativeExpiry notifies that relative expiry operations have
// been run.
RelativeExpiry = "relative_expiry"
)
// New creates a new instance of Cache
@ -901,6 +917,13 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim
retry.Reset()
relativeExpiryInterval := interval.New(interval.Config{
Duration: c.Config.RelativeExpiryCheckInterval,
FirstDuration: utils.HalfJitter(c.Config.RelativeExpiryCheckInterval),
Jitter: utils.NewSeventhJitter(),
})
defer relativeExpiryInterval.Stop()
c.notify(c.ctx, Event{Type: WatcherStarted})
var lastStalenessWarning time.Time
@ -911,6 +934,11 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim
return trace.ConnectionProblem(watcher.Error(), "watcher is closed: %v", watcher.Error())
case <-c.ctx.Done():
return trace.ConnectionProblem(c.ctx.Err(), "context is closing")
case <-relativeExpiryInterval.Next():
if err := c.performRelativeNodeExpiry(ctx); err != nil {
return trace.Wrap(err)
}
c.notify(ctx, Event{Type: RelativeExpiry})
case event := <-watcher.Events():
// check for expired resources in OpPut events and log them periodically. stale OpPut events
// may be an indicator of poor performance, and can lead to confusing and inconsistent state
@ -952,6 +980,113 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim
}
}
// performRelativeNodeExpiry actively expires cached nodes that are clearly stale
// relative to both the current clock and their more recently heartbeated
// counterparts. Working from within the cache's own "frame of reference"
// sidesteps issues of clock drift or general cache staleness: an item is only
// removed when it is stale by both measures.
//
// To better understand why we use this expiry strategy, it's important to
// understand the two distinct scenarios that we're trying to accommodate:
//
// 1. Expiry events are being emitted very lazily by the real backend (*hours*
// after the time at which the resource was supposed to expire).
//
// 2. The event stream itself is stale (i.e. all events show up late, not just
// expiry events).
//
// In the first scenario, removing items from the cache after they have passed
// some designated threshold of staleness seems reasonable. In the second
// scenario, your best option is to faithfully serve the delayed, but internally
// consistent, view created by the event stream and not expire any items.
//
// Relative expiry is the compromise between the two above scenarios. We
// calculate a staleness threshold after which items would be removed, but we
// calculate it relative to the most recent expiry *or* the current time,
// depending on which is earlier. The result is that nodes are removed only if
// they are both stale from the perspective of the current clock, *and* stale
// relative to our current view of the world.
//
// *note*: this function is only sane to call when the cache and event stream
// are healthy, and cannot run concurrently with event processing. this function
// injects additional events into the outbound event stream.
func (c *Cache) performRelativeNodeExpiry(ctx context.Context) error {
	// Downstream (non-auth) caches rely upon the upstream auth cache to
	// perform relative expiry and propagate the resulting deletes.
	if c.target != cacheTargetAuth {
		return nil
	}

	// TODO(fspmarshall): Start using dynamic value once it is implemented.
	gracePeriod := apidefaults.ServerAnnounceTTL

	nodes, err := c.GetNodes(ctx, apidefaults.Namespace)
	if err != nil {
		return trace.Wrap(err)
	}

	// mostRecentExp is the value we choose to treat as the most recent
	// "expired" timestamp: the latest node expiry seen, or the current time,
	// whichever is earlier.
	var mostRecentExp time.Time
	for _, node := range nodes {
		exp := node.Expiry()
		if exp.IsZero() {
			continue
		}
		if mostRecentExp.IsZero() || exp.After(mostRecentExp) {
			// this node's expiry is more recent than the previously
			// recorded value.
			mostRecentExp = exp
		}
	}
	if mostRecentExp.IsZero() {
		// no node carries an expiry; nothing to do.
		return nil
	}

	// If the most recent expiration value is still in the future, clamp it to
	// the current time. Unless the event stream is unhealthy, or all nodes
	// were recently removed, this should always be the case.
	if now := c.Clock.Now(); mostRecentExp.After(now) {
		mostRecentExp = now
	}

	// Subtracting gracePeriod from the most recent expiry yields the retention
	// threshold. Nodes which expired earlier than this threshold are removed,
	// as well-behaved backends are expected to have emitted an expiry event
	// within the grace period.
	retentionThreshold := mostRecentExp.Add(-gracePeriod)

	removed := 0
	for _, node := range nodes {
		exp := node.Expiry()
		if exp.IsZero() || exp.After(retentionThreshold) {
			continue
		}
		// Event stream processing is paused while this function runs, so we
		// perform the actual expiry by constructing a fake delete event for
		// the resource which both updates this cache, and all downstream
		// caches.
		if err := c.processEvent(ctx, types.Event{
			Type: types.OpDelete,
			Resource: &types.ResourceHeader{
				Kind:     types.KindNode,
				Metadata: node.GetMetadata(),
			},
		}); err != nil {
			return trace.Wrap(err)
		}
		removed++
	}

	if removed > 0 {
		c.Debugf("Removed %d nodes via relative expiry (retentionThreshold=%s).", removed, retentionThreshold)
	}
	return nil
}
func (c *Cache) watchKinds() []types.WatchKind {
out := make([]types.WatchKind, 0, len(c.collections))
for _, collection := range c.collections {

View file

@ -365,10 +365,34 @@ func TestWatchers(t *testing.T) {
}
// waitForRestart waits for the cache watcher to come back up after a restart.
func waitForRestart(t *testing.T, eventsC <-chan Event) {
// NOTE(review): this diff appears to replace waitForEvent with expectEvent;
// the waitForEvent call below looks like a stale pre-refactor leftover line
// from the diff rendering — confirm only the expectEvent call should remain.
waitForEvent(t, eventsC, WatcherStarted, Reloading, WatcherFailed)
expectEvent(t, eventsC, WatcherStarted)
}
func waitForEvent(t *testing.T, eventsC <-chan Event, expectedEvent string, skipEvents ...string) {
// drainEvents discards every event currently buffered on eventsC, returning
// as soon as a receive would block.
func drainEvents(eventsC <-chan Event) {
	for drained := false; !drained; {
		select {
		case <-eventsC:
			// discard the buffered event and keep draining
		default:
			// channel is empty; we are done
			drained = true
		}
	}
}
// expectEvent blocks until an event of the expected type is observed on
// eventsC, silently discarding events of other types. The test fails if the
// expected event does not arrive within 5 seconds.
func expectEvent(t *testing.T, eventsC <-chan Event, expectedEvent string) {
	deadline := time.After(5 * time.Second)
	for {
		select {
		case <-deadline:
			t.Fatalf("Timeout waiting for expected event: %s", expectedEvent)
		case ev := <-eventsC:
			if ev.Type == expectedEvent {
				return
			}
		}
	}
}
func expectNextEvent(t *testing.T, eventsC <-chan Event, expectedEvent string, skipEvents ...string) {
timeC := time.After(5 * time.Second)
for {
// wait for watcher to restart
@ -565,7 +589,7 @@ func TestTombstones(t *testing.T) {
require.NoError(t, p.cache.Close())
// wait for TombstoneWritten, ignoring all other event types
waitForEvent(t, p.eventsC, TombstoneWritten, WatcherStarted, EventProcessed, WatcherFailed)
expectEvent(t, p.eventsC, TombstoneWritten)
// simulate bad connection to auth server
p.backend.SetReadError(trace.ConnectionProblem(nil, "backend is unavailable"))
p.eventsS.closeWatchers()
@ -860,7 +884,7 @@ func initStrategy(t *testing.T) {
// wait for the watcher to fail
// there could be optional event processed event,
// see NOTE 1 above
waitForEvent(t, p.eventsC, WatcherFailed, EventProcessed, Reloading)
expectNextEvent(t, p.eventsC, WatcherFailed, EventProcessed, Reloading)
// backend is out, but old value is available
out2, err := p.cache.GetCertAuthority(ca.GetID(), false)
@ -878,7 +902,7 @@ func initStrategy(t *testing.T) {
// wait for watcher to restart successfully; ignoring any failed
// attempts which occurred before backend became healthy again.
waitForEvent(t, p.eventsC, WatcherStarted, WatcherFailed, Reloading)
expectEvent(t, p.eventsC, WatcherStarted)
// new value is available now
out, err = p.cache.GetCertAuthority(ca.GetID(), false)
@ -2178,6 +2202,72 @@ func TestDatabases(t *testing.T) {
require.Equal(t, 0, len(out))
}
// TestRelativeExpiry verifies that the auth cache's relative-expiry pass
// removes nodes that are stale relative to both the (fake) clock and the most
// recently expiring node, and that the "sliding window" preserves the newest
// nodes even when the clock advances far past every expiry time.
func TestRelativeExpiry(t *testing.T) {
	const checkInterval = time.Second
	const nodeCount = int64(100)

	ctx := context.Background()

	clock := clockwork.NewFakeClockAt(time.Now().Add(time.Hour))
	p := newTestPack(t, func(c Config) Config {
		c.RelativeExpiryCheckInterval = checkInterval
		c.Clock = clock
		return ForAuth(c)
	})
	t.Cleanup(p.Close)

	// add servers that expire at a range of times
	now := clock.Now()
	for i := int64(0); i < nodeCount; i++ {
		exp := now.Add(time.Minute * time.Duration(i))
		server := suite.NewServer(types.KindNode, uuid.New().String(), "127.0.0.1:2022", apidefaults.Namespace)
		server.SetExpiry(exp)
		_, err := p.presenceS.UpsertNode(ctx, server)
		require.NoError(t, err)

		// Check that information has been replicated to the cache.
		expectEvent(t, p.eventsC, EventProcessed)
	}

	nodes, err := p.cache.GetNodes(ctx, apidefaults.Namespace)
	require.NoError(t, err)
	// derive the expected count from nodeCount rather than repeating a magic
	// literal, so the assertions track the constant above.
	require.Len(t, nodes, int(nodeCount))

	clock.Advance(time.Minute * 25)
	// get rid of events that were emitted before clock advanced
	drainEvents(p.eventsC)
	// wait for next relative expiry check to run
	expectEvent(t, p.eventsC, RelativeExpiry)

	// verify that roughly expected proportion of nodes was removed.
	nodes, err = p.cache.GetNodes(ctx, apidefaults.Namespace)
	require.NoError(t, err)
	require.True(t, len(nodes) < int(nodeCount) && len(nodes) > 75, "node_count=%d", len(nodes))

	clock.Advance(time.Minute * 25)
	// get rid of events that were emitted before clock advanced
	drainEvents(p.eventsC)
	// wait for next relative expiry check to run
	expectEvent(t, p.eventsC, RelativeExpiry)

	// verify that roughly expected proportion of nodes was removed.
	nodes, err = p.cache.GetNodes(ctx, apidefaults.Namespace)
	require.NoError(t, err)
	require.True(t, len(nodes) < 75 && len(nodes) > 50, "node_count=%d", len(nodes))

	// finally, we check the "sliding window" by verifying that we don't remove all nodes
	// even if we advance well past the latest expiry time.
	clock.Advance(time.Hour * 24)
	// get rid of events that were emitted before clock advanced
	drainEvents(p.eventsC)
	// wait for next relative expiry check to run
	expectEvent(t, p.eventsC, RelativeExpiry)

	// verify that sliding window has preserved most recent nodes
	nodes, err = p.cache.GetNodes(ctx, apidefaults.Namespace)
	require.NoError(t, err)
	require.True(t, len(nodes) > 0, "node_count=%d", len(nodes))
}
func TestCache_Backoff(t *testing.T) {
clock := clockwork.NewFakeClock()
p := newTestPack(t, func(c Config) Config {