publish storage API latency through node metrics (#14117)

Publish storage functions latency to help compare the performance 
of different disks in a single deployment.

e.g.:
```
minio_node_disk_latency_us{api="storage.WalkDir",disk="/tmp/xl/1",server="localhost:9001"} 226
minio_node_disk_latency_us{api="storage.WalkDir",disk="/tmp/xl/2",server="localhost:9002"} 1180
minio_node_disk_latency_us{api="storage.WalkDir",disk="/tmp/xl/3",server="localhost:9003"} 1183
minio_node_disk_latency_us{api="storage.WalkDir",disk="/tmp/xl/4",server="localhost:9004"} 1625
```
This commit is contained in:
Anis Elleuch 2022-01-26 01:31:44 +01:00 committed by GitHub
parent 58e6b83e95
commit 45a99c3fd3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 590 additions and 185 deletions

View file

@ -38,7 +38,7 @@ func (rl ReplicationLatency) merge(other ReplicationLatency) (newReplLatency Rep
// Get upload latency of each object size range
func (rl ReplicationLatency) getUploadLatency() (ret map[string]uint64) {
ret = make(map[string]uint64)
avg := rl.UploadHistogram.GetAvg()
avg := rl.UploadHistogram.GetAvgData()
for k, v := range avg {
// Convert nanoseconds to milliseconds
ret[sizeTagToString(k)] = v.avg() / uint64(time.Millisecond)

View file

@ -15,6 +15,8 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//go:generate msgp -file=$GOFILE -unexported
package cmd
import (
@ -76,34 +78,34 @@ type AccElem struct {
N int64
}
// add dur to a as a single element.
// Add a duration to a single element.
func (a *AccElem) add(dur time.Duration) {
a.Total += int64(dur)
a.N++
}
// merge b into a.
// Merge b into a.
func (a *AccElem) merge(b AccElem) {
a.N += b.N
a.Total += b.Total
}
// avg converts total to average.
func (a *AccElem) avg() uint64 {
// Avg converts total to average.
func (a AccElem) avg() uint64 {
if a.N >= 1 && a.Total > 0 {
return uint64(a.Total / a.N)
}
return 0
}
// LastMinuteLatencies keeps track of last minute latencies.
type LastMinuteLatencies struct {
Totals [60][sizeLastElemMarker]AccElem
// lastMinuteLatency keeps track of last minute latency.
type lastMinuteLatency struct {
Totals [60]AccElem
LastSec int64
}
// Merge safely merges two LastMinuteLatencies structures into one
func (l LastMinuteLatencies) Merge(o LastMinuteLatencies) (merged LastMinuteLatencies) {
// Merge data of two lastMinuteLatency structure
func (l lastMinuteLatency) merge(o lastMinuteLatency) (merged lastMinuteLatency) {
if l.LastSec > o.LastSec {
o.forwardTo(l.LastSec)
merged.LastSec = l.LastSec
@ -113,57 +115,73 @@ func (l LastMinuteLatencies) Merge(o LastMinuteLatencies) (merged LastMinuteLate
}
for i := range merged.Totals {
for j := range merged.Totals[i] {
merged.Totals[i][j] = AccElem{
Total: l.Totals[i][j].Total + o.Totals[i][j].Total,
N: l.Totals[i][j].N + o.Totals[i][j].N,
}
merged.Totals[i] = AccElem{
Total: l.Totals[i].Total + o.Totals[i].Total,
N: l.Totals[i].N + o.Totals[i].N,
}
}
return merged
}
// Add a new duration data
func (l *lastMinuteLatency) add(t time.Duration) {
sec := time.Now().Unix()
l.forwardTo(sec)
winIdx := sec % 60
l.Totals[winIdx].add(t)
l.LastSec = sec
}
// Merge all recorded latencies of last minute into one
func (l *lastMinuteLatency) getAvgData() AccElem {
var res AccElem
sec := time.Now().Unix()
l.forwardTo(sec)
for _, elem := range l.Totals[:] {
res.merge(elem)
}
return res
}
// forwardTo time t, clearing any entries in between.
func (l *lastMinuteLatency) forwardTo(t int64) {
if l.LastSec >= t {
return
}
if t-l.LastSec >= 60 {
l.Totals = [60]AccElem{}
return
}
for l.LastSec != t {
// Clear next element.
idx := (l.LastSec + 1) % 60
l.Totals[idx] = AccElem{}
l.LastSec++
}
}
// LastMinuteLatencies keeps track of last minute latencies.
type LastMinuteLatencies [sizeLastElemMarker]lastMinuteLatency
// Merge safely merges two LastMinuteLatencies structures into one
func (l LastMinuteLatencies) Merge(o LastMinuteLatencies) (merged LastMinuteLatencies) {
for i := range l {
merged[i] = l[i].merge(o[i])
}
return merged
}
// Add latency t from object with the specified size.
func (l *LastMinuteLatencies) Add(size int64, t time.Duration) {
tag := sizeToTag(size)
// Update...
sec := time.Now().Unix()
l.forwardTo(sec)
winIdx := sec % 60
l.Totals[winIdx][tag].add(t)
l.LastSec = sec
l[sizeToTag(size)].add(t)
}
// GetAvg will return the average for each bucket from the last time minute.
// GetAvgData will return the average for each bucket from the last time minute.
// The number of objects is also included.
func (l *LastMinuteLatencies) GetAvg() [sizeLastElemMarker]AccElem {
func (l *LastMinuteLatencies) GetAvgData() [sizeLastElemMarker]AccElem {
var res [sizeLastElemMarker]AccElem
sec := time.Now().Unix()
l.forwardTo(sec)
for _, elems := range l.Totals[:] {
for j := range elems {
res[j].merge(elems[j])
}
for i, elem := range l[:] {
res[i] = elem.getAvgData()
}
return res
}
// forwardTo time t, clearing any entries in between.
func (l *LastMinuteLatencies) forwardTo(t int64) {
if l.LastSec >= t {
return
}
if t-l.LastSec >= 60 {
l.Totals = [60][sizeLastElemMarker]AccElem{}
return
}
for l.LastSec != t {
// Clear next element.
idx := (l.LastSec + 1) % 60
l.Totals[idx] = [sizeLastElemMarker]AccElem{}
l.LastSec++
}
}

View file

@ -136,6 +136,282 @@ func (z AccElem) Msgsize() (s int) {
// DecodeMsg implements msgp.Decodable
func (z *LastMinuteLatencies) DecodeMsg(dc *msgp.Reader) (err error) {
var zb0001 uint32
zb0001, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
if zb0001 != uint32(sizeLastElemMarker) {
err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0001}
return
}
for za0001 := range z {
var field []byte
_ = field
var zb0002 uint32
zb0002, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, za0001)
return
}
for zb0002 > 0 {
zb0002--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err, za0001)
return
}
switch msgp.UnsafeString(field) {
case "Totals":
var zb0003 uint32
zb0003, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err, za0001, "Totals")
return
}
if zb0003 != uint32(60) {
err = msgp.ArrayError{Wanted: uint32(60), Got: zb0003}
return
}
for za0002 := range z[za0001].Totals {
var zb0004 uint32
zb0004, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, za0001, "Totals", za0002)
return
}
for zb0004 > 0 {
zb0004--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err, za0001, "Totals", za0002)
return
}
switch msgp.UnsafeString(field) {
case "Total":
z[za0001].Totals[za0002].Total, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, za0001, "Totals", za0002, "Total")
return
}
case "N":
z[za0001].Totals[za0002].N, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, za0001, "Totals", za0002, "N")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err, za0001, "Totals", za0002)
return
}
}
}
}
case "LastSec":
z[za0001].LastSec, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, za0001, "LastSec")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err, za0001)
return
}
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *LastMinuteLatencies) EncodeMsg(en *msgp.Writer) (err error) {
err = en.WriteArrayHeader(uint32(sizeLastElemMarker))
if err != nil {
err = msgp.WrapError(err)
return
}
for za0001 := range z {
// map header, size 2
// write "Totals"
err = en.Append(0x82, 0xa6, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x73)
if err != nil {
return
}
err = en.WriteArrayHeader(uint32(60))
if err != nil {
err = msgp.WrapError(err, za0001, "Totals")
return
}
for za0002 := range z[za0001].Totals {
// map header, size 2
// write "Total"
err = en.Append(0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
if err != nil {
return
}
err = en.WriteInt64(z[za0001].Totals[za0002].Total)
if err != nil {
err = msgp.WrapError(err, za0001, "Totals", za0002, "Total")
return
}
// write "N"
err = en.Append(0xa1, 0x4e)
if err != nil {
return
}
err = en.WriteInt64(z[za0001].Totals[za0002].N)
if err != nil {
err = msgp.WrapError(err, za0001, "Totals", za0002, "N")
return
}
}
// write "LastSec"
err = en.Append(0xa7, 0x4c, 0x61, 0x73, 0x74, 0x53, 0x65, 0x63)
if err != nil {
return
}
err = en.WriteInt64(z[za0001].LastSec)
if err != nil {
err = msgp.WrapError(err, za0001, "LastSec")
return
}
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *LastMinuteLatencies) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
o = msgp.AppendArrayHeader(o, uint32(sizeLastElemMarker))
for za0001 := range z {
// map header, size 2
// string "Totals"
o = append(o, 0x82, 0xa6, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x73)
o = msgp.AppendArrayHeader(o, uint32(60))
for za0002 := range z[za0001].Totals {
// map header, size 2
// string "Total"
o = append(o, 0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
o = msgp.AppendInt64(o, z[za0001].Totals[za0002].Total)
// string "N"
o = append(o, 0xa1, 0x4e)
o = msgp.AppendInt64(o, z[za0001].Totals[za0002].N)
}
// string "LastSec"
o = append(o, 0xa7, 0x4c, 0x61, 0x73, 0x74, 0x53, 0x65, 0x63)
o = msgp.AppendInt64(o, z[za0001].LastSec)
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *LastMinuteLatencies) UnmarshalMsg(bts []byte) (o []byte, err error) {
var zb0001 uint32
zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
if zb0001 != uint32(sizeLastElemMarker) {
err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0001}
return
}
for za0001 := range z {
var field []byte
_ = field
var zb0002 uint32
zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, za0001)
return
}
for zb0002 > 0 {
zb0002--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err, za0001)
return
}
switch msgp.UnsafeString(field) {
case "Totals":
var zb0003 uint32
zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, za0001, "Totals")
return
}
if zb0003 != uint32(60) {
err = msgp.ArrayError{Wanted: uint32(60), Got: zb0003}
return
}
for za0002 := range z[za0001].Totals {
var zb0004 uint32
zb0004, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, za0001, "Totals", za0002)
return
}
for zb0004 > 0 {
zb0004--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err, za0001, "Totals", za0002)
return
}
switch msgp.UnsafeString(field) {
case "Total":
z[za0001].Totals[za0002].Total, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, za0001, "Totals", za0002, "Total")
return
}
case "N":
z[za0001].Totals[za0002].N, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, za0001, "Totals", za0002, "N")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err, za0001, "Totals", za0002)
return
}
}
}
}
case "LastSec":
z[za0001].LastSec, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, za0001, "LastSec")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err, za0001)
return
}
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *LastMinuteLatencies) Msgsize() (s int) {
s = msgp.ArrayHeaderSize + (sizeLastElemMarker * (16 + (60 * (9 + msgp.Int64Size + msgp.Int64Size)) + msgp.Int64Size))
return
}
// DecodeMsg implements msgp.Decodable
func (z *lastMinuteLatency) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
@ -165,48 +441,36 @@ func (z *LastMinuteLatencies) DecodeMsg(dc *msgp.Reader) (err error) {
}
for za0001 := range z.Totals {
var zb0003 uint32
zb0003, err = dc.ReadArrayHeader()
zb0003, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, "Totals", za0001)
return
}
if zb0003 != uint32(sizeLastElemMarker) {
err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0003}
return
}
for za0002 := range z.Totals[za0001] {
var zb0004 uint32
zb0004, err = dc.ReadMapHeader()
for zb0003 > 0 {
zb0003--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002)
err = msgp.WrapError(err, "Totals", za0001)
return
}
for zb0004 > 0 {
zb0004--
field, err = dc.ReadMapKeyPtr()
switch msgp.UnsafeString(field) {
case "Total":
z.Totals[za0001].Total, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002)
err = msgp.WrapError(err, "Totals", za0001, "Total")
return
}
switch msgp.UnsafeString(field) {
case "Total":
z.Totals[za0001][za0002].Total, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002, "Total")
return
}
case "N":
z.Totals[za0001][za0002].N, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002, "N")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002)
return
}
case "N":
z.Totals[za0001].N, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, "N")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err, "Totals", za0001)
return
}
}
}
@ -229,7 +493,7 @@ func (z *LastMinuteLatencies) DecodeMsg(dc *msgp.Reader) (err error) {
}
// EncodeMsg implements msgp.Encodable
func (z *LastMinuteLatencies) EncodeMsg(en *msgp.Writer) (err error) {
func (z *lastMinuteLatency) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2
// write "Totals"
err = en.Append(0x82, 0xa6, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x73)
@ -242,33 +506,26 @@ func (z *LastMinuteLatencies) EncodeMsg(en *msgp.Writer) (err error) {
return
}
for za0001 := range z.Totals {
err = en.WriteArrayHeader(uint32(sizeLastElemMarker))
// map header, size 2
// write "Total"
err = en.Append(0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001)
return
}
for za0002 := range z.Totals[za0001] {
// map header, size 2
// write "Total"
err = en.Append(0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
if err != nil {
return
}
err = en.WriteInt64(z.Totals[za0001][za0002].Total)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002, "Total")
return
}
// write "N"
err = en.Append(0xa1, 0x4e)
if err != nil {
return
}
err = en.WriteInt64(z.Totals[za0001][za0002].N)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002, "N")
return
}
err = en.WriteInt64(z.Totals[za0001].Total)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, "Total")
return
}
// write "N"
err = en.Append(0xa1, 0x4e)
if err != nil {
return
}
err = en.WriteInt64(z.Totals[za0001].N)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, "N")
return
}
}
// write "LastSec"
@ -285,23 +542,20 @@ func (z *LastMinuteLatencies) EncodeMsg(en *msgp.Writer) (err error) {
}
// MarshalMsg implements msgp.Marshaler
func (z *LastMinuteLatencies) MarshalMsg(b []byte) (o []byte, err error) {
func (z *lastMinuteLatency) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// string "Totals"
o = append(o, 0x82, 0xa6, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x73)
o = msgp.AppendArrayHeader(o, uint32(60))
for za0001 := range z.Totals {
o = msgp.AppendArrayHeader(o, uint32(sizeLastElemMarker))
for za0002 := range z.Totals[za0001] {
// map header, size 2
// string "Total"
o = append(o, 0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
o = msgp.AppendInt64(o, z.Totals[za0001][za0002].Total)
// string "N"
o = append(o, 0xa1, 0x4e)
o = msgp.AppendInt64(o, z.Totals[za0001][za0002].N)
}
// map header, size 2
// string "Total"
o = append(o, 0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
o = msgp.AppendInt64(o, z.Totals[za0001].Total)
// string "N"
o = append(o, 0xa1, 0x4e)
o = msgp.AppendInt64(o, z.Totals[za0001].N)
}
// string "LastSec"
o = append(o, 0xa7, 0x4c, 0x61, 0x73, 0x74, 0x53, 0x65, 0x63)
@ -310,7 +564,7 @@ func (z *LastMinuteLatencies) MarshalMsg(b []byte) (o []byte, err error) {
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *LastMinuteLatencies) UnmarshalMsg(bts []byte) (o []byte, err error) {
func (z *lastMinuteLatency) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
@ -340,48 +594,36 @@ func (z *LastMinuteLatencies) UnmarshalMsg(bts []byte) (o []byte, err error) {
}
for za0001 := range z.Totals {
var zb0003 uint32
zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts)
zb0003, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001)
return
}
if zb0003 != uint32(sizeLastElemMarker) {
err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0003}
return
}
for za0002 := range z.Totals[za0001] {
var zb0004 uint32
zb0004, bts, err = msgp.ReadMapHeaderBytes(bts)
for zb0003 > 0 {
zb0003--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002)
err = msgp.WrapError(err, "Totals", za0001)
return
}
for zb0004 > 0 {
zb0004--
field, bts, err = msgp.ReadMapKeyZC(bts)
switch msgp.UnsafeString(field) {
case "Total":
z.Totals[za0001].Total, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002)
err = msgp.WrapError(err, "Totals", za0001, "Total")
return
}
switch msgp.UnsafeString(field) {
case "Total":
z.Totals[za0001][za0002].Total, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002, "Total")
return
}
case "N":
z.Totals[za0001][za0002].N, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002, "N")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002)
return
}
case "N":
z.Totals[za0001].N, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, "N")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001)
return
}
}
}
@ -405,7 +647,7 @@ func (z *LastMinuteLatencies) UnmarshalMsg(bts []byte) (o []byte, err error) {
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *LastMinuteLatencies) Msgsize() (s int) {
s = 1 + 7 + msgp.ArrayHeaderSize + (60 * (sizeLastElemMarker * (9 + msgp.Int64Size + msgp.Int64Size))) + 8 + msgp.Int64Size
func (z *lastMinuteLatency) Msgsize() (s int) {
s = 1 + 7 + msgp.ArrayHeaderSize + (60 * (9 + msgp.Int64Size + msgp.Int64Size)) + 8 + msgp.Int64Size
return
}

View file

@ -234,3 +234,116 @@ func BenchmarkDecodeLastMinuteLatencies(b *testing.B) {
}
}
}
func TestMarshalUnmarshallastMinuteLatency(t *testing.T) {
v := lastMinuteLatency{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsglastMinuteLatency(b *testing.B) {
v := lastMinuteLatency{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsglastMinuteLatency(b *testing.B) {
v := lastMinuteLatency{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshallastMinuteLatency(b *testing.B) {
v := lastMinuteLatency{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodelastMinuteLatency(t *testing.T) {
v := lastMinuteLatency{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodelastMinuteLatency Msgsize() is inaccurate")
}
vn := lastMinuteLatency{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodelastMinuteLatency(b *testing.B) {
v := lastMinuteLatency{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodelastMinuteLatency(b *testing.B) {
v := lastMinuteLatency{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}

View file

@ -70,6 +70,7 @@ func init() {
nodeCollector = newMinioCollectorNode([]*MetricsGroup{
getNodeHealthMetrics(),
getLocalDiskStorageMetrics(),
getCacheMetrics(),
getHTTPMetrics(),
getNetworkMetrics(),
@ -157,6 +158,8 @@ const (
writeBytes MetricName = "write_bytes"
wcharBytes MetricName = "wchar_bytes"
apiLatencyMicroSec MetricName = "latency_us"
usagePercent MetricName = "update_percent"
commitInfo MetricName = "commit_info"
@ -315,6 +318,16 @@ func getClusterCapacityUsageFreeBytesMD() MetricDescription {
}
}
func getNodeDiskAPILatencyMD() MetricDescription {
return MetricDescription{
Namespace: nodeMetricNamespace,
Subsystem: diskSubsystem,
Name: apiLatencyMicroSec,
Help: "Average last minute latency in µs for disk API storage operations.",
Type: gaugeMetric,
}
}
func getNodeDiskUsedBytesMD() MetricDescription {
return MetricDescription{
Namespace: nodeMetricNamespace,
@ -1583,6 +1596,35 @@ func getLocalStorageMetrics() *MetricsGroup {
Value: float64(disk.FreeInodes),
VariableLabels: map[string]string{"disk": disk.DrivePath},
})
}
return
})
return mg
}
func getLocalDiskStorageMetrics() *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 3 * time.Second,
}
mg.RegisterRead(func(ctx context.Context) (metrics []Metric) {
objLayer := newObjectLayerFn()
// Service not initialized yet
if objLayer == nil || globalIsGateway {
return
}
metrics = make([]Metric, 0, 50)
storageInfo, _ := objLayer.LocalStorageInfo(ctx)
for _, disk := range storageInfo.Disks {
for apiName, latency := range disk.Metrics.APILatencies {
val := latency.(uint64)
metrics = append(metrics, Metric{
Description: getNodeDiskAPILatencyMD(),
Value: float64(val / 1000),
VariableLabels: map[string]string{"disk": disk.DrivePath, "api": "storage." + apiName},
})
}
}
return
})

View file

@ -47,7 +47,7 @@ type DiskInfo struct {
// the number of calls of each API and the moving average of
// the duration of each API.
type DiskMetrics struct {
APILatencies map[string]string `json:"apiLatencies,omitempty"`
APILatencies map[string]uint64 `json:"apiLatencies,omitempty"`
APICalls map[string]uint64 `json:"apiCalls,omitempty"`
}

View file

@ -299,7 +299,7 @@ func (z *DiskMetrics) DecodeMsg(dc *msgp.Reader) (err error) {
return
}
if z.APILatencies == nil {
z.APILatencies = make(map[string]string, zb0002)
z.APILatencies = make(map[string]uint64, zb0002)
} else if len(z.APILatencies) > 0 {
for key := range z.APILatencies {
delete(z.APILatencies, key)
@ -308,13 +308,13 @@ func (z *DiskMetrics) DecodeMsg(dc *msgp.Reader) (err error) {
for zb0002 > 0 {
zb0002--
var za0001 string
var za0002 string
var za0002 uint64
za0001, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "APILatencies")
return
}
za0002, err = dc.ReadString()
za0002, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "APILatencies", za0001)
return
@ -381,7 +381,7 @@ func (z *DiskMetrics) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "APILatencies")
return
}
err = en.WriteString(za0002)
err = en.WriteUint64(za0002)
if err != nil {
err = msgp.WrapError(err, "APILatencies", za0001)
return
@ -421,7 +421,7 @@ func (z *DiskMetrics) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.AppendMapHeader(o, uint32(len(z.APILatencies)))
for za0001, za0002 := range z.APILatencies {
o = msgp.AppendString(o, za0001)
o = msgp.AppendString(o, za0002)
o = msgp.AppendUint64(o, za0002)
}
// string "APICalls"
o = append(o, 0xa8, 0x41, 0x50, 0x49, 0x43, 0x61, 0x6c, 0x6c, 0x73)
@ -459,7 +459,7 @@ func (z *DiskMetrics) UnmarshalMsg(bts []byte) (o []byte, err error) {
return
}
if z.APILatencies == nil {
z.APILatencies = make(map[string]string, zb0002)
z.APILatencies = make(map[string]uint64, zb0002)
} else if len(z.APILatencies) > 0 {
for key := range z.APILatencies {
delete(z.APILatencies, key)
@ -467,14 +467,14 @@ func (z *DiskMetrics) UnmarshalMsg(bts []byte) (o []byte, err error) {
}
for zb0002 > 0 {
var za0001 string
var za0002 string
var za0002 uint64
zb0002--
za0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "APILatencies")
return
}
za0002, bts, err = msgp.ReadStringBytes(bts)
za0002, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "APILatencies", za0001)
return
@ -529,7 +529,7 @@ func (z *DiskMetrics) Msgsize() (s int) {
if z.APILatencies != nil {
for za0001, za0002 := range z.APILatencies {
_ = za0002
s += msgp.StringPrefixSize + len(za0001) + msgp.StringPrefixSize + len(za0002)
s += msgp.StringPrefixSize + len(za0001) + msgp.Uint64Size
}
}
s += 9 + msgp.MapHeaderSize

View file

@ -25,7 +25,6 @@ import (
"sync/atomic"
"time"
"github.com/VividCortex/ewma"
"github.com/minio/madmin-go"
)
@ -71,18 +70,18 @@ type xlStorageDiskIDCheck struct {
// please use `fieldalignment ./...` to check
// if your changes are not causing any problems.
storage StorageAPI
apiLatencies [storageMetricLast]ewma.MovingAverage
apiLatencies [storageMetricLast]*lockedLastMinuteLatency
diskID string
apiCalls [storageMetricLast]uint64
}
func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
diskMetric := DiskMetrics{
APILatencies: make(map[string]string),
APILatencies: make(map[string]uint64),
APICalls: make(map[string]uint64),
}
for i, v := range p.apiLatencies {
diskMetric.APILatencies[storageMetric(i).String()] = time.Duration(v.Value()).String()
diskMetric.APILatencies[storageMetric(i).String()] = v.value()
}
for i := range p.apiCalls {
diskMetric.APICalls[storageMetric(i).String()] = atomic.LoadUint64(&p.apiCalls[i])
@ -90,28 +89,21 @@ func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
return diskMetric
}
type lockedSimpleEWMA struct {
sync.RWMutex
*ewma.SimpleEWMA
type lockedLastMinuteLatency struct {
sync.Mutex
lastMinuteLatency
}
func (e *lockedSimpleEWMA) Add(value float64) {
func (e *lockedLastMinuteLatency) add(value time.Duration) {
e.Lock()
defer e.Unlock()
e.SimpleEWMA.Add(value)
e.lastMinuteLatency.add(value)
}
func (e *lockedSimpleEWMA) Set(value float64) {
func (e *lockedLastMinuteLatency) value() uint64 {
e.Lock()
defer e.Unlock()
e.SimpleEWMA.Set(value)
}
func (e *lockedSimpleEWMA) Value() float64 {
e.RLock()
defer e.RUnlock()
return e.SimpleEWMA.Value()
return e.lastMinuteLatency.getAvgData().avg()
}
func newXLStorageDiskIDCheck(storage *xlStorage) *xlStorageDiskIDCheck {
@ -119,9 +111,7 @@ func newXLStorageDiskIDCheck(storage *xlStorage) *xlStorageDiskIDCheck {
storage: storage,
}
for i := range xl.apiLatencies[:] {
xl.apiLatencies[i] = &lockedSimpleEWMA{
SimpleEWMA: new(ewma.SimpleEWMA),
}
xl.apiLatencies[i] = &lockedLastMinuteLatency{}
}
return &xl
}
@ -582,7 +572,7 @@ func (p *xlStorageDiskIDCheck) updateStorageMetrics(s storageMetric, paths ...st
duration := time.Since(startTime)
atomic.AddUint64(&p.apiCalls[s], 1)
p.apiLatencies[s].Add(float64(duration))
p.apiLatencies[s].add(duration)
if trace {
globalTrace.Publish(storageTrace(s, startTime, duration, strings.Join(paths, " ")))