add benchmark tool and fix concurrent DeleteBucket

* Add bench tool and library functions
* Fix concurrent DeleteBucket problem refs #931
This commit is contained in:
Sasha Klizhentas 2017-04-30 16:25:17 -07:00
parent b8d72bfe42
commit 7d741b10a9
4 changed files with 257 additions and 4 deletions

View file

@ -1,5 +1,5 @@
/*
Copyright 2016 Gravitational, Inc.
Copyright 2016-2017 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -21,6 +21,7 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"
"time"
"github.com/gravitational/teleport/lib/backend"
@ -195,8 +196,43 @@ func (bk *Backend) DeleteKey(bucket []string, key string) error {
// DeleteBucket deletes the bucket by a given path.
//
// The bucket directory is resolved as RootDir/parent.../bucket and its
// files are removed with removeFiles rather than os.RemoveAll, so the
// os.RemoveAll call that used to return first (making the removeFiles
// call unreachable) is dropped.
func (bk *Backend) DeleteBucket(parent []string, bucket string) error {
	return removeFiles(path.Join(path.Join(bk.RootDir, path.Join(parent...)), bucket))
}
// removeFiles removes files from the directory non-recursively.
//
// We need this function because os.RemoveAll does not work
// on concurrent requests - it can produce a "directory not empty"
// error, because someone could create a new file in the directory
// while it is being removed.
//
// Sub-directories are left in place. Entries that disappear between
// listing the directory and inspecting or removing them are skipped
// silently; any other error is converted with trace.ConvertSystemError
// and returned.
func removeFiles(dir string) error {
	d, err := os.Open(dir)
	if err != nil {
		return trace.ConvertSystemError(err)
	}
	defer d.Close()
	names, err := d.Readdirnames(-1)
	if err != nil {
		return trace.ConvertSystemError(err)
	}
	for _, name := range names {
		target := filepath.Join(dir, name)
		fi, err := os.Stat(target)
		if err != nil {
			err = trace.ConvertSystemError(err)
			if !trace.IsNotFound(err) {
				return err
			}
			// the entry is already gone (or cannot be resolved), so there is
			// nothing to delete; fi is nil here and must not be dereferenced
			continue
		}
		if fi.IsDir() {
			continue
		}
		if err := os.Remove(target); err != nil {
			err = trace.ConvertSystemError(err)
			// a concurrent caller may have removed the file first
			if !trace.IsNotFound(err) {
				return err
			}
		}
	}
	return nil
}
// AcquireLock grabs a lock that will be released automatically in TTL

154
lib/client/bench.go Normal file
View file

@ -0,0 +1,154 @@
/*
Copyright 2017 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
	"context"
	"fmt"
	"io/ioutil"
	"time"

	log "github.com/Sirupsen/logrus"
	"github.com/codahale/hdrhistogram"
)
// Benchmark specifies benchmark requests to run
type Benchmark struct {
// Threads is the number of concurrent worker goroutines executing requests
Threads int
// Rate is the request origination rate, in requests per second
Rate int
// Duration is the test duration; the benchmark context times out after it
Duration time.Duration
// Command is the command to run for every request
Command []string
}
// BenchmarkResult is a result of the benchmark
type BenchmarkResult struct {
// RequestsOriginated is the number of requests originated; it is
// incremented for every request measurement collected, failed or not
RequestsOriginated int
// RequestsFailed is the number of requests that finished with an error
RequestsFailed int
// Histogram is a histogram of request durations, recorded in milliseconds
Histogram *hdrhistogram.Histogram
}
// Benchmark runs bench.Command repeatedly for bench.Duration and collects
// latency statistics.
//
// Requests are originated at bench.Rate per second by a producer goroutine
// and executed by bench.Threads worker goroutines. The call returns when
// the benchmark context is done (ctx is cancelled or bench.Duration
// elapses) or when every worker has reported completion.
//
// Command output is discarded: tc.Stdout and tc.Stderr are replaced with
// ioutil.Discard and are not restored.
//
// An error is returned when bench.Threads or bench.Rate is not positive.
func (tc *TeleportClient) Benchmark(ctx context.Context, bench Benchmark) (*BenchmarkResult, error) {
	// a negative thread count would panic in make() below
	if bench.Threads <= 0 {
		return nil, fmt.Errorf("benchmark threads must be positive, got %v", bench.Threads)
	}
	// a non-positive rate would produce an invalid ticker interval and
	// panic inside the producer goroutine
	if bench.Rate <= 0 {
		return nil, fmt.Errorf("benchmark rate must be positive, got %v", bench.Rate)
	}
	interval := time.Second / time.Duration(bench.Rate)
	if interval <= 0 {
		// rates above one request per nanosecond truncate to zero,
		// and time.NewTicker panics on a non-positive interval
		interval = time.Nanosecond
	}

	tc.Stdout = ioutil.Discard
	tc.Stderr = ioutil.Discard

	ctx, cancel := context.WithTimeout(ctx, bench.Duration)
	defer cancel()

	requestC := make(chan *benchMeasure)
	responseC := make(chan *benchMeasure, bench.Threads)

	// create goroutines for concurrency
	for i := 0; i < bench.Threads; i++ {
		go benchmarkThread(i, ctx, tc, bench.Command, requestC, responseC)
	}

	// producer thread
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// notice how we start the timer regardless of whether a thread
				// can process it, this is to account for coordinated omission
				measure := &benchMeasure{
					Start: time.Now(),
				}
				select {
				case requestC <- measure:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	var result BenchmarkResult
	// from one millisecond to 60000 milliseconds (minute)
	result.Histogram = hdrhistogram.New(1, 60000, 3)

	var doneThreads int
	for {
		select {
		case <-ctx.Done():
			return &result, nil
		case measure := <-responseC:
			if measure.ThreadCompleted {
				doneThreads++
				if doneThreads == bench.Threads {
					return &result, nil
				}
				continue
			}
			if measure.Error != nil {
				result.RequestsFailed++
			}
			result.RequestsOriginated++
			elapsed := int64(measure.End.Sub(measure.Start) / time.Millisecond)
			// values outside of the histogram range are rejected; report
			// them instead of dropping them silently
			if err := result.Histogram.RecordValue(elapsed); err != nil {
				log.Warningf("failed to record duration of %v ms: %v", elapsed, err)
			}
		}
	}
}
// benchMeasure carries a single request measurement from the producer
// through a worker goroutine back to the collector; it is also used as
// a worker completion notice when ThreadCompleted is set
type benchMeasure struct {
// Start is the time the request was originated by the producer
Start time.Time
// End is the time the worker finished executing the request
End time.Time
// ThreadCompleted marks a message sent by a worker that is exiting
// rather than a request measurement
ThreadCompleted bool
// ThreadID is the ID of the worker that sent the message
ThreadID int
// Error is the error returned by the executed request, if any
Error error
}
// benchmarkThread is a single benchmark worker. It takes measures from
// receiveC, executes command over SSH for each of them, stamps the outcome
// and end time on the measure and reports it to sendC. Once ctx is done
// (or the worker recovers from a panic) it reports a ThreadCompleted
// message. Reports never block: a message is dropped with a warning
// when sendC cannot accept it immediately.
func benchmarkThread(threadID int, ctx context.Context, tc *TeleportClient, command []string, receiveC chan *benchMeasure, sendC chan *benchMeasure) {
	// report tags the message with this worker's ID and hands it off
	// without blocking
	report := func(m *benchMeasure) {
		m.ThreadID = threadID
		select {
		case sendC <- m:
		default:
			log.Warningf("blocked on measure send\n")
		}
	}

	defer func() {
		r := recover()
		if r == nil {
			return
		}
		log.Warningf("recover from panic: %v", r)
		report(&benchMeasure{ThreadCompleted: true})
	}()

	for {
		select {
		case <-ctx.Done():
			report(&benchMeasure{ThreadCompleted: true})
			return
		case m := <-receiveC:
			m.Error = tc.SSH(ctx, command, false)
			m.End = time.Now()
			report(m)
		}
	}
}

View file

@ -512,7 +512,7 @@ func (s *Server) isAuthority(cert ssh.PublicKey) bool {
// find cert authority by its key
cas, err := s.authService.GetCertAuthorities(services.UserCA, false)
if err != nil {
log.Warningf("%v", err)
log.Warningf("%v", trace.DebugReport(err))
return false
}

View file

@ -20,8 +20,10 @@ import (
"context"
"fmt"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
"github.com/gravitational/teleport/lib/client"
@ -82,6 +84,14 @@ type CLIConf struct {
// then exit. This is useful when calling tsh agent from a script (for example ~/.bash_profile)
// to load keys into your system agent.
LoadSystemAgentOnly bool
// BenchThreads is amount of concurrent threads to run
BenchThreads int
// BenchDuration is a duration for the benchmark
BenchDuration time.Duration
// BenchRate is a requests per second rate to maintain
BenchRate int
// Context is a context to control execution
Context context.Context
}
// Run executes TSH client. same as main() but easier to test
@ -144,6 +154,15 @@ func Run(args []string, underTest bool) {
// logout deletes obtained session certificates in ~/.tsh
logout := app.Command("logout", "Delete a cluster certificate")
// bench
bench := app.Command("bench", "Run shell or execute a command on a remote SSH node")
bench.Arg("[user@]host", "Remote hostname and the login to use").Required().StringVar(&cf.UserHost)
bench.Arg("command", "Command to execute on a remote host").Required().StringsVar(&cf.RemoteCommand)
bench.Flag("port", "SSH port on a remote host").Short('p').Int16Var(&cf.NodePort)
bench.Flag("threads", "Concurrent threads to run").Default("10").IntVar(&cf.BenchThreads)
bench.Flag("duration", "Test duration").Default("1s").DurationVar(&cf.BenchDuration)
bench.Flag("rate", "Requests per second rate").Default("10").IntVar(&cf.BenchRate)
// parse CLI commands+flags:
command, err := app.Parse(args)
if err != nil {
@ -155,11 +174,26 @@ func Run(args []string, underTest bool) {
utils.InitLogger(utils.LoggingForCLI, logrus.DebugLevel)
}
ctx, cancel := context.WithCancel(context.Background())
go func() {
exitSignals := make(chan os.Signal, 1)
signal.Notify(exitSignals, syscall.SIGTERM, syscall.SIGINT)
select {
case sig := <-exitSignals:
logrus.Debugf("signal: %v", sig)
cancel()
}
}()
cf.Context = ctx
switch command {
case ver.FullCommand():
onVersion()
case ssh.FullCommand():
onSSH(&cf)
case bench.FullCommand():
onBenchmark(&cf)
case join.FullCommand():
onJoin(&cf)
case scp.FullCommand():
@ -315,6 +349,35 @@ func onSSH(cf *CLIConf) {
}
}
// onBenchmark executes benchmark: it builds a client from the CLI
// configuration, runs the benchmark with the configured command, threads,
// duration and rate, and prints the request counters followed by a table
// of latency percentiles. On a benchmark error the user message is printed
// to stderr and the process exits with code 255.
func onBenchmark(cf *CLIConf) {
	tc, err := makeClient(cf, false)
	if err != nil {
		utils.FatalError(err)
	}
	result, err := tc.Benchmark(cf.Context, client.Benchmark{
		Command:  cf.RemoteCommand,
		Threads:  cf.BenchThreads,
		Duration: cf.BenchDuration,
		Rate:     cf.BenchRate,
	})
	if err != nil {
		fmt.Fprintln(os.Stderr, utils.UserMessageFromError(err))
		os.Exit(255)
	}
	fmt.Printf("\n")
	fmt.Printf("* Requests originated: %v\n", result.RequestsOriginated)
	fmt.Printf("* Requests failed: %v\n", result.RequestsFailed)
	fmt.Printf("\nHistogram\n\n")
	t := goterm.NewTable(0, 10, 5, ' ', 0)
	printHeader(t, []string{"Percentile", "Duration"})
	for _, quantile := range []float64{25, 50, 75, 90, 95, 99, 100} {
		fmt.Fprintf(t, "%v\t%v ms\n", quantile, result.Histogram.ValueAtQuantile(quantile))
	}
	// the rendered table is data, not a format string, so print it verbatim
	fmt.Fprint(os.Stdout, t.String())
	fmt.Printf("\n")
}
// onJoin executes 'ssh join' command
func onJoin(cf *CLIConf) {
tc, err := makeClient(cf, true)