Add sizes to traces (#19851)

added to storage and grid traces. Can provide more context for traces that aren't HTTP. Others may apply.
This commit is contained in:
Klaus Post 2024-05-31 22:17:37 -07:00 committed by GitHub
parent c5b3f5553f
commit e72429c79c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 46 additions and 36 deletions

View file

@ -405,7 +405,7 @@ func (p *xlStorageDiskIDCheck) WalkDir(ctx context.Context, opts WalkDirOptions,
if err != nil {
return err
}
defer done(&err)
defer done(0, &err)
return p.storage.WalkDir(ctx, opts, wr)
}

View file

@ -288,7 +288,7 @@ func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context, opts DiskInfoOption
}
si := p.updateStorageMetrics(storageMetricDiskInfo)
defer si(&err)
defer si(0, &err)
if opts.NoOp {
if opts.Metrics {
@ -341,7 +341,7 @@ func (p *xlStorageDiskIDCheck) MakeVolBulk(ctx context.Context, volumes ...strin
if err != nil {
return err
}
defer done(&err)
defer done(0, &err)
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.MakeVolBulk(ctx, volumes...) })
@ -352,7 +352,7 @@ func (p *xlStorageDiskIDCheck) MakeVol(ctx context.Context, volume string) (err
if err != nil {
return err
}
defer done(&err)
defer done(0, &err)
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.MakeVol(ctx, volume) })
@ -363,7 +363,7 @@ func (p *xlStorageDiskIDCheck) ListVols(ctx context.Context) (vi []VolInfo, err
if err != nil {
return nil, err
}
defer done(&err)
defer done(0, &err)
return p.storage.ListVols(ctx)
}
@ -373,7 +373,7 @@ func (p *xlStorageDiskIDCheck) StatVol(ctx context.Context, volume string) (vol
if err != nil {
return vol, err
}
defer done(&err)
defer done(0, &err)
return xioutil.WithDeadline[VolInfo](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (result VolInfo, err error) {
return p.storage.StatVol(ctx, volume)
@ -385,7 +385,7 @@ func (p *xlStorageDiskIDCheck) DeleteVol(ctx context.Context, volume string, for
if err != nil {
return err
}
defer done(&err)
defer done(0, &err)
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.DeleteVol(ctx, volume, forceDelete) })
@ -396,7 +396,7 @@ func (p *xlStorageDiskIDCheck) ListDir(ctx context.Context, origvolume, volume,
if err != nil {
return nil, err
}
defer done(&err)
defer done(0, &err)
return p.storage.ListDir(ctx, origvolume, volume, dirPath, count)
}
@ -407,7 +407,9 @@ func (p *xlStorageDiskIDCheck) ReadFile(ctx context.Context, volume string, path
if err != nil {
return 0, err
}
defer done(&err)
defer func() {
done(n, &err)
}()
return xioutil.WithDeadline[int64](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (result int64, err error) {
return p.storage.ReadFile(ctx, volume, path, offset, buf, verifier)
@ -420,7 +422,7 @@ func (p *xlStorageDiskIDCheck) AppendFile(ctx context.Context, volume string, pa
if err != nil {
return err
}
defer done(&err)
defer done(int64(len(buf)), &err)
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error {
@ -433,7 +435,7 @@ func (p *xlStorageDiskIDCheck) CreateFile(ctx context.Context, origvolume, volum
if err != nil {
return err
}
defer done(&err)
defer done(size, &err)
return p.storage.CreateFile(ctx, origvolume, volume, path, size, io.NopCloser(reader))
}
@ -443,7 +445,7 @@ func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path
if err != nil {
return nil, err
}
defer done(&err)
defer done(length, &err)
return xioutil.WithDeadline[io.ReadCloser](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (result io.ReadCloser, err error) {
return p.storage.ReadFileStream(ctx, volume, path, offset, length)
@ -455,7 +457,7 @@ func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPat
if err != nil {
return err
}
defer done(&err)
defer done(0, &err)
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.RenameFile(ctx, srcVolume, srcPath, dstVolume, dstPath) })
@ -470,7 +472,7 @@ func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPat
if err == nil && !skipAccessChecks(dstVolume) {
p.storage.setWriteAttribute(p.totalWrites.Add(1))
}
done(&err)
done(0, &err)
}()
// Copy inline data to a new buffer to function with deadlines.
@ -490,7 +492,7 @@ func (p *xlStorageDiskIDCheck) CheckParts(ctx context.Context, volume string, pa
if err != nil {
return err
}
defer done(&err)
defer done(0, &err)
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.CheckParts(ctx, volume, path, fi) })
@ -501,7 +503,7 @@ func (p *xlStorageDiskIDCheck) Delete(ctx context.Context, volume string, path s
if err != nil {
return err
}
defer done(&err)
defer done(0, &err)
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.Delete(ctx, volume, path, deleteOpts) })
@ -548,7 +550,7 @@ func (p *xlStorageDiskIDCheck) DeleteVersions(ctx context.Context, volume string
p.storage.setDeleteAttribute(p.totalDeletes.Add(permanentDeletes))
}
}
done(&err)
done(0, &err)
}()
errs = p.storage.DeleteVersions(ctx, volume, versions, opts)
@ -567,7 +569,7 @@ func (p *xlStorageDiskIDCheck) VerifyFile(ctx context.Context, volume, path stri
if err != nil {
return err
}
defer done(&err)
defer done(0, &err)
return p.storage.VerifyFile(ctx, volume, path, fi)
}
@ -577,7 +579,7 @@ func (p *xlStorageDiskIDCheck) WriteAll(ctx context.Context, volume string, path
if err != nil {
return err
}
defer done(&err)
defer done(int64(len(b)), &err)
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.WriteAll(ctx, volume, path, b) })
@ -589,7 +591,7 @@ func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path s
return err
}
defer func() {
defer done(&err)
defer done(0, &err)
if err == nil && !skipAccessChecks(volume) {
if opts.UndoWrite {
@ -616,7 +618,7 @@ func (p *xlStorageDiskIDCheck) UpdateMetadata(ctx context.Context, volume, path
if err != nil {
return err
}
defer done(&err)
defer done(0, &err)
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.UpdateMetadata(ctx, volume, path, fi, opts) })
@ -627,7 +629,7 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, origvolume, vo
if err != nil {
return err
}
defer done(&err)
defer done(0, &err)
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.WriteMetadata(ctx, origvolume, volume, path, fi) })
@ -638,7 +640,7 @@ func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, origvolume, volu
if err != nil {
return fi, err
}
defer done(&err)
defer done(0, &err)
return xioutil.WithDeadline[FileInfo](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (result FileInfo, err error) {
return p.storage.ReadVersion(ctx, origvolume, volume, path, versionID, opts)
@ -650,7 +652,11 @@ func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path
if err != nil {
return nil, err
}
defer done(&err)
var sz int
defer func() {
sz = len(buf)
done(int64(sz), &err)
}()
return xioutil.WithDeadline[[]byte](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (result []byte, err error) {
return p.storage.ReadAll(ctx, volume, path)
@ -662,7 +668,9 @@ func (p *xlStorageDiskIDCheck) ReadXL(ctx context.Context, volume string, path s
if err != nil {
return RawFileInfo{}, err
}
defer done(&err)
defer func() {
done(int64(len(rf.Buf)), &err)
}()
return xioutil.WithDeadline[RawFileInfo](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (result RawFileInfo, err error) {
return p.storage.ReadXL(ctx, volume, path, readData)
@ -674,7 +682,7 @@ func (p *xlStorageDiskIDCheck) StatInfoFile(ctx context.Context, volume, path st
if err != nil {
return nil, err
}
defer done(&err)
defer done(0, &err)
return p.storage.StatInfoFile(ctx, volume, path, glob)
}
@ -689,7 +697,7 @@ func (p *xlStorageDiskIDCheck) ReadMultiple(ctx context.Context, req ReadMultipl
xioutil.SafeClose(resp)
return err
}
defer done(&err)
defer done(0, &err)
return p.storage.ReadMultiple(ctx, req, resp)
}
@ -701,19 +709,20 @@ func (p *xlStorageDiskIDCheck) CleanAbandonedData(ctx context.Context, volume st
if err != nil {
return err
}
defer done(&err)
defer done(0, &err)
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.CleanAbandonedData(ctx, volume, path) })
}
func storageTrace(s storageMetric, startTime time.Time, duration time.Duration, path string, err string, custom map[string]string) madmin.TraceInfo {
func storageTrace(s storageMetric, startTime time.Time, duration time.Duration, path string, size int64, err string, custom map[string]string) madmin.TraceInfo {
return madmin.TraceInfo{
TraceType: madmin.TraceStorage,
Time: startTime,
NodeName: globalLocalNodeName,
FuncName: "storage." + s.String(),
Duration: duration,
Bytes: size,
Path: path,
Error: err,
Custom: custom,
@ -733,10 +742,10 @@ func scannerTrace(s scannerMetric, startTime time.Time, duration time.Duration,
}
// Update storage metrics
func (p *xlStorageDiskIDCheck) updateStorageMetrics(s storageMetric, paths ...string) func(err *error) {
func (p *xlStorageDiskIDCheck) updateStorageMetrics(s storageMetric, paths ...string) func(sz int64, err *error) {
startTime := time.Now()
trace := globalTrace.NumSubscribers(madmin.TraceStorage) > 0
return func(errp *error) {
return func(sz int64, errp *error) {
duration := time.Since(startTime)
var err error
@ -767,7 +776,7 @@ func (p *xlStorageDiskIDCheck) updateStorageMetrics(s storageMetric, paths ...st
}
custom["total-errs-timeout"] = strconv.FormatUint(p.totalErrsTimeout.Load(), 10)
custom["total-errs-availability"] = strconv.FormatUint(p.totalErrsAvailability.Load(), 10)
globalTrace.Publish(storageTrace(s, startTime, duration, strings.Join(paths, " "), errStr, custom))
globalTrace.Publish(storageTrace(s, startTime, duration, strings.Join(paths, " "), sz, errStr, custom))
}
}
}
@ -824,7 +833,7 @@ func (h *healthDiskCtxValue) logSuccess() {
// noopDoneFunc is a no-op done func.
// Can be reused.
var noopDoneFunc = func(_ *error) {}
var noopDoneFunc = func(_ int64, _ *error) {}
// TrackDiskHealth for this request.
// When a non-nil error is returned 'done' MUST be called
@ -833,7 +842,7 @@ var noopDoneFunc = func(_ *error) {}
// is either nil or io.EOF the disk is considered good.
// So if unsure if the disk status is ok, return nil as a parameter to done.
// Shadowing will work as long as return error is named: https://go.dev/play/p/sauq86SsTN2
func (p *xlStorageDiskIDCheck) TrackDiskHealth(ctx context.Context, s storageMetric, paths ...string) (c context.Context, done func(*error), err error) {
func (p *xlStorageDiskIDCheck) TrackDiskHealth(ctx context.Context, s storageMetric, paths ...string) (c context.Context, done func(int64, *error), err error) {
done = noopDoneFunc
if contextCanceled(ctx) {
return ctx, done, ctx.Err()
@ -866,7 +875,7 @@ func (p *xlStorageDiskIDCheck) TrackDiskHealth(ctx context.Context, s storageMet
ctx = context.WithValue(ctx, healthDiskCtxKey{}, &healthDiskCtxValue{lastSuccess: &p.health.lastSuccess})
si := p.updateStorageMetrics(s, paths...)
var once sync.Once
return ctx, func(errp *error) {
return ctx, func(sz int64, errp *error) {
p.health.waiting.Add(-1)
once.Do(func() {
if errp != nil {
@ -875,7 +884,7 @@ func (p *xlStorageDiskIDCheck) TrackDiskHealth(ctx context.Context, s storageMet
p.health.logSuccess()
}
}
si(errp)
si(sz, errp)
})
}, nil
}

View file

@ -105,6 +105,7 @@ func (c *muxClient) traceRoundtrip(ctx context.Context, t *tracer, h HandlerID,
Duration: end.Sub(start),
Path: t.Subroute,
Error: errString,
Bytes: int64(len(req) + len(resp)),
HTTP: &madmin.TraceHTTPStats{
ReqInfo: madmin.TraceRequestInfo{
Time: start,