dashboard and detect dangling sessions, refs #931

* Improve Grafana dashboard and fix some metrics
* Add detection for dangling sessions
* Add documentation for Perf Testing
Sasha Klizhentas 2017-06-01 19:35:14 -07:00
parent e5d6faf482
commit a22e817db9
5 changed files with 135 additions and 38 deletions


@@ -55,3 +55,51 @@ gops stack $(pidof teleport) | python gops.py collect > /tmp/b
python gops.py diff /tmp/a /tmp/b
```
### Performance Testing
By default `tsh bench` does not create interactive sessions; it uses exec requests instead.
**Logging in**
You have to log in with `tsh login` before calling `tsh bench`, otherwise the
requests will fail.
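For example (a minimal sketch; the proxy address and user name are placeholders, not values from this repository):
```bash
# log in once; tsh bench reuses the cached credentials
tsh --proxy=proxy.example.com --user=admin login
```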
**Non-interactive mode**
For example, the following generates requests at a rate of 10 requests per second
and uses a pool of 100 execution threads (goroutines in Go) for 300 seconds:
```bash
tsh bench --threads=100 --duration=300s --rate=10 localhost ls -l
```
**NOTE:** The algorithm deliberately does not apply backpressure when requests are delayed
(watch [this](https://www.infoq.com/presentations/latency-pitfalls) for more details about why).
In practice this means that a seemingly low request rate can still trigger a system outage:
once the load locates the system's breaking point, the number of open connections blows up.
Also, times are measured from the moment a request was originated, not when it was
dispatched to a thread, so the latency report is closer to what real users will observe.
**Interactive mode**
This creates a real interactive session, allocating a PTY, running `ls -l` and then `exit`:
```bash
tsh bench --interactive --threads=100 --duration=300s --rate=10 localhost ls -l
```
The performance difference between interactive and non-interactive modes is significant.
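To quantify it on your own cluster, you can run both modes back to back with identical parameters and compare the reported histograms (a sketch using a shorter duration; `localhost` stands in for one of your nodes):
```bash
tsh bench --threads=10 --rate=5 --duration=60s localhost ls -l
tsh bench --interactive --threads=10 --rate=5 --duration=60s localhost ls -l
```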
**Debugging the debugger**
Sometimes it is useful to see how many goroutines `tsh bench` itself produces.
You can launch it with a `gops` endpoint (used by the https://github.com/google/gops tool):
```bash
tsh --gops --gops-addr=127.0.0.1:4322 bench --threads=100 --duration=300s --rate=10 localhost ls -l
# then use gops tool to inspect
gops stack <pid>
```
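If `gops` is not installed, it can be fetched with the standard Go tooling (a sketch; this assumes a working Go toolchain and was the usual installation method at the time of this commit):
```bash
go get -u github.com/google/gops
```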

File diff suppressed because one or more lines are too long


@@ -186,6 +186,9 @@ func (b *BoltBackend) DeleteBucket(path []string, bucket string) error {
func (b *BoltBackend) deleteBucket(buckets []string, bucket string) error {
return b.db.Update(func(tx *bolt.Tx) error {
if len(buckets) == 0 {
return boltErr(tx.DeleteBucket([]byte(bucket)))
}
bkt, err := GetBucket(tx, buckets)
if err != nil {
return trace.Wrap(err)


@@ -26,6 +26,7 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/codahale/hdrhistogram"
"github.com/gravitational/trace"
)
// Benchmark specifies benchmark requests to run
@@ -38,6 +39,8 @@ type Benchmark struct {
Duration time.Duration
// Command is a command to run
Command []string
// Interactive turns on interactive sessions
Interactive bool
}
// BenchmarkResult is a result of the benchmark
@@ -68,7 +71,16 @@ func (tc *TeleportClient) Benchmark(ctx context.Context, bench Benchmark) (*Benc
// create goroutines for concurrency
for i := 0; i < bench.Threads; i++ {
go benchmarkThread(i, ctx, tc, bench.Command, requestC, responseC)
thread := &benchmarkThread{
id: i,
ctx: ctx,
client: tc,
command: bench.Command,
interactive: bench.Interactive,
receiveC: requestC,
sendC: responseC,
}
go thread.run()
}
// producer goroutine
@@ -101,10 +113,21 @@ func (tc *TeleportClient) Benchmark(ctx context.Context, bench Benchmark) (*Benc
result.Histogram = hdrhistogram.New(1, 60000, 3)
var doneThreads int
var timeoutC <-chan time.Time
doneC := ctx.Done()
for {
select {
case <-ctx.Done():
case <-timeoutC:
result.LastError = trace.BadParameter("several requests hang: timeout waiting for %v threads to finish", bench.Threads-doneThreads)
return &result, nil
case <-doneC:
// give the goroutines a couple of seconds to wrap up: set up a timer
// that fires if not all of them have finished in time
doneC = nil
waitTime := time.Duration(result.Histogram.Max()) * time.Millisecond
// wait the max observed latency plus a buffer to give in-flight requests time to wrap up
waitTime = time.Duration(1.2 * float64(waitTime))
timeoutC = time.After(waitTime)
case measure := <-responseC:
if measure.ThreadCompleted {
doneThreads += 1
@@ -132,50 +155,70 @@ type benchMeasure struct {
Error error
}
func benchmarkThread(threadID int, ctx context.Context, tc *TeleportClient, command []string, receiveC chan *benchMeasure, sendC chan *benchMeasure) {
sendMeasure := func(measure *benchMeasure) {
measure.ThreadID = threadID
select {
case sendC <- measure:
default:
log.Warningf("blocked on measure send\n")
}
type benchmarkThread struct {
id int
ctx context.Context
client *TeleportClient
command []string
interactive bool
receiveC chan *benchMeasure
sendC chan *benchMeasure
}
func (b *benchmarkThread) execute(measure *benchMeasure) {
if !b.interactive {
measure.Error = b.client.SSH(b.ctx, nil, false)
measure.End = time.Now()
b.sendMeasure(measure)
return
}
config := b.client.Config
client, err := NewClient(&config)
reader, writer := io.Pipe()
client.Stdin = reader
out := &bytes.Buffer{}
client.Stdout = out
client.Stderr = out
if err != nil {
measure.Error = err
measure.End = time.Now()
b.sendMeasure(measure)
return
}
done := make(chan bool)
go func() {
measure.Error = b.client.SSH(b.ctx, nil, false)
measure.End = time.Now()
b.sendMeasure(measure)
close(done)
}()
writer.Write([]byte(strings.Join(b.command, " ") + "\r\nexit\r\n"))
<-done
}
func (b *benchmarkThread) sendMeasure(measure *benchMeasure) {
measure.ThreadID = b.id
select {
case b.sendC <- measure:
default:
log.Warningf("blocked on measure send\n")
}
}
func (b *benchmarkThread) run() {
defer func() {
if r := recover(); r != nil {
log.Warningf("recover from panic: %v", r)
sendMeasure(&benchMeasure{ThreadCompleted: true})
b.sendMeasure(&benchMeasure{ThreadCompleted: true})
}
}()
for {
select {
case measure := <-receiveC:
config := tc.Config
client, err := NewClient(&config)
reader, writer := io.Pipe()
client.Stdin = reader
out := &bytes.Buffer{}
client.Stdout = out
client.Stderr = out
if err != nil {
measure.Error = err
measure.End = time.Now()
sendMeasure(measure)
} else {
done := make(chan bool)
go func() {
measure.Error = client.SSH(ctx, nil, false)
measure.End = time.Now()
sendMeasure(measure)
close(done)
}()
writer.Write([]byte(strings.Join(command, " ") + "\r\nexit\r\n"))
<-done
}
case <-ctx.Done():
sendMeasure(&benchMeasure{
case measure := <-b.receiveC:
b.execute(measure)
case <-b.ctx.Done():
b.sendMeasure(&benchMeasure{
ThreadCompleted: true,
})
return


@@ -91,6 +91,8 @@ type CLIConf struct {
BenchDuration time.Duration
// BenchRate is a requests-per-second rate to maintain
BenchRate int
// BenchInteractive indicates that we should create an interactive session
BenchInteractive bool
// Context is a context to control execution
Context context.Context
// Gops starts gops agent on a specified address
@@ -170,6 +172,7 @@ func Run(args []string, underTest bool) {
bench.Flag("threads", "Concurrent threads to run").Default("10").IntVar(&cf.BenchThreads)
bench.Flag("duration", "Test duration").Default("1s").DurationVar(&cf.BenchDuration)
bench.Flag("rate", "Requests per second rate").Default("10").IntVar(&cf.BenchRate)
bench.Flag("interactive", "Create interactive SSH session").BoolVar(&cf.BenchInteractive)
// parse CLI commands+flags:
command, err := app.Parse(args)