mirror of
https://github.com/gravitational/teleport
synced 2024-10-19 08:43:58 +00:00
single auth client per instance (#30384)
This commit is contained in:
parent
94562cf023
commit
47414c7aae
|
@ -329,5 +329,5 @@ func waitForDatabases(t *testing.T, authServer *auth.Server, dbNames []string) {
|
|||
}
|
||||
}
|
||||
return registered == len(dbNames)
|
||||
}, 10*time.Second, 100*time.Millisecond)
|
||||
}, 30*time.Second, 100*time.Millisecond)
|
||||
}
|
||||
|
|
|
@ -417,7 +417,7 @@ func MakeTestServers(t *testing.T) (auth *service.TeleportProcess, proxy *servic
|
|||
})
|
||||
|
||||
// Wait for proxy to become ready.
|
||||
_, err = proxy.WaitForEventTimeout(10*time.Second, service.ProxyWebServerReady)
|
||||
_, err = proxy.WaitForEventTimeout(30*time.Second, service.ProxyWebServerReady)
|
||||
require.NoError(t, err, "proxy web server didn't start after 10s")
|
||||
|
||||
return auth, proxy, provisionToken
|
||||
|
@ -438,6 +438,7 @@ func MakeTestDatabaseServer(t *testing.T, proxyAddr utils.NetAddr, token string,
|
|||
cfg.SetToken(token)
|
||||
cfg.SSH.Enabled = false
|
||||
cfg.Auth.Enabled = false
|
||||
cfg.Proxy.Enabled = false
|
||||
cfg.Databases.Enabled = true
|
||||
cfg.Databases.Databases = dbs
|
||||
cfg.Databases.ResourceMatchers = resMatchers
|
||||
|
@ -452,7 +453,7 @@ func MakeTestDatabaseServer(t *testing.T, proxyAddr utils.NetAddr, token string,
|
|||
})
|
||||
|
||||
// Wait for database agent to start.
|
||||
_, err = db.WaitForEventTimeout(10*time.Second, service.DatabasesReady)
|
||||
_, err = db.WaitForEventTimeout(30*time.Second, service.DatabasesReady)
|
||||
require.NoError(t, err, "database server didn't start after 10s")
|
||||
|
||||
return db
|
||||
|
|
|
@ -1047,6 +1047,14 @@ func (process *TeleportProcess) rotate(conn *Connector, localState auth.StateV2,
|
|||
// For config v1 and v2, it will attempt to direct dial the auth server, and fallback to trying to tunnel
|
||||
// to the Auth Server through the proxy.
|
||||
func (process *TeleportProcess) newClient(identity *auth.Identity) (*auth.Client, error) {
|
||||
if identity.ID.Role != types.RoleInstance {
|
||||
clt, ok := process.waitForInstanceClient()
|
||||
if !ok {
|
||||
return nil, trace.Errorf("failed to get instance client for identity %q", identity.ID.Role)
|
||||
}
|
||||
return clt, nil
|
||||
}
|
||||
|
||||
tlsConfig, err := identity.TLSConfig(process.Config.CipherSuites)
|
||||
if err != nil {
|
||||
return nil, trace.Wrap(err)
|
||||
|
|
|
@ -347,6 +347,12 @@ type TeleportProcess struct {
|
|||
// and may not exist for some time if cert migrations are necessary.
|
||||
instanceClient *auth.Client
|
||||
|
||||
// instanceClientReady is closed when the instance client becomes available.
|
||||
instanceClientReady chan struct{}
|
||||
|
||||
// instanceClientReadyOnce protects instanceClientReady from double-close.
|
||||
instanceClientReadyOnce sync.Once
|
||||
|
||||
// instanceRoles is the collection of enabled service roles (excludes things like "admin"
|
||||
// and "instance" which aren't true user-facing services). The values in this mapping are
|
||||
// the names of the associated identity events for these roles.
|
||||
|
@ -920,6 +926,7 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) {
|
|||
Clock: cfg.Clock,
|
||||
Supervisor: supervisor,
|
||||
Config: cfg,
|
||||
instanceClientReady: make(chan struct{}),
|
||||
instanceRoles: make(map[types.SystemRole]string),
|
||||
Identities: make(map[types.SystemRole]*auth.Identity),
|
||||
connectors: make(map[types.SystemRole]*Connector),
|
||||
|
@ -1079,6 +1086,13 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) {
|
|||
warnOnErr(process.closeImportedDescriptors(teleport.ComponentAuth), process.log)
|
||||
}
|
||||
|
||||
// initInstance initializes the pseudo-service "Instance" that is active for all teleport
|
||||
// instances. All other services inherit their auth client from the "Instance" service, so
|
||||
// we initialize it immediately after auth in order to ensure timely client availability.
|
||||
if err := process.initInstance(); err != nil {
|
||||
return nil, trace.Wrap(err)
|
||||
}
|
||||
|
||||
if cfg.SSH.Enabled {
|
||||
if err := process.initSSH(); err != nil {
|
||||
return nil, err
|
||||
|
@ -1168,12 +1182,6 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) {
|
|||
return nil, trace.BadParameter("all services failed to start")
|
||||
}
|
||||
|
||||
// initInstance initializes the pseudo-service "Instance" that is active for all teleport
|
||||
// instances. We activate this last because it cares about which other services are active.
|
||||
if err := process.initInstance(); err != nil {
|
||||
return nil, trace.Wrap(err)
|
||||
}
|
||||
|
||||
// create the new pid file only after started successfully
|
||||
if cfg.PIDFile != "" {
|
||||
f, err := os.OpenFile(cfg.PIDFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o666)
|
||||
|
@ -1245,8 +1253,11 @@ func (process *TeleportProcess) getLocalAuth() *auth.Server {
|
|||
|
||||
func (process *TeleportProcess) setInstanceClient(clt *auth.Client) {
|
||||
process.Lock()
|
||||
defer process.Unlock()
|
||||
process.instanceClient = clt
|
||||
process.Unlock()
|
||||
process.instanceClientReadyOnce.Do(func() {
|
||||
close(process.instanceClientReady)
|
||||
})
|
||||
}
|
||||
|
||||
func (process *TeleportProcess) getInstanceClient() *auth.Client {
|
||||
|
@ -1255,6 +1266,19 @@ func (process *TeleportProcess) getInstanceClient() *auth.Client {
|
|||
return process.instanceClient
|
||||
}
|
||||
|
||||
// waitForInstanceClient waits for the instance client to become available. Instance client is only available if at least
|
||||
// one non-auth service is registered. Auth-only instances cannot use the instance client because auth servers need to
|
||||
// be able to fully initialize without a valid CA in order to support HSMs.
|
||||
func (process *TeleportProcess) waitForInstanceClient() (clt *auth.Client, ok bool) {
|
||||
select {
|
||||
case <-process.instanceClientReady:
|
||||
clt = process.getInstanceClient()
|
||||
return clt, clt != nil
|
||||
case <-process.ExitContext().Done():
|
||||
return nil, false
|
||||
}
|
||||
}
|
||||
|
||||
// makeInventoryControlStreamWhenReady is the same as makeInventoryControlStream except that it blocks until
|
||||
// the InstanceReady event is emitted.
|
||||
func (process *TeleportProcess) makeInventoryControlStreamWhenReady(ctx context.Context) (client.DownstreamInventoryControlStream, error) {
|
||||
|
@ -2333,8 +2357,16 @@ func (process *TeleportProcess) NewAsyncEmitter(clt apievents.Emitter) (*events.
|
|||
|
||||
// initInstance initializes the pseudo-service "Instance" that is active on all teleport instances.
|
||||
func (process *TeleportProcess) initInstance() error {
|
||||
if process.Config.Auth.Enabled {
|
||||
// if we have a local auth server, we cannot create an instance client without breaking HSM rotation.
|
||||
var hasNonAuthRole bool
|
||||
for _, role := range process.getInstanceRoles() {
|
||||
if role != types.RoleAuth {
|
||||
hasNonAuthRole = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if process.Config.Auth.Enabled && !hasNonAuthRole {
|
||||
// if we have a local auth server and no other services, we cannot create an instance client without breaking HSM rotation.
|
||||
// instance control stream will be created via in-memory pipe, but until this limitation is resolved
|
||||
// or a fully in-memory instance client is implemented, we cannot rely on the instance client existing
|
||||
// for purposes other than the control stream.
|
||||
|
@ -2657,10 +2689,12 @@ func (process *TeleportProcess) RegisterWithAuthServer(role types.SystemRole, ev
|
|||
// the registerExpectedServices function.
|
||||
process.log.Errorf("Register called for unexpected instance role %q (this is a bug).", role)
|
||||
}
|
||||
|
||||
connector, err := process.reconnectToAuthService(role)
|
||||
if err != nil {
|
||||
return trace.Wrap(err)
|
||||
}
|
||||
|
||||
process.BroadcastEvent(Event{Name: eventName, Payload: connector})
|
||||
return nil
|
||||
})
|
||||
|
|
|
@ -868,7 +868,7 @@ func testVersionCheck(t *testing.T, nodeCfg *servicecfg.Config, skipVersionCheck
|
|||
nodeProc, err := NewTeleport(nodeCfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
c, err := nodeProc.reconnectToAuthService(types.RoleNode)
|
||||
c, err := nodeProc.reconnectToAuthService(types.RoleInstance)
|
||||
if skipVersionCheck {
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, c)
|
||||
|
|
|
@ -454,7 +454,11 @@ func (s *LocalSupervisor) WaitForEvent(ctx context.Context, name string) (Event,
|
|||
func (s *LocalSupervisor) WaitForEventTimeout(timeout time.Duration, name string) (Event, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
return s.WaitForEvent(ctx, name)
|
||||
event, err := s.WaitForEvent(ctx, name)
|
||||
if err != nil && ctx.Err() != nil {
|
||||
return event, trace.Errorf("timeout waiting for event %q (%s)", name, timeout)
|
||||
}
|
||||
return event, trace.Wrap(err)
|
||||
}
|
||||
|
||||
func (s *LocalSupervisor) ListenForEvents(ctx context.Context, name string, eventC chan<- Event) {
|
||||
|
|
Loading…
Reference in a new issue