more work and instrumentation

Sasha Klizhentas 2017-05-26 14:53:17 -07:00
parent e766a3c902
commit 4827d8e39d


@@ -19,21 +19,17 @@ package state
 import (
 	"context"
 	"fmt"
-	"io"
-	"io/ioutil"
 	"sync"
 	"time"
-	log "github.com/Sirupsen/logrus"
-	"github.com/codahale/hdrhistogram"
 	"github.com/gravitational/teleport/lib/events"
 	"github.com/gravitational/teleport/lib/session"
-	"github.com/gravitational/trace"
-)
-
-var (
-	errNotSupported = trace.BadParameter("method not supported")
+	log "github.com/Sirupsen/logrus"
+	"github.com/gravitational/trace"
+	"github.com/prometheus/client_golang/prometheus"
 )
 
 const (
@@ -53,24 +49,70 @@ const (
 	ThrottleDuration = 10 * time.Second
 )
+
+var (
+	errNotSupported = trace.BadParameter("method not supported")
+)
+
+var (
+	auditLatencies = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Name: "audit_latency_microseconds",
+			Help: "Latency for audit log submission",
+			// Buckets in microsecond latencies
+			Buckets: prometheus.ExponentialBuckets(5000, 1.5, 15),
+		},
+	)
+	auditChunks = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Name:    "audit_chunks_total",
+			Help:    "Chunks per slice submitted",
+			Buckets: prometheus.LinearBuckets(10, 20, 10),
+		},
+	)
+	auditBytesPerChunk = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Name:    "audit_chunk_bytes",
+			Help:    "Bytes submitted per chunk",
+			Buckets: prometheus.ExponentialBuckets(10, 3.0, 10),
+		},
+	)
+	auditBytesPerSlice = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Name:    "audit_slice_bytes",
+			Help:    "Bytes per submitted slice of chunks",
+			Buckets: prometheus.ExponentialBuckets(100, 3.0, 10),
+		},
+	)
+	auditRequests = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "audit_requests_total",
+			Help: "Number of audit requests",
+		},
+		[]string{"result"},
+	)
+)
+
+func init() {
+	// Metrics have to be registered to be exposed:
+	prometheus.MustRegister(auditLatencies)
+	prometheus.MustRegister(auditChunks)
+	prometheus.MustRegister(auditBytesPerChunk)
+	prometheus.MustRegister(auditBytesPerSlice)
+	prometheus.MustRegister(auditRequests)
+}
+
 // CachingAuditLog implements events.IAuditLog on the recording machine (SSH server)
 // It captures the local recording and forwards it to the AuditLog network server
 type CachingAuditLog struct {
 	sync.Mutex
-	server            events.IAuditLog
-	queue             chan []events.SessionChunk
-	cancel            context.CancelFunc
-	ctx               context.Context
-	chunks            []events.SessionChunk
-	bytes             int64
-	latencyHist       *hdrhistogram.Histogram
-	chunksHist        *hdrhistogram.Histogram
-	bytesHist         *hdrhistogram.Histogram
-	bytesPerChunkHist *hdrhistogram.Histogram
-	throttleStart     time.Time
-	lastReport        time.Time
-	requests          int
-	withBackPressure  bool
+	server           events.IAuditLog
+	queue            chan []events.SessionChunk
+	cancel           context.CancelFunc
+	ctx              context.Context
+	chunks           []events.SessionChunk
+	bytes            int64
+	throttleStart    time.Time
+	withBackPressure bool
 }
 
 func (ll *CachingAuditLog) add(chunks []events.SessionChunk) {
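Note: the init block above registers the collectors with the default Prometheus registry, but nothing in this file serves them over HTTP. A minimal sketch of the wiring this relies on, assuming the standard promhttp handler and an arbitrary listen address (neither is part of this commit):

    package main

    import (
    	"log"
    	"net/http"

    	"github.com/prometheus/client_golang/prometheus/promhttp"
    )

    func main() {
    	// promhttp.Handler serves every collector registered via
    	// prometheus.MustRegister, including the audit_* metrics above.
    	http.Handle("/metrics", promhttp.Handler())
    	log.Fatal(http.ListenAndServe(":8080", nil))
    }

Once scraped, the histograms answer quantile queries on the Prometheus side, e.g. histogram_quantile(0.95, rate(audit_latency_microseconds_bucket[5m])), which is what replaces the hdrhistogram quantile printouts removed further down.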
@@ -91,13 +133,9 @@ func (ll *CachingAuditLog) reset() []events.SessionChunk {
 func NewCachingAuditLog(logServer events.IAuditLog) *CachingAuditLog {
 	ctx, cancel := context.WithCancel(context.TODO())
 	ll := &CachingAuditLog{
-		server:            logServer,
-		cancel:            cancel,
-		ctx:               ctx,
-		latencyHist:       hdrhistogram.New(1, 60000, 3),
-		chunksHist:        hdrhistogram.New(1, 60000, 3),
-		bytesHist:         hdrhistogram.New(1, 600000, 3),
-		bytesPerChunkHist: hdrhistogram.New(1, 600000, 3),
+		server: logServer,
+		cancel: cancel,
+		ctx:    ctx,
 	}
 	// start the queue:
 	if logServer != nil {
@@ -120,7 +158,6 @@ func (ll *CachingAuditLog) run() {
 		case <-tickerC:
 			// tick received to force flush after time passed
 			tickerC = nil
-			log.Warningf("flushing after timeout")
 			ll.flush(true)
 		case chunks := <-ll.queue:
 			ll.add(chunks)
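For context, run() (only partially visible in this hunk) follows a standard batch-then-flush loop: chunks accumulate off ll.queue, and a ticker channel forces a flush when too much time passes; the commit merely drops the noisy warning on that path. A self-contained sketch of the same pattern, with illustrative names that are not Teleport's:

    package main

    import (
    	"fmt"
    	"time"
    )

    // batchLoop buffers items from in and flushes them either when the
    // producer closes the channel or when maxWait elapses after the first
    // buffered item — the role tickerC plays in run().
    func batchLoop(in <-chan string, maxWait time.Duration, flush func([]string)) {
    	var batch []string
    	var timerC <-chan time.Time
    	for {
    		select {
    		case item, ok := <-in:
    			if !ok {
    				if len(batch) > 0 {
    					flush(batch)
    				}
    				return
    			}
    			batch = append(batch, item)
    			if timerC == nil {
    				timerC = time.After(maxWait)
    			}
    		case <-timerC:
    			// tick received to force flush after time passed
    			timerC = nil
    			flush(batch)
    			batch = nil
    		}
    	}
    }

    func main() {
    	in := make(chan string)
    	go func() {
    		in <- "chunk-1"
    		in <- "chunk-2"
    		close(in)
    	}()
    	batchLoop(in, 100*time.Millisecond, func(b []string) {
    		fmt.Println("flush:", b)
    	})
    }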
@@ -144,59 +181,39 @@ func (ll *CachingAuditLog) flush(force bool) {
 	chunks := ll.reset()
 	start := time.Now()
 	err := ll.server.PostSessionChunks(chunks)
+	auditRequests.WithLabelValues("total").Inc()
 	if err != nil {
 		log.Warningf("failed to post chunk: %v", err)
+		auditRequests.WithLabelValues("failure").Inc()
 	}
-	ll.requests += 1
-	ll.latencyHist.RecordValue(int64(time.Now().Sub(start) / time.Microsecond))
-	ll.chunksHist.RecordValue(int64(len(chunks)))
+	auditLatencies.Observe(float64(time.Now().Sub(start) / time.Microsecond))
+	auditChunks.Observe(float64(len(chunks)))
 	var bytes int64
 	for _, c := range chunks {
 		bytes += int64(len(c.Data))
-		ll.bytesPerChunkHist.RecordValue(int64(len(c.Data)))
-	}
-	ll.bytesHist.RecordValue(bytes)
-	if time.Now().Sub(ll.lastReport) > 10*time.Second {
-		diff := time.Now().Sub(ll.lastReport) / time.Second
-		requests := ll.requests
-		ll.requests = 0
-		ll.lastReport = time.Now()
-		fmt.Printf("Latency histogram\n")
-		for _, quantile := range []float64{25, 50, 75, 90, 95, 99, 100} {
-			fmt.Printf("%v\t%v microseconds\n", quantile, ll.latencyHist.ValueAtQuantile(quantile))
-		}
-		fmt.Printf("%v requests/sec\n", requests/int(diff))
-		fmt.Printf("Chunk count histogram\n")
-		for _, quantile := range []float64{25, 50, 75, 90, 95, 99, 100} {
-			fmt.Printf("%v\t%v chunks\n", quantile, ll.chunksHist.ValueAtQuantile(quantile))
-		}
-		fmt.Printf("Bytes per slice of chunks histogram\n")
-		for _, quantile := range []float64{25, 50, 75, 90, 95, 99, 100} {
-			fmt.Printf("%v\t%v bytes\n", quantile, ll.bytesHist.ValueAtQuantile(quantile))
-		}
-		fmt.Printf("Bytes per chunk histogram\n")
-		for _, quantile := range []float64{25, 50, 75, 90, 95, 99, 100} {
-			fmt.Printf("%v\t%v bytes\n", quantile, ll.bytesPerChunkHist.ValueAtQuantile(quantile))
-		}
-	}
+		auditBytesPerChunk.Observe(float64(len(c.Data)))
+	}
+	auditBytesPerSlice.Observe(float64(bytes))
 }
 
 func (ll *CachingAuditLog) post(chunks []events.SessionChunk) error {
-	if time.Now().After(ll.throttleStart) {
+	if time.Now().Before(ll.throttleStart) {
 		return nil
 	}
 	select {
 	case ll.queue <- chunks:
 		return nil
 	default:
 		// the queue is blocked, now we will create a timer
 		// to detect the timeout
 	}
 	timer := time.NewTimer(ThrottleLatency)
 	defer timer.Stop()
 	select {
 	case ll.queue <- chunks:
 	case <-timer.C:
 		ll.throttleStart = time.Now().Add(ThrottleDuration)
-		log.Warningf("will throttle connection until %v", ll.throttleStart)
+		log.Warningf("will throttle audit log forward until %v", ll.throttleStart)
 	}
 	return nil
 }
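Two behavior changes hide in this last hunk besides the metrics swap: the throttle check is inverted to the intended meaning (drop chunks while time.Now() is still Before the throttleStart deadline, i.e. inside the penalty window), and the warning now says what is actually being throttled. The surrounding send logic in post() is a try-then-bounded-wait pattern; a minimal standalone sketch of it, with illustrative names only:

    package main

    import (
    	"fmt"
    	"time"
    )

    // trySend mirrors post(): attempt a non-blocking send first, then a
    // send bounded by wait; false means the consumer could not keep up,
    // which is the condition that starts the throttle window in post().
    func trySend(queue chan<- int, v int, wait time.Duration) bool {
    	select {
    	case queue <- v:
    		return true
    	default:
    		// queue is full — fall through to the bounded wait
    	}
    	timer := time.NewTimer(wait)
    	defer timer.Stop()
    	select {
    	case queue <- v:
    		return true
    	case <-timer.C:
    		return false
    	}
    }

    func main() {
    	q := make(chan int, 1)
    	fmt.Println(trySend(q, 1, 10*time.Millisecond)) // true: fits in the buffer
    	fmt.Println(trySend(q, 2, 10*time.Millisecond)) // false: nobody is reading q
    }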