Push CAs after establishing watchers in remoteSite. (#13895)

Update CAs on connection in remoteSite
This commit is contained in:
Edoardo Spadolini 2022-06-30 21:25:54 +02:00 committed by GitHub
parent 69fd10e9a5
commit f3ceca0278
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 74 additions and 31 deletions

View file

@ -60,8 +60,8 @@ type CertAuthID struct {
DomainName string `json:"domain_name"`
}
func (c *CertAuthID) String() string {
return fmt.Sprintf("CA(type=%v, domain=%v)", c.Type, c.DomainName)
func (c CertAuthID) String() string {
return fmt.Sprintf("CA(type=%q, domain=%q)", c.Type, c.DomainName)
}
// Check returns error if any of the id parameters are bad, nil otherwise

View file

@ -458,9 +458,8 @@ func (s *remoteSite) compareAndSwapCertAuthority(ca types.CertAuthority) error {
func (s *remoteSite) updateCertAuthorities(retry utils.Retry, remoteWatcher *services.CertAuthorityWatcher, remoteVersion string) {
defer remoteWatcher.Close()
cas := make(map[types.CertAuthType]types.CertAuthority)
for {
err := s.watchCertAuthorities(remoteWatcher, remoteVersion, cas)
err := s.watchCertAuthorities(remoteWatcher, remoteVersion)
if err != nil {
switch {
case trace.IsNotFound(err):
@ -487,7 +486,7 @@ func (s *remoteSite) updateCertAuthorities(retry utils.Retry, remoteWatcher *ser
}
}
func (s *remoteSite) watchCertAuthorities(remoteWatcher *services.CertAuthorityWatcher, remoteVersion string, cas map[types.CertAuthType]types.CertAuthority) error {
func (s *remoteSite) watchCertAuthorities(remoteWatcher *services.CertAuthorityWatcher, remoteVersion string) error {
targets, err := s.getLocalWatchedCerts(remoteVersion)
if err != nil {
return trace.Wrap(err)
@ -519,6 +518,63 @@ func (s *remoteSite) watchCertAuthorities(remoteWatcher *services.CertAuthorityW
}
}()
localCAs := make(map[types.CertAuthType]types.CertAuthority, len(targets))
for _, t := range targets {
caID := types.CertAuthID{
Type: t.Type,
DomainName: t.ClusterName,
}
ca, err := s.localAccessPoint.GetCertAuthority(s.ctx, caID, false)
if err != nil {
return trace.Wrap(err, "failed to get local cert authority")
}
if err := s.remoteClient.RotateExternalCertAuthority(s.ctx, ca); err != nil {
return trace.Wrap(err, "failed to push local cert authority")
}
s.Debugf("Pushed local cert authority %v", caID.String())
localCAs[t.Type] = ca
}
remoteCA, err := s.remoteAccessPoint.GetCertAuthority(s.ctx, types.CertAuthID{
Type: types.HostCA,
DomainName: s.domainName,
}, false)
if err != nil {
return trace.Wrap(err, "failed to get remote cert authority")
}
if remoteCA.GetName() != s.domainName || remoteCA.GetType() != types.HostCA {
return trace.BadParameter("received wrong CA, expected remote host CA, got %v", remoteCA.GetID())
}
maybeUpsertRemoteCA := func(remoteCA types.CertAuthority) error {
oldRemoteCA, err := s.localAccessPoint.GetCertAuthority(s.ctx, types.CertAuthID{
Type: types.HostCA,
DomainName: remoteCA.GetClusterName(),
}, false)
if err != nil && !trace.IsNotFound(err) {
return trace.Wrap(err)
}
// if CA is changed or does not exist, update backend
if err != nil || !services.CertAuthoritiesEquivalent(oldRemoteCA, remoteCA) {
s.Debugf("Ingesting remote cert authority %v", remoteCA.GetID())
if err := s.localClient.UpsertCertAuthority(remoteCA); err != nil {
return trace.Wrap(err)
}
}
// keep track of when the remoteSite needs to reconnect
if err := s.compareAndSwapCertAuthority(remoteCA); err != nil {
return trace.Wrap(err)
}
return nil
}
if err := maybeUpsertRemoteCA(remoteCA); err != nil {
return trace.Wrap(err)
}
s.Debugf("Watching for cert authority changes.")
for {
select {
@ -534,24 +590,27 @@ func (s *remoteSite) watchCertAuthorities(remoteWatcher *services.CertAuthorityW
case evt := <-localWatch.Events():
switch evt.Type {
case types.OpPut:
localCA, ok := evt.Resource.(types.CertAuthority)
newCA, ok := evt.Resource.(types.CertAuthority)
if !ok {
continue
}
ca, ok := cas[localCA.GetType()]
if ok && services.CertAuthoritiesEquivalent(ca, localCA) {
previousCA, ok := localCAs[newCA.GetType()]
if ok && services.CertAuthoritiesEquivalent(previousCA, newCA) {
continue
}
// clone to prevent a race with watcher filtering
localCA = localCA.Clone()
if err := s.remoteClient.RotateExternalCertAuthority(s.ctx, localCA); err != nil {
// clone to prevent a race with watcher filtering, as
// RotateExternalCertAuthority (client side) will end up calling
// CheckAndSetDefaults
// TODO(espadolini): figure out who should be responsible for validating the CA *once*
newCA = newCA.Clone()
if err := s.remoteClient.RotateExternalCertAuthority(s.ctx, newCA); err != nil {
log.WithError(err).Warn("Failed to rotate external ca")
return trace.Wrap(err)
}
cas[localCA.GetType()] = localCA
localCAs[newCA.GetType()] = newCA
}
case evt := <-remoteWatch.Events():
switch evt.Type {
@ -561,24 +620,9 @@ func (s *remoteSite) watchCertAuthorities(remoteWatcher *services.CertAuthorityW
continue
}
oldRemoteCA, err := s.localClient.GetCertAuthority(s.ctx, types.CertAuthID{
Type: types.HostCA,
DomainName: remoteCA.GetClusterName(),
}, false)
if err != nil && !trace.IsNotFound(err) {
return trace.Wrap(err)
}
// if CA is changed or does not exist, update backend
if err != nil || !services.CertAuthoritiesEquivalent(oldRemoteCA, remoteCA) {
if err := s.localClient.UpsertCertAuthority(remoteCA); err != nil {
return trace.Wrap(err)
}
}
// always update our local reference to the cert authority
if err := s.compareAndSwapCertAuthority(remoteCA); err != nil {
// the CA might not be trusted but the watcher's fanout logic is
// local, so this is ok
if err := maybeUpsertRemoteCA(remoteCA); err != nil {
return trace.Wrap(err)
}
}
@ -806,7 +850,6 @@ func UseTunnel(logger *log.Logger, c *sshutils.ChConn) bool {
}
func (s *remoteSite) connThroughTunnel(req *sshutils.DialReq) (*sshutils.ChConn, error) {
s.Debugf("Requesting connection to %v [%v] in remote cluster.",
req.Address, req.ServerID)