mirror of
https://github.com/gravitational/teleport
synced 2024-10-20 01:03:40 +00:00
Reduce latency of tsh ls -R
(#19438)
* Reduce latency of `tsh ls -R`. Listing nodes across clusters was previously done one cluster at a time. To improve latency, the same mechanism used by `tsh db ls -R` was copied so that listing happens in parallel, with an upper limit on concurrency.
This commit is contained in:
parent
3f52d028fb
commit
66b65dd2d7
|
@ -46,6 +46,7 @@ import (
|
|||
oteltrace "go.opentelemetry.io/otel/trace"
|
||||
"golang.org/x/crypto/ssh"
|
||||
"golang.org/x/exp/slices"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/gravitational/teleport"
|
||||
"github.com/gravitational/teleport/api/client/proto"
|
||||
|
@ -1930,28 +1931,70 @@ func (l nodeListings) Swap(i, j int) {
|
|||
}
|
||||
|
||||
func listNodesAllClusters(cf *CLIConf) error {
|
||||
var listings nodeListings
|
||||
// Fetch database listings for profiles in parallel. Set an arbitrary limit
|
||||
// just in case.
|
||||
group, groupCtx := errgroup.WithContext(cf.Context)
|
||||
group.SetLimit(4)
|
||||
|
||||
err := forEachProfile(cf, func(tc *client.TeleportClient, profile *client.ProfileStatus) error {
|
||||
result, err := tc.ListNodesWithFiltersAllClusters(cf.Context)
|
||||
if err != nil {
|
||||
return trace.Wrap(err)
|
||||
}
|
||||
for clusterName, nodes := range result {
|
||||
for _, node := range nodes {
|
||||
listings = append(listings, nodeListing{
|
||||
Proxy: profile.ProxyURL.Host,
|
||||
Cluster: clusterName,
|
||||
Node: node,
|
||||
})
|
||||
nodeListingsResultChan := make(chan nodeListings)
|
||||
nodeListingsCollectChan := make(chan nodeListings)
|
||||
go func() {
|
||||
var listings nodeListings
|
||||
for {
|
||||
select {
|
||||
case items := <-nodeListingsResultChan:
|
||||
listings = append(listings, items...)
|
||||
case <-groupCtx.Done():
|
||||
nodeListingsCollectChan <- listings
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
err := forEachProfile(cf, func(tc *client.TeleportClient, profile *client.ProfileStatus) error {
|
||||
group.Go(func() error {
|
||||
proxy, err := tc.ConnectToProxy(groupCtx)
|
||||
if err != nil {
|
||||
return trace.Wrap(err)
|
||||
}
|
||||
defer proxy.Close()
|
||||
|
||||
sites, err := proxy.GetSites(groupCtx)
|
||||
if err != nil {
|
||||
return trace.Wrap(err)
|
||||
}
|
||||
|
||||
var listings nodeListings
|
||||
for _, site := range sites {
|
||||
nodes, err := proxy.FindNodesByFiltersForCluster(groupCtx, *tc.DefaultResourceFilter(), site.Name)
|
||||
if err != nil {
|
||||
return trace.Wrap(err)
|
||||
}
|
||||
|
||||
for _, node := range nodes {
|
||||
listings = append(listings, nodeListing{
|
||||
Proxy: profile.ProxyURL.Host,
|
||||
Cluster: site.Name,
|
||||
Node: node,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
nodeListingsCollectChan <- listings
|
||||
return nil
|
||||
})
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return trace.Wrap(err)
|
||||
}
|
||||
|
||||
if err := group.Wait(); err != nil {
|
||||
return trace.Wrap(err)
|
||||
}
|
||||
|
||||
listings := <-nodeListingsResultChan
|
||||
sort.Sort(listings)
|
||||
|
||||
format := strings.ToLower(cf.Format)
|
||||
|
|
Loading…
Reference in a new issue