From 7d741b10a9e7e23bdbf3becbbaff8c7e1cedc515 Mon Sep 17 00:00:00 2001 From: Sasha Klizhentas Date: Sun, 30 Apr 2017 16:25:17 -0700 Subject: [PATCH] add benchmark tool and fix concurrent DeleteBucket * Add bench tool and library functions * Fix concurrent DeleteBucket problem refs #931 --- lib/backend/dir/impl.go | 42 ++++++++++- lib/client/bench.go | 154 ++++++++++++++++++++++++++++++++++++++++ lib/srv/sshserver.go | 2 +- tool/tsh/common/tsh.go | 63 ++++++++++++++++ 4 files changed, 257 insertions(+), 4 deletions(-) create mode 100644 lib/client/bench.go diff --git a/lib/backend/dir/impl.go b/lib/backend/dir/impl.go index 4c4f3e05076..ca614ac5a2c 100644 --- a/lib/backend/dir/impl.go +++ b/lib/backend/dir/impl.go @@ -1,5 +1,5 @@ /* -Copyright 2016 Gravitational, Inc. +Copyright 2016-2017 Gravitational, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import ( "io/ioutil" "os" "path" + "path/filepath" "time" "github.com/gravitational/teleport/lib/backend" @@ -195,8 +196,43 @@ func (bk *Backend) DeleteKey(bucket []string, key string) error { // DeleteBucket deletes the bucket by a given path func (bk *Backend) DeleteBucket(parent []string, bucket string) error { - return trace.ConvertSystemError(os.RemoveAll( - path.Join(path.Join(bk.RootDir, path.Join(parent...)), bucket))) + return removeFiles(path.Join(path.Join(bk.RootDir, path.Join(parent...)), bucket)) +} + +// removeFiles removes files from the directory non-recursively +// we need this function because os.RemoveAll does not work +// on concurrent requests - can produce directory not empty +// error, because someone could create a new file in the directory +func removeFiles(dir string) error { + d, err := os.Open(dir) + if err != nil { + return trace.ConvertSystemError(err) + } + defer d.Close() + names, err := d.Readdirnames(-1) + if err != nil { + return trace.ConvertSystemError(err) + } + for _, name := range names { + path := filepath.Join(dir, name) + fi, err := os.Stat(path) + if err != nil { + err = trace.ConvertSystemError(err) + if !trace.IsNotFound(err) { + return err + } + } + if !fi.IsDir() { + err = os.Remove(path) + if err != nil { + err = trace.ConvertSystemError(err) + if !trace.IsNotFound(err) { + return err + } + } + } + } + return nil } // AcquireLock grabs a lock that will be released automatically in TTL diff --git a/lib/client/bench.go b/lib/client/bench.go new file mode 100644 index 00000000000..c7162c3a89c --- /dev/null +++ b/lib/client/bench.go @@ -0,0 +1,154 @@ +/* +Copyright 2017 Gravitational, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "context" + "io/ioutil" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/codahale/hdrhistogram" +) + +// Benchmark specifies benchmark requests to run +type Benchmark struct { + // Threads is amount of concurrent execution threads to run + Threads int + // Rate is requests per second origination rate + Rate int + // Duration is test duration + Duration time.Duration + // Command is a command to run + Command []string +} + +// BenchmarkResult is a result of the benchmark +type BenchmarkResult struct { + // RequestsOriginated is amount of reuqests originated + RequestsOriginated int + // RequestsFailed is amount of requests failed + RequestsFailed int + // Histogram is a duration histogram + Histogram *hdrhistogram.Histogram +} + +func (tc *TeleportClient) Benchmark(ctx context.Context, bench Benchmark) (*BenchmarkResult, error) { + tc.Stdout = ioutil.Discard + tc.Stderr = ioutil.Discard + + ctx, cancel := context.WithTimeout(ctx, bench.Duration) + defer cancel() + + requestC := make(chan *benchMeasure) + responseC := make(chan *benchMeasure, bench.Threads) + + // create goroutines for concurrency + for i := 0; i < bench.Threads; i++ { + go benchmarkThread(i, ctx, tc, bench.Command, requestC, responseC) + } + + // producer thread + go func() { + interval := time.Duration(float64(1) / float64(bench.Rate) * float64(time.Second)) + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + // notice how we star the timer regardless of whether thread can process it + // this is to account for coordinated omission + measure := &benchMeasure{ + Start: time.Now(), + } + select { + case requestC <- measure: + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } + }() + + var result BenchmarkResult + // from one millisecond to 60000 milliseconds (minute) + result.Histogram = hdrhistogram.New(1, 60000, 3) + + var doneThreads int + for { + select { + case <-ctx.Done(): + return &result, nil + case measure := <-responseC: + if measure.ThreadCompleted { + doneThreads += 1 + if doneThreads == bench.Threads { + return &result, nil + } + } else { + if measure.Error != nil { + result.RequestsFailed += 1 + } + result.RequestsOriginated += 1 + result.Histogram.RecordValue(int64(measure.End.Sub(measure.Start) / time.Millisecond)) + } + } + } + +} + +type benchMeasure struct { + Start time.Time + End time.Time + ThreadCompleted bool + ThreadID int + Error error +} + +func benchmarkThread(threadID int, ctx context.Context, tc *TeleportClient, command []string, receiveC chan *benchMeasure, sendC chan *benchMeasure) { + sendMeasure := func(measure *benchMeasure) { + measure.ThreadID = threadID + select { + case sendC <- measure: + default: + log.Warningf("blocked on measure send\n") + } + } + defer func() { + if r := recover(); r != nil { + log.Warningf("recover from panic: %v", r) + sendMeasure(&benchMeasure{ThreadCompleted: true}) + } + }() + + for { + select { + case measure := <-receiveC: + err := tc.SSH(ctx, command, false) + measure.Error = err + measure.End = time.Now() + sendMeasure(measure) + case <-ctx.Done(): + sendMeasure(&benchMeasure{ + ThreadCompleted: true, + }) + return + } + } +} diff --git a/lib/srv/sshserver.go b/lib/srv/sshserver.go index 124c7a1fdac..beb1549bd7a 100644 --- a/lib/srv/sshserver.go +++ b/lib/srv/sshserver.go @@ -512,7 +512,7 @@ func (s *Server) isAuthority(cert ssh.PublicKey) bool { // find cert authority by it's key cas, err := s.authService.GetCertAuthorities(services.UserCA, false) if err != nil { - log.Warningf("%v", err) + log.Warningf("%v", trace.DebugReport(err)) return false } diff --git a/tool/tsh/common/tsh.go b/tool/tsh/common/tsh.go index da5b3f36853..de3b845650e 100644 --- a/tool/tsh/common/tsh.go +++ b/tool/tsh/common/tsh.go @@ -20,8 +20,10 @@ import ( "context" "fmt" "os" + "os/signal" "path/filepath" "strings" + "syscall" "time" "github.com/gravitational/teleport/lib/client" @@ -82,6 +84,14 @@ type CLIConf struct { // then exit. This is useful when calling tsh agent from a script (for example ~/.bash_profile) // to load keys into your system agent. LoadSystemAgentOnly bool + // BenchThreads is amount of concurrent threads to run + BenchThreads int + // BenchDuration is a duration for the benchmark + BenchDuration time.Duration + // BenchRate is a requests per second rate to mantain + BenchRate int + // Context is a context to control execution + Context context.Context } // Run executes TSH client. same as main() but easier to test @@ -144,6 +154,15 @@ func Run(args []string, underTest bool) { // logout deletes obtained session certificates in ~/.tsh logout := app.Command("logout", "Delete a cluster certificate") + // bench + bench := app.Command("bench", "Run shell or execute a command on a remote SSH node") + bench.Arg("[user@]host", "Remote hostname and the login to use").Required().StringVar(&cf.UserHost) + bench.Arg("command", "Command to execute on a remote host").Required().StringsVar(&cf.RemoteCommand) + bench.Flag("port", "SSH port on a remote host").Short('p').Int16Var(&cf.NodePort) + bench.Flag("threads", "Concurrent threads to run").Default("10").IntVar(&cf.BenchThreads) + bench.Flag("duration", "Test duration").Default("1s").DurationVar(&cf.BenchDuration) + bench.Flag("rate", "Requests per second rate").Default("10").IntVar(&cf.BenchRate) + // parse CLI commands+flags: command, err := app.Parse(args) if err != nil { @@ -155,11 +174,26 @@ func Run(args []string, underTest bool) { utils.InitLogger(utils.LoggingForCLI, logrus.DebugLevel) } + ctx, cancel := context.WithCancel(context.Background()) + go func() { + exitSignals := make(chan os.Signal, 1) + signal.Notify(exitSignals, syscall.SIGTERM, syscall.SIGINT) + + select { + case sig := <-exitSignals: + logrus.Debugf("signal: %v", sig) + cancel() + } + }() + cf.Context = ctx + switch command { case ver.FullCommand(): onVersion() case ssh.FullCommand(): onSSH(&cf) + case bench.FullCommand(): + onBenchmark(&cf) case join.FullCommand(): onJoin(&cf) case scp.FullCommand(): @@ -315,6 +349,35 @@ func onSSH(cf *CLIConf) { } } +// onBenchmark executes benchmark +func onBenchmark(cf *CLIConf) { + tc, err := makeClient(cf, false) + if err != nil { + utils.FatalError(err) + } + result, err := tc.Benchmark(cf.Context, client.Benchmark{ + Command: cf.RemoteCommand, + Threads: cf.BenchThreads, + Duration: cf.BenchDuration, + Rate: cf.BenchRate, + }) + if err != nil { + fmt.Fprintln(os.Stderr, utils.UserMessageFromError(err)) + os.Exit(255) + } + fmt.Printf("\n") + fmt.Printf("* Requests originated: %v\n", result.RequestsOriginated) + fmt.Printf("* Requests failed: %v\n", result.RequestsFailed) + fmt.Printf("\nHistogram\n\n") + t := goterm.NewTable(0, 10, 5, ' ', 0) + printHeader(t, []string{"Percentile", "Duration"}) + for _, quantile := range []float64{25, 50, 75, 90, 95, 99, 100} { + fmt.Fprintf(t, "%v\t%v ms\n", quantile, result.Histogram.ValueAtQuantile(quantile)) + } + fmt.Fprintf(os.Stdout, t.String()) + fmt.Printf("\n") +} + // onJoin executes 'ssh join' command func onJoin(cf *CLIConf) { tc, err := makeClient(cf, true)