event fanout rework (#32978)

Forrest 2023-10-23 11:59:12 -07:00 committed by GitHub
parent 4dce835487
commit 54a05b2ca4
12 changed files with 896 additions and 59 deletions


@ -22,6 +22,7 @@ import (
"time"
"github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/utils"
)
// WatchStatus contains information about a successful Watch request.
@ -31,6 +32,8 @@ type WatchStatus interface {
GetKinds() []WatchKind
// SetKinds sets the list of kinds confirmed by the Watch request.
SetKinds([]WatchKind)
// Clone performs a deep copy of watch status.
Clone() WatchStatus
}
// GetKind returns the watch status resource kind.
@ -113,6 +116,11 @@ func (w *WatchStatusV1) SetKinds(kinds []WatchKind) {
w.Spec.Kinds = kinds
}
// Clone performs a deep copy of watch status.
func (w *WatchStatusV1) Clone() WatchStatus {
return utils.CloneProtoMsg(w)
}
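Clone is useful for taking a mutable copy of a cached status before editing it; a minimal illustration (the existing and newKinds values are hypothetical):

status := existing.Clone()   // deep copy, safe to modify
status.SetKinds(newKinds)    // does not affect the cached original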
// NewWatchStatus returns a new WatchStatus resource.
func NewWatchStatus(kinds []WatchKind) *WatchStatusV1 {
return &WatchStatusV1{


@ -79,6 +79,7 @@ import (
wantypes "github.com/gravitational/teleport/lib/auth/webauthntypes"
"github.com/gravitational/teleport/lib/authz"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/cache"
"github.com/gravitational/teleport/lib/circleci"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/events"
@ -444,6 +445,10 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (*Server, error) {
as.RegisterLoginHook(as.ulsGenerator.LoginHook(services.UserLoginStates))
if _, ok := as.getCache(); !ok {
log.Warn("Auth server starting without cache (may have negative performance implications).")
}
return &as, nil
}
@ -4520,6 +4525,43 @@ func (a *Server) GetAccessCapabilities(ctx context.Context, req types.AccessCapa
return caps, nil
}
func (a *Server) getCache() (c *cache.Cache, ok bool) {
c, ok = a.Cache.(*cache.Cache)
return
}
func (a *Server) NewStream(ctx context.Context, watch types.Watch) (stream.Stream[types.Event], error) {
if cache, ok := a.getCache(); ok {
// cache exposes a native stream implementation
return cache.NewStream(ctx, watch)
}
// fall back to wrapping a watcher in a stream.Stream adapter
watcher, err := a.Cache.NewWatcher(ctx, watch)
if err != nil {
return nil, trace.Wrap(err)
}
closer := func() {
watcher.Close()
}
return stream.Func(func() (types.Event, error) {
select {
case event := <-watcher.Events():
return event, nil
case <-watcher.Done():
err := watcher.Error()
if err == nil {
// stream.Func needs an error to signal end of stream. io.EOF is
// the expected "happy" end of stream singnal.
err = io.EOF
}
return types.Event{}, trace.Wrap(err)
}
}, closer), nil
}
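For reference, the returned stream.Stream is consumed with the Next/Item/Done pattern rather than channel selects; a minimal sketch (the consuming helper and handleEvent are illustrative, not part of this change):

func consumeEvents(ctx context.Context, srv *Server, watch types.Watch) error {
	events, err := srv.NewStream(ctx, watch)
	if err != nil {
		return trace.Wrap(err)
	}
	for events.Next() {
		handleEvent(events.Item()) // handleEvent is a hypothetical consumer
	}
	// Done releases stream resources and returns the terminal error, if any.
	return trace.Wrap(events.Done())
}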
// NewKeepAliver returns a new instance of keep aliver
func (a *Server) NewKeepAliver(ctx context.Context) (types.KeepAliver, error) {
cancelCtx, cancel := context.WithCancel(ctx)


@ -1396,10 +1396,27 @@ func (a *ServerWithRoles) KeepAliveServer(ctx context.Context, handle types.Keep
return a.authServer.KeepAliveServer(ctx, handle)
}
// NewStream returns a new event stream (equivalent to NewWatcher, but with slightly different
// performance characteristics).
func (a *ServerWithRoles) NewStream(ctx context.Context, watch types.Watch) (stream.Stream[types.Event], error) {
if err := a.authorizeWatchRequest(&watch); err != nil {
return nil, trace.Wrap(err)
}
return a.authServer.NewStream(ctx, watch)
}
// NewWatcher returns a new event watcher
func (a *ServerWithRoles) NewWatcher(ctx context.Context, watch types.Watch) (types.Watcher, error) {
if err := a.authorizeWatchRequest(&watch); err != nil {
return nil, trace.Wrap(err)
}
return a.authServer.NewWatcher(ctx, watch)
}
// authorizeWatchRequest performs permission checks and filtering on incoming watch requests.
func (a *ServerWithRoles) authorizeWatchRequest(watch *types.Watch) error {
if len(watch.Kinds) == 0 {
return nil, trace.AccessDenied("can't setup global watch")
return trace.AccessDenied("can't setup global watch")
}
validKinds := make([]types.WatchKind, 0, len(watch.Kinds))
@ -1409,14 +1426,14 @@ func (a *ServerWithRoles) NewWatcher(ctx context.Context, watch types.Watch) (ty
if watch.AllowPartialSuccess {
continue
}
return nil, trace.Wrap(err)
return trace.Wrap(err)
}
validKinds = append(validKinds, kind)
}
if len(validKinds) == 0 {
return nil, trace.BadParameter("none of the requested kinds can be watched")
return trace.BadParameter("none of the requested kinds can be watched")
}
watch.Kinds = validKinds
@ -1426,7 +1443,8 @@ func (a *ServerWithRoles) NewWatcher(ctx context.Context, watch types.Watch) (ty
case a.hasBuiltinRole(types.RoleNode):
watch.QueueSize = defaults.NodeQueueSize
}
return a.authServer.NewWatcher(ctx, watch)
return nil
}
// hasWatchPermissionForKind checks the permissions for data of each kind.


@ -376,7 +376,7 @@ func (g *GRPCServer) CreateAuditStream(stream authpb.AuthService_CreateAuditStre
const logInterval = 10000
// WatchEvents returns a new stream of cluster events
func (g *GRPCServer) WatchEvents(watch *authpb.Watch, stream authpb.AuthService_WatchEventsServer) error {
func (g *GRPCServer) WatchEvents(watch *authpb.Watch, stream authpb.AuthService_WatchEventsServer) (err error) {
auth, err := g.authenticate(stream.Context())
if err != nil {
return trace.Wrap(err)
@ -387,40 +387,43 @@ func (g *GRPCServer) WatchEvents(watch *authpb.Watch, stream authpb.AuthService_
AllowPartialSuccess: watch.AllowPartialSuccess,
}
watcher, err := auth.NewWatcher(stream.Context(), servicesWatch)
events, err := auth.NewStream(stream.Context(), servicesWatch)
if err != nil {
return trace.Wrap(err)
}
defer watcher.Close()
for {
select {
case <-stream.Context().Done():
return nil
case <-watcher.Done():
return watcher.Error()
case event := <-watcher.Events():
if role, ok := event.Resource.(*types.RoleV6); ok {
downgraded, err := maybeDowngradeRole(stream.Context(), role)
if err != nil {
return trace.Wrap(err)
}
event.Resource = downgraded
}
out, err := client.EventToGRPC(event)
defer func() {
serr := events.Done()
if err == nil {
err = serr
}
}()
for events.Next() {
event := events.Item()
if role, ok := event.Resource.(*types.RoleV6); ok {
downgraded, err := maybeDowngradeRole(stream.Context(), role)
if err != nil {
return trace.Wrap(err)
}
event.Resource = downgraded
}
out, err := client.EventToGRPC(event)
if err != nil {
return trace.Wrap(err)
}
size := float64(proto.Size(out))
watcherEventsEmitted.WithLabelValues(resourceLabel(event)).Observe(size)
watcherEventSizes.Observe(size)
size := float64(proto.Size(out))
watcherEventsEmitted.WithLabelValues(resourceLabel(event)).Observe(size)
watcherEventSizes.Observe(size)
if err := stream.Send(out); err != nil {
return trace.Wrap(err)
}
if err := stream.Send(out); err != nil {
return trace.Wrap(err)
}
}
// deferred cleanup func will inject stream error if needed
return nil
}
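The named err return plus the deferred Done call is what lets the handler surface a terminal stream error even when the send loop exits cleanly. The same pattern in isolation, as a rough sketch (function and parameter names are illustrative):

func drainStream(events stream.Stream[types.Event], send func(types.Event) error) (err error) {
	defer func() {
		// surface the stream's terminal error unless the loop already failed
		if serr := events.Done(); err == nil {
			err = serr
		}
	}()
	for events.Next() {
		if err := send(events.Item()); err != nil {
			return trace.Wrap(err)
		}
	}
	return nil
}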
// resourceLabel returns the label for the provided types.Event


@ -145,7 +145,7 @@ type EtcdBackend struct {
nodes []string
*log.Entry
cfg *Config
clients roundRobin[*clientv3.Client]
clients *utils.RoundRobin[*clientv3.Client]
cancelC chan bool
stopC chan bool
clock clockwork.Clock
@ -273,7 +273,6 @@ func New(ctx context.Context, params backend.Params, opts ...Option) (*EtcdBacke
b := &EtcdBackend{
Entry: log.WithFields(log.Fields{trace.Component: GetName()}),
cfg: cfg,
clients: newRoundRobin[*clientv3.Client](nil), // initialized below in reconnect()
nodes: cfg.Nodes,
cancelC: make(chan bool, 1),
stopC: make(chan bool, 1),
@ -409,8 +408,10 @@ func (b *EtcdBackend) Close() error {
b.cancel()
b.buf.Close()
var errs []error
for _, clt := range b.clients.items {
errs = append(errs, clt.Close())
if b.clients != nil {
b.clients.ForEach(func(clt *clientv3.Client) {
errs = append(errs, clt.Close())
})
}
return trace.NewAggregate(errs...)
}
@ -422,12 +423,15 @@ func (b *EtcdBackend) CloseWatchers() {
}
func (b *EtcdBackend) reconnect(ctx context.Context) error {
for _, clt := range b.clients.items {
if err := clt.Close(); err != nil {
b.Entry.WithError(err).Warning("Failed closing existing etcd client on reconnect.")
}
if b.clients != nil {
b.clients.ForEach(func(clt *clientv3.Client) {
if err := clt.Close(); err != nil {
b.Entry.WithError(err).Warning("Failed closing existing etcd client on reconnect.")
}
})
b.clients = nil
}
b.clients.items = nil
tlsConfig := utils.TLSConfig(nil)
@ -466,6 +470,7 @@ func (b *EtcdBackend) reconnect(ctx context.Context) error {
tlsConfig.ClientCAs = certPool
}
clients := make([]*clientv3.Client, 0, b.cfg.ClientPoolSize)
for i := 0; i < b.cfg.ClientPoolSize; i++ {
clt, err := clientv3.New(clientv3.Config{
Context: ctx,
@ -477,13 +482,16 @@ func (b *EtcdBackend) reconnect(ctx context.Context) error {
MaxCallSendMsgSize: b.cfg.MaxClientMsgSizeBytes,
})
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return trace.WrapWithMessage(err, "timed out dialing etcd endpoints: %s", b.nodes)
// close any preceding clients
for _, c := range clients {
c.Close()
}
return trace.Wrap(err)
}
b.clients.items = append(b.clients.items, clt)
clients = append(clients, clt)
}
b.clients = utils.NewRoundRobin(clients)
return nil
}

lib/cache/cache.go

@ -35,6 +35,7 @@ import (
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client/proto"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/internalutils/stream"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/discoveryconfig"
"github.com/gravitational/teleport/api/types/secreports"
@ -71,6 +72,26 @@ var (
cacheCollectors = []prometheus.Collector{cacheEventsReceived, cacheStaleEventsReceived}
)
// highVolumeResources is the set of cached resources that tend to produce high
// event volumes (e.g. heartbeat resources). High-volume events, and the watchers that
// care about them, are separated into a dedicated event fanout system in order to
// reduce the load on watchers that only care about cluster state resources.
// Peripheral agents that scale linearly with cluster resources (e.g. nodes) should never
// watch events of this kind.
var highVolumeResources = map[string]struct{}{
types.KindNode: {},
types.KindAppServer: {},
types.KindDatabaseServer: {},
types.KindDatabaseService: {},
types.KindWindowsDesktopService: {},
types.KindKubeServer: {},
}
func isHighVolumeResource(kind string) bool {
_, ok := highVolumeResources[kind]
return ok
}
// ForAuth sets up watch configuration for the auth server
func ForAuth(cfg Config) Config {
cfg.target = "auth"
@ -127,6 +148,8 @@ func ForAuth(cfg Config) Config {
// We don't want to enable partial health for auth cache because auth uses an event stream
// from the local backend which must support all resource kinds. We want to catch it early if it doesn't.
cfg.DisablePartialHealth = true
// auth server shards its event fanout system in order to reduce lock contention in very large clusters.
cfg.FanoutShards = 64
return cfg
}
@ -485,7 +508,8 @@ type Cache struct {
headlessAuthenticationsCache services.HeadlessAuthenticationService
secReportsCache services.SecReports
userLoginStateCache services.UserLoginStates
eventsFanout *services.FanoutSet
eventsFanout *services.FanoutV2
lowVolumeEventsFanout *utils.RoundRobin[*services.FanoutV2]
// closed indicates that the cache has been closed
closed atomic.Bool
@ -591,6 +615,8 @@ type Config struct {
// Watches provides a list of resources
// for the cache to watch
Watches []types.WatchKind
// FanoutShards is the number of event fanout shards to allocate
FanoutShards int
// Events provides events watchers
Events types.Events
// Trust is a service providing information about certificate
@ -736,6 +762,9 @@ func (c *Config) CheckAndSetDefaults() error {
if c.Tracer == nil {
c.Tracer = tracing.NoopTracer(c.Component)
}
if c.FanoutShards == 0 {
c.FanoutShards = 1
}
return nil
}
@ -831,6 +860,12 @@ func New(config Config) (*Cache, error) {
return nil, trace.Wrap(err)
}
fanout := services.NewFanoutV2(services.FanoutV2Config{})
lowVolumeFanouts := make([]*services.FanoutV2, 0, config.FanoutShards)
for i := 0; i < config.FanoutShards; i++ {
lowVolumeFanouts = append(lowVolumeFanouts, services.NewFanoutV2(services.FanoutV2Config{}))
}
cs := &Cache{
ctx: ctx,
cancel: cancel,
@ -863,7 +898,8 @@ func New(config Config) (*Cache, error) {
headlessAuthenticationsCache: local.NewIdentityService(config.Backend),
secReportsCache: secReprotsCache,
userLoginStateCache: userLoginStatesCache,
eventsFanout: services.NewFanoutSet(),
eventsFanout: fanout,
lowVolumeEventsFanout: utils.NewRoundRobin(lowVolumeFanouts),
Logger: log.WithFields(log.Fields{
trace.Component: config.Component,
}),
@ -920,6 +956,30 @@ func (c *Cache) Start() error {
return nil
}
// NewStream is equivalent to NewWatcher except that it represents the event
// stream as a stream.Stream rather than a channel. Watcher-style event handling
// is generally more common, but this API may be preferable for use cases where
// *many* event streams need to be allocated, as it is slightly more resource-efficient.
func (c *Cache) NewStream(ctx context.Context, watch types.Watch) (stream.Stream[types.Event], error) {
ctx, span := c.Tracer.Start(ctx, "cache/NewStream")
defer span.End()
validKinds, highVolume, err := c.validateWatchRequest(watch)
if err != nil {
return nil, trace.Wrap(err)
}
watch.Kinds = validKinds
if highVolume {
// watch request includes high volume resources, register with the
// full fanout instance.
return c.eventsFanout.NewStream(ctx, watch), nil
}
// watch request does not contain high volume resources, register with
// the low volume fanout instance (improves performance at scale).
return c.lowVolumeEventsFanout.Next().NewStream(ctx, watch), nil
}
// NewWatcher returns a new event watcher. In case of a cache
// this watcher will return events as seen by the cache,
// not the backend. This feature allows auth server
@ -929,14 +989,35 @@ func (c *Cache) NewWatcher(ctx context.Context, watch types.Watch) (types.Watche
ctx, span := c.Tracer.Start(ctx, "cache/NewWatcher")
defer span.End()
validKinds, highVolume, err := c.validateWatchRequest(watch)
if err != nil {
return nil, trace.Wrap(err)
}
watch.Kinds = validKinds
if highVolume {
// watch request includes high volume resources, register with the
// full fanout instance.
return c.eventsFanout.NewWatcher(ctx, watch)
}
// watch request does not contain high volume resources, register with
// the low volume fanout instance (improves performance at scale).
return c.lowVolumeEventsFanout.Next().NewWatcher(ctx, watch)
}
func (c *Cache) validateWatchRequest(watch types.Watch) (kinds []types.WatchKind, highVolume bool, err error) {
c.rw.RLock()
cacheOK := c.ok
confirmedKinds := c.confirmedKinds
c.rw.RUnlock()
validKinds := make([]types.WatchKind, 0, len(watch.Kinds))
var containsHighVolumeResource bool
Outer:
for _, requested := range watch.Kinds {
if isHighVolumeResource(requested.Kind) {
containsHighVolumeResource = true
}
if cacheOK {
// if cache has been initialized, we already know which kinds are confirmed by the event source
// and can validate the kinds requested for fanout against that.
@ -945,7 +1026,7 @@ Outer:
if watch.AllowPartialSuccess {
continue
}
return nil, trace.BadParameter("cache %q does not support watching resource %q", c.Config.target, requested.Kind)
return nil, false, trace.BadParameter("cache %q does not support watching resource %q", c.Config.target, requested.Kind)
}
validKinds = append(validKinds, requested)
} else {
@ -960,16 +1041,15 @@ Outer:
if watch.AllowPartialSuccess {
continue
}
return nil, trace.BadParameter("cache %q does not support watching resource %q", c.Config.target, requested.Kind)
return nil, false, trace.BadParameter("cache %q does not support watching resource %q", c.Config.target, requested.Kind)
}
}
if len(validKinds) == 0 {
return nil, trace.BadParameter("cache %q does not support any of the requested resources", c.Config.target)
return nil, false, trace.BadParameter("cache %q does not support any of the requested resources", c.Config.target)
}
watch.Kinds = validKinds
return c.eventsFanout.NewWatcher(ctx, watch)
return validKinds, containsHighVolumeResource, nil
}
func (c *Cache) update(ctx context.Context, retry retryutils.Retry) {
@ -1156,7 +1236,13 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer
// that any derivative caches do not perform their fetch operations
// until this cache has finished its apply operations.
c.eventsFanout.SetInit(confirmedKinds)
c.lowVolumeEventsFanout.ForEach(func(f *services.FanoutV2) {
f.SetInit(confirmedKinds)
})
defer c.eventsFanout.Reset()
defer c.lowVolumeEventsFanout.ForEach(func(f *services.FanoutV2) {
f.Reset()
})
retry.Reset()
@ -1367,6 +1453,9 @@ func (c *Cache) Close() error {
c.closed.Store(true)
c.cancel()
c.eventsFanout.Close()
c.lowVolumeEventsFanout.ForEach(func(f *services.FanoutV2) {
f.Close()
})
return nil
}
@ -1496,6 +1585,11 @@ func (c *Cache) processEvent(ctx context.Context, event types.Event, emit bool)
}
if emit {
c.eventsFanout.Emit(event)
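// low-volume events are mirrored to every low-volume shard so that a watcher
// registered on any single shard still observes the complete low-volume stream.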
if !isHighVolumeResource(resourceKind.kind) {
c.lowVolumeEventsFanout.ForEach(func(f *services.FanoutV2) {
f.Emit(event)
})
}
}
return nil
}


@ -60,6 +60,14 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}
// TestNodesDontCacheHighVolumeResources verifies that resources classified as "high volume" aren't
// cached by nodes.
func TestNodesDontCacheHighVolumeResources(t *testing.T) {
for _, kind := range ForNode(Config{}).Watches {
require.False(t, isHighVolumeResource(kind.Kind), "resource=%q", kind.Kind)
}
}
// testPack contains pack of
// services used for test run
type testPack struct {

lib/services/fanoutv2.go

@ -0,0 +1,410 @@
/*
Copyright 2020 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package services
import (
"context"
"errors"
"sync"
"time"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/gravitational/teleport/api/internalutils/stream"
"github.com/gravitational/teleport/api/types"
fb "github.com/gravitational/teleport/lib/utils/fanoutbuffer"
)
var errFanoutReset = errors.New("event fanout system reset")
var errFanoutClosed = errors.New("event fanout system closed")
var errWatcherClosed = errors.New("event watcher closed")
type FanoutV2Config struct {
Capacity uint64
GracePeriod time.Duration
Clock clockwork.Clock
}
func (c *FanoutV2Config) SetDefaults() {
if c.Capacity == 0 {
c.Capacity = 1024
}
if c.GracePeriod == 0 {
// The most frequent periodic writes happen once per minute. A grace period of 59s is a
// reasonable default, since a cursor that can't catch up within 59s is likely to continue
// to fall further behind.
c.GracePeriod = 59 * time.Second
}
if c.Clock == nil {
c.Clock = clockwork.NewRealClock()
}
}
// FanoutV2 is a drop-in replacement for Fanout that offers a different set of performance characteristics. It
// supports variable-size buffers to better accommodate large spikes in event load, but it does so at the cost
// of more context switching (all readers are notified of all events) and higher baseline memory usage (it
// relies on a large shared buffer).
type FanoutV2 struct {
cfg FanoutV2Config
rw sync.RWMutex
buf *fb.Buffer[fanoutV2Entry]
init *fanoutV2Init
closed bool
}
// NewFanoutV2 allocates a new fanout instance.
func NewFanoutV2(cfg FanoutV2Config) *FanoutV2 {
cfg.SetDefaults()
f := &FanoutV2{
cfg: cfg,
}
f.setup()
return f
}
// NewStream gets a new event stream. The provided context will form the basis of
// the stream's close context. Note that streams *must* be explicitly closed when
// completed in order to avoid performance issues.
func (f *FanoutV2) NewStream(ctx context.Context, watch types.Watch) stream.Stream[types.Event] {
f.rw.RLock()
defer f.rw.RUnlock()
if f.closed {
return stream.Fail[types.Event](errFanoutClosed)
}
return &fanoutV2Stream{
closeContext: ctx,
cursor: f.buf.NewCursor(),
init: f.init,
watch: watch,
}
}
func (f *FanoutV2) NewWatcher(ctx context.Context, watch types.Watch) (types.Watcher, error) {
ctx, cancel := context.WithCancel(ctx)
w := &streamWatcher{
cancel: cancel,
events: make(chan types.Event, 16),
// note that we don't use ctx.Done() because we want to wait until
// we've finished stream closure and extracted the resulting error
// before signaling watcher closure.
done: make(chan struct{}),
}
go w.run(ctx, f.NewStream(ctx, watch))
return w, nil
}
type streamWatcher struct {
cancel context.CancelFunc
events chan types.Event
done chan struct{}
emux sync.Mutex
err error
}
func (w *streamWatcher) run(ctx context.Context, stream stream.Stream[types.Event]) {
defer func() {
if err := stream.Done(); err != nil {
w.emux.Lock()
w.err = err
w.emux.Unlock()
}
close(w.done)
}()
for stream.Next() {
select {
case w.events <- stream.Item():
case <-ctx.Done():
return
}
}
}
func (w *streamWatcher) Events() <-chan types.Event {
return w.events
}
func (w *streamWatcher) Done() <-chan struct{} {
return w.done
}
func (w *streamWatcher) Close() error {
w.cancel()
return nil
}
func (w *streamWatcher) Error() error {
w.emux.Lock()
defer w.emux.Unlock()
if w.err != nil {
return w.err
}
select {
case <-w.Done():
return errWatcherClosed
default:
return nil
}
}
func (f *FanoutV2) Emit(events ...types.Event) {
f.rw.RLock()
defer f.rw.RUnlock()
if !f.init.isInit() {
panic("Emit called on uninitialized fanout instance")
}
if f.closed {
// emit racing with close is fairly common with how we
// use this type, so it's best to ignore it.
return
}
// batch-process events to minimize the need to acquire the
// fanout buffer's write lock (batching writes has a non-trivial
// impact on fanout buffer benchmarks due to each cursor needing
// to acquire the read lock individually).
var ebuf [16]fanoutV2Entry
for len(events) > 0 {
n := min(len(events), len(ebuf))
for i := 0; i < n; i++ {
ebuf[i] = newFanoutV2Entry(events[i])
}
f.buf.Append(ebuf[:n]...)
events = events[n:]
}
}
func (f *FanoutV2) Reset() {
f.rw.Lock()
defer f.rw.Unlock()
if f.closed {
return
}
f.teardown(errFanoutReset)
f.setup()
}
func (f *FanoutV2) Close() error {
f.rw.Lock()
defer f.rw.Unlock()
if f.closed {
return nil
}
f.teardown(errFanoutClosed)
f.closed = true
return nil
}
func (f *FanoutV2) setup() {
f.init = newFanoutV2Init()
f.buf = fb.NewBuffer[fanoutV2Entry](fb.Config{
Capacity: f.cfg.Capacity,
GracePeriod: f.cfg.GracePeriod,
Clock: f.cfg.Clock,
})
}
func (f *FanoutV2) teardown(err error) {
f.init.setErr(err)
f.buf.Close()
}
func (f *FanoutV2) SetInit(kinds []types.WatchKind) {
f.rw.RLock()
defer f.rw.RUnlock()
km := make(map[resourceKind]types.WatchKind, len(kinds))
for _, kind := range kinds {
km[resourceKind{kind: kind.Kind, subKind: kind.SubKind}] = kind
}
f.init.setInit(km)
}
// fanoutV2Stream is a stream.Stream implementation that streams events from a FanoutV2 instance. It handles filtering
// out events that don't match the provided watch parameters, and construction of custom init events.
type fanoutV2Stream struct {
closeContext context.Context
cursor *fb.Cursor[fanoutV2Entry]
init *fanoutV2Init
watch types.Watch
rbuf [16]fanoutV2Entry
n, next int
event types.Event
err error
}
func (s *fanoutV2Stream) Next() (ok bool) {
if s.init != nil {
s.event, s.err = s.waitInit(s.closeContext)
s.init = nil
return s.err == nil
}
for {
// try finding the next matching event within read buffer
var ok bool
s.event, ok, s.err = s.advance()
if ok {
return true
}
if s.err != nil {
// surface advance errors rather than overwriting them on the next read
return false
}
// read a new batch of events into the read buffer
s.next = 0
s.n, s.err = s.cursor.Read(s.closeContext, s.rbuf[:])
if s.err != nil {
if errors.Is(s.err, fb.ErrBufferClosed) {
s.err = errFanoutReset
}
return false
}
}
}
func (s *fanoutV2Stream) Item() types.Event {
return s.event
}
func (s *fanoutV2Stream) Done() error {
s.cursor.Close()
return s.err
}
// waitInit waits for fanout initialization and builds an appropriate init event.
func (s *fanoutV2Stream) waitInit(ctx context.Context) (types.Event, error) {
confirmedKinds, err := s.init.wait(ctx)
if err != nil {
return types.Event{}, trace.Wrap(err)
}
validKinds := make([]types.WatchKind, 0, len(s.watch.Kinds))
for _, requested := range s.watch.Kinds {
k := resourceKind{kind: requested.Kind, subKind: requested.SubKind}
if configured, ok := confirmedKinds[k]; !ok || !configured.Contains(requested) {
if s.watch.AllowPartialSuccess {
continue
}
return types.Event{}, trace.BadParameter("resource type %q is not supported by this event stream", requested.Kind)
}
validKinds = append(validKinds, requested)
}
if len(validKinds) == 0 {
return types.Event{}, trace.BadParameter("none of the requested resources are supported by this fanoutWatcher")
}
return types.Event{Type: types.OpInit, Resource: types.NewWatchStatus(validKinds)}, nil
}
// advance advances through the stream's internal read buffer looking for the
// next event that matches our specific watch parameters.
func (s *fanoutV2Stream) advance() (event types.Event, ok bool, err error) {
for s.next < s.n {
entry := s.rbuf[s.next]
s.next++
for _, kind := range s.watch.Kinds {
match, err := kind.Matches(entry.Event)
if err != nil {
return types.Event{}, false, trace.Wrap(err)
}
if !match {
continue
}
if kind.LoadSecrets {
return entry.EventWithSecrets, true, nil
}
return entry.Event, true, nil
}
}
return types.Event{}, false, nil
}
// fanoutV2Entry is the underlying buffer entry that is fanned out to all
// cursors. Individual streams decide if they care about the version of the
// event with or without secrets based on their parameters.
type fanoutV2Entry struct {
Event types.Event
EventWithSecrets types.Event
}
func newFanoutV2Entry(event types.Event) fanoutV2Entry {
return fanoutV2Entry{
Event: filterEventSecrets(event),
EventWithSecrets: event,
}
}
// fanoutV2Init is a helper for blocking on and distributing the init event for a fanout
// instance. It uses a channel as both the init signal and a memory barrier to ensure
// good concurrent performance, and it is allocated behind a pointer so that it can be
// easily terminated and replaced during resets, ensuring that we don't need to handle
// edge-cases around old streams observing the wrong event/error.
type fanoutV2Init struct {
once sync.Once
ch chan struct{}
kinds map[resourceKind]types.WatchKind
err error
}
func newFanoutV2Init() *fanoutV2Init {
return &fanoutV2Init{
ch: make(chan struct{}),
}
}
func (i *fanoutV2Init) setInit(kinds map[resourceKind]types.WatchKind) {
i.once.Do(func() {
i.kinds = kinds
close(i.ch)
})
}
func (i *fanoutV2Init) setErr(err error) {
i.once.Do(func() {
i.err = err
close(i.ch)
})
}
func (i *fanoutV2Init) wait(ctx context.Context) (kinds map[resourceKind]types.WatchKind, err error) {
select {
case <-i.ch:
return i.kinds, i.err
case <-ctx.Done():
return nil, trace.Wrap(ctx.Err())
}
}
func (i *fanoutV2Init) isInit() bool {
select {
case <-i.ch:
return true
default:
return false
}
}


@ -0,0 +1,150 @@
/*
Copyright 2020 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package services
import (
"context"
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/gravitational/teleport/api/types"
)
// TestFanoutV2Init verifies that Init event is sent exactly once.
func TestFanoutV2Init(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f := NewFanoutV2(FanoutV2Config{})
w, err := f.NewWatcher(ctx, types.Watch{
Name: "test",
Kinds: []types.WatchKind{{Kind: "spam"}, {Kind: "eggs"}},
})
require.NoError(t, err)
f.SetInit([]types.WatchKind{{Kind: "spam"}, {Kind: "eggs"}})
select {
case e := <-w.Events():
require.Equal(t, types.OpInit, e.Type)
require.Equal(t, types.NewWatchStatus([]types.WatchKind{{Kind: "spam"}, {Kind: "eggs"}}), e.Resource)
case <-time.After(time.Second * 10):
t.Fatalf("Expected init event")
}
select {
case e := <-w.Events():
t.Fatalf("Unexpected second event: %+v", e)
case <-time.After(time.Millisecond * 200):
}
}
func TestFanoutV2StreamFiltering(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
f := NewFanoutV2(FanoutV2Config{})
stream := f.NewStream(ctx, types.Watch{
Name: "test",
Kinds: []types.WatchKind{{Kind: "spam"}},
})
f.SetInit([]types.WatchKind{{Kind: "spam"}, {Kind: "eggs"}})
require.True(t, stream.Next())
require.Equal(t, types.OpInit, stream.Item().Type)
require.Equal(t, types.NewWatchStatus([]types.WatchKind{{Kind: "spam"}}), stream.Item().Resource)
put := func(kind string) {
f.Emit(types.Event{Type: types.OpPut, Resource: &types.ResourceHeader{Kind: kind}})
}
put("spam")
put("eggs")
put("spam")
require.True(t, stream.Next())
require.Equal(t, "spam", stream.Item().Resource.GetKind())
require.True(t, stream.Next())
require.Equal(t, "spam", stream.Item().Resource.GetKind())
require.NoError(t, stream.Done())
}
func TestFanoutV2StreamOrdering(t *testing.T) {
const streams = 100
const events = 400
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
f := NewFanoutV2(FanoutV2Config{})
f.SetInit([]types.WatchKind{{Kind: "spam"}, {Kind: "eggs"}})
put := func(kind string) {
f.Emit(types.Event{Type: types.OpPut, Resource: &types.ResourceHeader{Kind: kind}})
}
results := make(chan []string, streams)
var inputs []string
for i := 0; i < events; i++ {
kind := "spam"
if rand.Int()%2 == 0 {
kind = "eggs"
}
inputs = append(inputs, kind)
}
for i := 0; i < streams; i++ {
stream := f.NewStream(ctx, types.Watch{
Name: "test",
Kinds: []types.WatchKind{{Kind: "spam"}, {Kind: "eggs"}},
})
require.True(t, stream.Next())
require.Equal(t, types.OpInit, stream.Item().Type)
go func() {
defer stream.Done()
var kinds []string
for stream.Next() {
kinds = append(kinds, stream.Item().Resource.GetKind())
if len(kinds) == events {
break
}
}
results <- kinds
}()
}
for _, k := range inputs {
put(k)
}
for i := 0; i < streams; i++ {
require.Equal(t, inputs, <-results)
}
}


@ -14,26 +14,34 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package etcdbk
package utils
import "sync/atomic"
// roundRobin is a helper for distributing load across multiple resources in a round-robin
// fashion (used to implement simple client pooling).
type roundRobin[T any] struct {
ct *atomic.Uint64
// RoundRobin is a helper for distributing load across multiple resources in a round-robin
// fashion.
type RoundRobin[T any] struct {
ct atomic.Uint64
items []T
}
func newRoundRobin[T any](items []T) roundRobin[T] {
return roundRobin[T]{
ct: new(atomic.Uint64),
// NewRoundRobin creates a new round-robin instance.
func NewRoundRobin[T any](items []T) *RoundRobin[T] {
return &RoundRobin[T]{
items: items,
}
}
func (r roundRobin[T]) Next() T {
// Next gets the next item that is up for use.
func (r *RoundRobin[T]) Next() T {
n := r.ct.Add(1) - 1
l := uint64(len(r.items))
return r.items[int(n%l)]
}
// ForEach applies the supplied closure to each item.
func (r *RoundRobin[T]) ForEach(fn func(T)) {
for _, item := range r.items {
fn(item)
}
}
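Illustrative usage, assuming an fmt import (the string values are placeholders):

func exampleRoundRobin() {
	rr := NewRoundRobin([]string{"a", "b", "c"})

	fmt.Println(rr.Next()) // "a"
	fmt.Println(rr.Next()) // "b"
	fmt.Println(rr.Next()) // "c"
	fmt.Println(rr.Next()) // "a" again (wraps around)

	// ForEach visits every pooled item, e.g. to close clients on shutdown.
	rr.ForEach(func(s string) {
		fmt.Println("visiting", s)
	})
}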


@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package etcdbk
package utils
import (
"sync"
@ -30,7 +30,7 @@ func TestRoundRobinConcurrent(t *testing.T) {
const workers = 100
const rounds = 100
rr := newRoundRobin([]bool{true, false})
rr := NewRoundRobin([]bool{true, false})
var tct atomic.Uint64
var fct atomic.Uint64
@ -99,7 +99,7 @@ func TestRoundRobinSequential(t *testing.T) {
}
for _, tt := range tts {
t.Run(tt.desc, func(t *testing.T) {
rr := newRoundRobin(tt.items)
rr := NewRoundRobin(tt.items)
for _, exp := range tt.expect {
require.Equal(t, exp, rr.Next())
}


@ -23,6 +23,7 @@ import (
"os/signal"
"runtime"
"strconv"
"strings"
"sync/atomic"
"time"
@ -33,6 +34,7 @@ import (
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/cache"
"github.com/gravitational/teleport/lib/service/servicecfg"
"github.com/gravitational/teleport/lib/utils"
)
@ -43,12 +45,16 @@ type LoadtestCommand struct {
nodeHeartbeats *kingpin.CmdClause
watch *kingpin.CmdClause
count int
churn int
labels int
interval time.Duration
ttl time.Duration
concurrency int
kind string
}
// Initialize allows LoadtestCommand to plug itself into the CLI parser
@ -65,6 +71,9 @@ func (c *LoadtestCommand) Initialize(app *kingpin.Application, config *servicecf
c.nodeHeartbeats.Flag("concurrency", "Max concurrent requests").Default(
strconv.Itoa(runtime.NumCPU() * 16),
).IntVar(&c.concurrency)
c.watch = loadtest.Command("watch", "Monitor event stream").Hidden()
c.watch.Flag("kind", "Resource kind(s) to watch, e.g. --kind=node,user,role").StringVar(&c.kind)
}
// TryRun takes the CLI command as an argument (like "loadtest node-heartbeats") and executes it.
@ -72,6 +81,8 @@ func (c *LoadtestCommand) TryRun(ctx context.Context, cmd string, client auth.Cl
switch cmd {
case c.nodeHeartbeats.FullCommand():
err = c.NodeHeartbeats(ctx, client)
case c.watch.FullCommand():
err = c.Watch(ctx, client)
default:
return false, nil
}
@ -201,3 +212,80 @@ func (c *LoadtestCommand) NodeHeartbeats(ctx context.Context, client auth.Client
}
}
}
func (c *LoadtestCommand) Watch(ctx context.Context, client auth.ClientI) error {
var kinds []types.WatchKind
for _, kind := range strings.Split(c.kind, ",") {
kind = strings.TrimSpace(kind)
if kind == "" {
continue
}
kinds = append(kinds, types.WatchKind{
Kind: kind,
})
}
var allowPartialSuccess bool
if len(kinds) == 0 {
// use auth watch kinds by default
ccfg := cache.ForAuth(cache.Config{})
kinds = ccfg.Watches
allowPartialSuccess = true
}
watcher, err := client.NewWatcher(ctx, types.Watch{
Name: "tctl-watch",
Kinds: kinds,
AllowPartialSuccess: allowPartialSuccess,
})
if err != nil {
return trace.Wrap(err)
}
defer watcher.Close()
select {
case event := <-watcher.Events():
if event.Type != types.OpInit {
return trace.BadParameter("expected init event, got %v instead", event.Type)
}
var skinds []string
for _, k := range event.Resource.(types.WatchStatus).GetKinds() {
skinds = append(skinds, k.Kind)
}
fmt.Printf("INIT: %v\n", skinds)
case <-watcher.Done():
return trace.Errorf("failed to get init event: %v", watcher.Error())
}
for {
select {
case event := <-watcher.Events():
switch event.Type {
case types.OpPut:
printEvent("PUT", event.Resource)
case types.OpDelete:
printEvent("DEL", event.Resource)
default:
return trace.BadParameter("expected put or del event, got %v instead", event.Type)
}
case <-watcher.Done():
if ctx.Err() != nil {
// canceled by caller
return nil
}
return trace.Errorf("watcher exited unexpectedly: %v", watcher.Error())
}
}
}
func printEvent(ekind string, rsc types.Resource) {
if sk := rsc.GetSubKind(); sk != "" {
fmt.Printf("%s: %s/%s/%s\n", ekind, rsc.GetKind(), sk, rsc.GetName())
} else {
fmt.Printf("%s: %s/%s\n", ekind, rsc.GetKind(), rsc.GetName())
}
}
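With this wired in, the subcommand can be exercised with, for example, `tctl loadtest watch --kind=node,role`; omitting `--kind` falls back to the full auth cache watch set with partial success allowed, and each received event is printed as a PUT/DEL line by printEvent above.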