Persist in-memory replication stats to disk (#15594)

This avoids relying on scanner-calculated replication metrics and improves the accuracy of the reported replication stats.

This PR also builds on #15556 by handing replication traffic that could not
be picked up by available workers to the MRF queue, so that entries in
`PENDING` status are healed faster.
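
For context, the persisted file is a small binary blob: a 4-byte little-endian header carrying the meta format and version, followed by the msgpack-encoded BucketStatsMap (see the loadStatsFromDisk/saveStats hunks below). The following standalone Go sketch mirrors just that header handling; only the two constants come from this PR, everything else is illustrative.

package main

import (
	"encoding/binary"
	"fmt"
)

const (
	replStatsMetaFormat = 1
	replStatsVersion    = 1
)

// encodeHeader prepends the 4-byte little-endian header to a msgpack payload.
func encodeHeader(payload []byte) []byte {
	data := make([]byte, 4, 4+len(payload))
	binary.LittleEndian.PutUint16(data[0:2], replStatsMetaFormat)
	binary.LittleEndian.PutUint16(data[2:4], replStatsVersion)
	return append(data, payload...)
}

// decodeHeader validates the header and returns the payload, mirroring the
// checks in loadStatsFromDisk.
func decodeHeader(data []byte) ([]byte, error) {
	if len(data) <= 4 {
		return nil, fmt.Errorf("replication stats: no data")
	}
	if f := binary.LittleEndian.Uint16(data[0:2]); f != replStatsMetaFormat {
		return nil, fmt.Errorf("replication stats: unknown format: %d", f)
	}
	if v := binary.LittleEndian.Uint16(data[2:4]); v != replStatsVersion {
		return nil, fmt.Errorf("replication stats: unknown version: %d", v)
	}
	return data[4:], nil
}

func main() {
	blob := encodeHeader([]byte("<msgpack-encoded BucketStatsMap>"))
	payload, err := decodeHeader(blob)
	fmt.Println(string(payload), err)
}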
Poorna 2022-09-12 12:40:02 -07:00 committed by GitHub
parent d6132b854f
commit 6b9fd256e1
14 changed files with 476 additions and 219 deletions


@@ -217,7 +217,7 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsHandler(w http.ResponseW
w.Header().Set(xhttp.ContentType, string(mimeJSON))
enc := json.NewEncoder(w)
if err = enc.Encode(getLatestReplicationStats(bucket, usageInfo)); err != nil {
if err = enc.Encode(globalReplicationStats.getLatestReplicationStats(bucket, usageInfo)); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}


@@ -19,6 +19,7 @@ package cmd
import (
"context"
"math"
"sync"
"time"
@@ -36,10 +37,12 @@ func (b *BucketReplicationStats) hasReplicationUsage() bool {
// ReplicationStats holds the global in-memory replication stats
type ReplicationStats struct {
Cache map[string]*BucketReplicationStats
UsageCache map[string]*BucketReplicationStats
sync.RWMutex
ulock sync.RWMutex
Cache map[string]*BucketReplicationStats
UsageCache map[string]*BucketReplicationStats
mostRecentStats BucketStatsMap
sync.RWMutex // mutex for Cache
ulock sync.RWMutex // mutex for UsageCache
dlock sync.RWMutex // mutex for mostRecentStats
}
// Delete deletes in-memory replication statistics for a bucket.
@@ -92,14 +95,23 @@ func (r *ReplicationStats) Update(bucket string, arn string, n int64, duration t
bs.Stats[arn] = b
}
switch status {
case replication.Pending:
if opType.IsDataReplication() && prevStatus != status {
b.PendingSize += n
b.PendingCount++
}
case replication.Completed:
switch prevStatus { // adjust counters based on previous state
case replication.Pending:
b.PendingCount--
case replication.Failed:
b.FailedCount--
}
if opType == replication.ObjectReplicationType {
if opType.IsDataReplication() {
b.ReplicatedSize += n
switch prevStatus {
case replication.Pending:
b.PendingSize -= n
case replication.Failed:
b.FailedSize -= n
}
@@ -108,10 +120,12 @@ func (r *ReplicationStats) Update(bucket string, arn string, n int64, duration t
}
}
case replication.Failed:
if opType == replication.ObjectReplicationType {
if opType.IsDataReplication() {
if prevStatus == replication.Pending {
b.FailedSize += n
b.FailedCount++
b.PendingSize -= n
b.PendingCount--
}
}
case replication.Replica:
@@ -176,9 +190,20 @@ func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *Replicatio
}
}
// load replication metrics at cluster start from initial data usage
// load replication metrics at cluster start from the latest replication stats saved in .minio.sys/buckets/replication/node-name.stats,
// falling back to the replication stats in data usage for backward compatibility
func (r *ReplicationStats) loadInitialReplicationMetrics(ctx context.Context) {
rTimer := time.NewTimer(time.Minute)
m := make(map[string]*BucketReplicationStats)
if stats, err := globalReplicationPool.loadStatsFromDisk(); err == nil {
for b, st := range stats {
	st := st // take a per-iteration copy; the range variable is reused, so &st alone would alias a single value
	m[b] = &st
}
r.ulock.Lock()
r.UsageCache = m
r.ulock.Unlock()
return
}
rTimer := time.NewTimer(time.Second * 5)
defer rTimer.Stop()
var (
dui DataUsageInfo
@@ -192,15 +217,12 @@ outer:
case <-rTimer.C:
dui, err = loadDataUsageFromBackend(GlobalContext, newObjectLayerFn())
// If LastUpdate is set, data usage is available.
if err == nil && !dui.LastUpdate.IsZero() {
if err == nil {
break outer
}
rTimer.Reset(time.Minute)
rTimer.Reset(time.Second * 5)
}
}
m := make(map[string]*BucketReplicationStats)
for bucket, usage := range dui.BucketsUsage {
b := &BucketReplicationStats{
Stats: make(map[string]*BucketReplicationStat, len(usage.ReplicationInfo)),
@@ -219,6 +241,123 @@ outer:
}
}
r.ulock.Lock()
defer r.ulock.Unlock()
r.UsageCache = m
r.ulock.Unlock()
}
func (r *ReplicationStats) getAllCachedLatest() BucketStatsMap {
r.dlock.RLock()
defer r.dlock.RUnlock()
return r.mostRecentStats
}
func (r *ReplicationStats) getAllLatest(bucketsUsage map[string]BucketUsageInfo) (bucketsReplicationStats map[string]BucketReplicationStats) {
peerBucketStatsList := globalNotificationSys.GetClusterAllBucketStats(GlobalContext)
bucketsReplicationStats = make(map[string]BucketReplicationStats, len(bucketsUsage))
for bucket, u := range bucketsUsage {
bucketStats := make([]BucketStats, len(peerBucketStatsList))
for i, peerBucketStats := range peerBucketStatsList {
bucketStat, ok := peerBucketStats.Stats[bucket]
if !ok {
continue
}
bucketStats[i] = bucketStat
}
bucketsReplicationStats[bucket] = r.calculateBucketReplicationStats(bucket, u, bucketStats)
}
return bucketsReplicationStats
}
func (r *ReplicationStats) calculateBucketReplicationStats(bucket string, u BucketUsageInfo, bucketStats []BucketStats) (s BucketReplicationStats) {
// accumulate cluster bucket stats
stats := make(map[string]*BucketReplicationStat)
var totReplicaSize int64
for _, bucketStat := range bucketStats {
totReplicaSize += bucketStat.ReplicationStats.ReplicaSize
for arn, stat := range bucketStat.ReplicationStats.Stats {
oldst := stats[arn]
if oldst == nil {
oldst = &BucketReplicationStat{}
}
stats[arn] = &BucketReplicationStat{
FailedCount: stat.FailedCount + oldst.FailedCount,
FailedSize: stat.FailedSize + oldst.FailedSize,
ReplicatedSize: stat.ReplicatedSize + oldst.ReplicatedSize,
Latency: stat.Latency.merge(oldst.Latency),
PendingCount: stat.PendingCount + oldst.PendingCount,
PendingSize: stat.PendingSize + oldst.PendingSize,
}
}
}
// add initial usage stat to cluster stats
usageStat := globalReplicationStats.GetInitialUsage(bucket)
totReplicaSize += usageStat.ReplicaSize
for arn, stat := range usageStat.Stats {
st, ok := stats[arn]
if !ok {
st = &BucketReplicationStat{}
stats[arn] = st
}
st.ReplicatedSize += stat.ReplicatedSize
st.FailedSize += stat.FailedSize
st.FailedCount += stat.FailedCount
st.PendingSize += stat.PendingSize
st.PendingCount += stat.PendingCount
}
s = BucketReplicationStats{
Stats: make(map[string]*BucketReplicationStat, len(stats)),
}
var latestTotReplicatedSize int64
for _, st := range u.ReplicationInfo {
latestTotReplicatedSize += int64(st.ReplicatedSize)
}
// normalize computed real time stats with latest usage stat
for arn, tgtstat := range stats {
st := BucketReplicationStat{}
bu, ok := u.ReplicationInfo[arn]
if !ok {
bu = BucketTargetUsageInfo{}
}
// use the in-memory replication stats if they are ahead of the usage info.
st.ReplicatedSize = int64(bu.ReplicatedSize)
if tgtstat.ReplicatedSize >= int64(bu.ReplicatedSize) {
st.ReplicatedSize = tgtstat.ReplicatedSize
}
s.ReplicatedSize += st.ReplicatedSize
// Reset FailedSize and FailedCount to 0 for negative overflows which can
// happen since data usage picture can lag behind actual usage state at the time of cluster start
st.FailedSize = int64(math.Max(float64(tgtstat.FailedSize), 0))
st.FailedCount = int64(math.Max(float64(tgtstat.FailedCount), 0))
st.PendingSize = int64(math.Max(float64(tgtstat.PendingSize), 0))
st.PendingCount = int64(math.Max(float64(tgtstat.PendingCount), 0))
st.Latency = tgtstat.Latency
s.Stats[arn] = &st
s.FailedSize += st.FailedSize
s.FailedCount += st.FailedCount
s.PendingCount += st.PendingCount
s.PendingSize += st.PendingSize
}
// normalize overall stats
s.ReplicaSize = int64(math.Max(float64(totReplicaSize), float64(u.ReplicaSize)))
s.ReplicatedSize = int64(math.Max(float64(s.ReplicatedSize), float64(latestTotReplicatedSize)))
r.dlock.Lock()
if len(r.mostRecentStats.Stats) == 0 {
r.mostRecentStats = BucketStatsMap{Stats: make(map[string]BucketStats, 1), Timestamp: UTCNow()}
}
r.mostRecentStats.Stats[bucket] = BucketStats{ReplicationStats: s}
r.mostRecentStats.Timestamp = UTCNow()
r.dlock.Unlock()
return s
}
// getLatestReplicationStats returns the most current of the in-memory replication stats and the data usage info from the crawler.
func (r *ReplicationStats) getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplicationStats) {
bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket)
return r.calculateBucketReplicationStats(bucket, u, bucketStats)
}
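
The normalization above deserves a note: for each target ARN, the reported ReplicatedSize is whichever of the in-memory counter and the scanner's usage counter is larger, and the failed/pending counters are clamped at zero because the data-usage picture can lag the live state after a cluster start. A tiny illustrative sketch of the two rules, with made-up numbers:

package main

import "fmt"

// normalize mirrors the rule in calculateBucketReplicationStats: prefer the
// in-memory counter whenever it is ahead of the scanner-reported one.
func normalize(inMemory, fromUsage int64) int64 {
	if inMemory >= fromUsage {
		return inMemory
	}
	return fromUsage
}

// clamp mirrors the math.Max(..., 0) calls that zero out negative overflows.
func clamp(n int64) int64 {
	if n < 0 {
		return 0
	}
	return n
}

func main() {
	fmt.Println(normalize(1024, 512)) // in-memory counter is ahead: 1024
	fmt.Println(normalize(256, 512))  // scanner is ahead: 512
	fmt.Println(clamp(-3))            // lagging usage produced a negative count: 0
}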


@@ -766,3 +766,14 @@ func (ri ReplicateObjectInfo) ToMRFEntry() MRFReplicateEntry {
versionID: ri.VersionID,
}
}
func getReplicationStatsPath(nodeName string) string {
return bucketMetaPrefix + SlashSeparator + replicationDir + SlashSeparator + nodeName + ".stats"
}
const (
replStatsMetaFormat = 1
replStatsVersionV1 = 1
replStatsVersion = replStatsVersionV1
replStatsSaveInterval = time.Minute * 5
)
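
For illustration only: assuming the internal constants bucketMetaPrefix and replicationDir resolve to "buckets" and "replication" (neither is shown in this diff), getReplicationStatsPath yields a per-node path under the hidden .minio.sys bucket, matching the comment on saveStats below.

package main

import "fmt"

// Assumed values for constants defined elsewhere in the cmd package.
const (
	slashSeparator   = "/"
	bucketMetaPrefix = "buckets"
	replicationDir   = "replication"
)

func getReplicationStatsPath(nodeName string) string {
	return bucketMetaPrefix + slashSeparator + replicationDir + slashSeparator + nodeName + ".stats"
}

func main() {
	// resolved against .minio.sys by the config read/write helpers
	fmt.Println(getReplicationStatsPath("node1")) // buckets/replication/node1.stats
}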


@@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"io"
"math"
"net/http"
"path"
"reflect"
@@ -1369,6 +1368,7 @@ type ReplicationPool struct {
existingReplicaCh chan ReplicateObjectInfo
existingReplicaDeleteCh chan DeletedObjectReplicationInfo
mrfSaveCh chan MRFReplicateEntry
saveStateCh chan struct{}
workerSize int
mrfWorkerSize int
@@ -1391,6 +1391,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
existingReplicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000),
resyncState: replicationResyncState{statusMap: make(map[string]BucketReplicationResyncStatus)},
mrfSaveCh: make(chan MRFReplicateEntry, 100000),
saveStateCh: make(chan struct{}, 1),
ctx: ctx,
objLayer: o,
}
@@ -1401,6 +1402,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
go pool.updateResyncStatus(ctx, o)
go pool.processMRF()
go pool.persistMRF()
go pool.saveStatsToDisk()
return pool
}
@@ -1526,10 +1528,12 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
close(p.replicaCh)
close(p.mrfReplicaCh)
close(p.existingReplicaCh)
close(p.saveStateCh)
})
case healCh <- ri:
case ch <- ri:
default:
globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic - we recommend increasing number of replicate object workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), string(replicationSubsystem))
}
}
@@ -1567,6 +1571,7 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
})
case ch <- doi:
default:
globalReplicationPool.queueMRFSave(doi.ToMRFEntry())
logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes - we recommend increasing number of replicate workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), string(replicationSubsystem))
}
}
@@ -1886,100 +1891,6 @@ func resyncTarget(oi ObjectInfo, arn string, resetID string, resetBeforeDate tim
return
}
func getAllLatestReplicationStats(bucketsUsage map[string]BucketUsageInfo) (bucketsReplicationStats map[string]BucketReplicationStats) {
peerBucketStatsList := globalNotificationSys.GetClusterAllBucketStats(GlobalContext)
bucketsReplicationStats = make(map[string]BucketReplicationStats, len(bucketsUsage))
for bucket, u := range bucketsUsage {
bucketStats := make([]BucketStats, len(peerBucketStatsList))
for i, peerBucketStats := range peerBucketStatsList {
bucketStat, ok := peerBucketStats[bucket]
if !ok {
continue
}
bucketStats[i] = bucketStat
}
bucketsReplicationStats[bucket] = calculateBucketReplicationStats(bucket, u, bucketStats)
}
return bucketsReplicationStats
}
func calculateBucketReplicationStats(bucket string, u BucketUsageInfo, bucketStats []BucketStats) (s BucketReplicationStats) {
// accumulate cluster bucket stats
stats := make(map[string]*BucketReplicationStat)
var totReplicaSize int64
for _, bucketStat := range bucketStats {
totReplicaSize += bucketStat.ReplicationStats.ReplicaSize
for arn, stat := range bucketStat.ReplicationStats.Stats {
oldst := stats[arn]
if oldst == nil {
oldst = &BucketReplicationStat{}
}
stats[arn] = &BucketReplicationStat{
FailedCount: stat.FailedCount + oldst.FailedCount,
FailedSize: stat.FailedSize + oldst.FailedSize,
ReplicatedSize: stat.ReplicatedSize + oldst.ReplicatedSize,
Latency: stat.Latency.merge(oldst.Latency),
}
}
}
// add initial usage stat to cluster stats
usageStat := globalReplicationStats.GetInitialUsage(bucket)
totReplicaSize += usageStat.ReplicaSize
for arn, stat := range usageStat.Stats {
st, ok := stats[arn]
if !ok {
st = &BucketReplicationStat{}
stats[arn] = st
}
st.ReplicatedSize += stat.ReplicatedSize
st.FailedSize += stat.FailedSize
st.FailedCount += stat.FailedCount
}
s = BucketReplicationStats{
Stats: make(map[string]*BucketReplicationStat, len(stats)),
}
var latestTotReplicatedSize int64
for _, st := range u.ReplicationInfo {
latestTotReplicatedSize += int64(st.ReplicatedSize)
}
// normalize computed real time stats with latest usage stat
for arn, tgtstat := range stats {
st := BucketReplicationStat{}
bu, ok := u.ReplicationInfo[arn]
if !ok {
bu = BucketTargetUsageInfo{}
}
// use in memory replication stats if it is ahead of usage info.
st.ReplicatedSize = int64(bu.ReplicatedSize)
if tgtstat.ReplicatedSize >= int64(bu.ReplicatedSize) {
st.ReplicatedSize = tgtstat.ReplicatedSize
}
s.ReplicatedSize += st.ReplicatedSize
// Reset FailedSize and FailedCount to 0 for negative overflows which can
// happen since data usage picture can lag behind actual usage state at the time of cluster start
st.FailedSize = int64(math.Max(float64(tgtstat.FailedSize), 0))
st.FailedCount = int64(math.Max(float64(tgtstat.FailedCount), 0))
st.Latency = tgtstat.Latency
s.Stats[arn] = &st
s.FailedSize += st.FailedSize
s.FailedCount += st.FailedCount
}
// normalize overall stats
s.ReplicaSize = int64(math.Max(float64(totReplicaSize), float64(u.ReplicaSize)))
s.ReplicatedSize = int64(math.Max(float64(s.ReplicatedSize), float64(latestTotReplicatedSize)))
return s
}
// get the most current of in-memory replication stats and data usage info from crawler.
func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplicationStats) {
bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket)
return calculateBucketReplicationStats(bucket, u, bucketStats)
}
const resyncTimeInterval = time.Minute * 1
// updateResyncStatus persists in-memory resync metadata stats to disk at periodic intervals
@@ -2520,7 +2431,6 @@ func (p *ReplicationPool) persistMRF() {
logger.LogOnceIf(p.ctx, fmt.Errorf("Unable to persist replication failures to disk:%w", err), string(replicationSubsystem))
}
entries = make(map[string]MRFReplicateEntry)
return
}
for {
select {
@@ -2531,6 +2441,9 @@
close(p.mrfSaveCh)
saveMRFToDisk(true)
return
case <-p.saveStateCh:
saveMRFToDisk(true)
return
case e, ok := <-p.mrfSaveCh:
if !ok {
return
@@ -2683,3 +2596,91 @@ func (p *ReplicationPool) queueMRFHeal(file string) error {
}
return nil
}
// load replication stats from disk
func (p *ReplicationPool) loadStatsFromDisk() (rs map[string]BucketReplicationStats, e error) {
data, err := readConfig(p.ctx, p.objLayer, getReplicationStatsPath(globalLocalNodeName))
if err != nil {
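// note: a missing stats file is surfaced as an error so the caller falls back to data usage; other read errors yield empty stats and no error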
if !errors.Is(err, errConfigNotFound) {
return rs, nil
}
return rs, err
}
if len(data) <= 4 {
logger.LogIf(p.ctx, fmt.Errorf("replication stats: no data"))
return map[string]BucketReplicationStats{}, nil
}
// Read repl stats meta header
switch binary.LittleEndian.Uint16(data[0:2]) {
case replStatsMetaFormat:
default:
return rs, fmt.Errorf("replication stats: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
}
switch binary.LittleEndian.Uint16(data[2:4]) {
case replStatsVersion:
default:
return rs, fmt.Errorf("replication stats: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
}
ss := BucketStatsMap{}
if _, err = ss.UnmarshalMsg(data[4:]); err != nil {
return rs, err
}
rs = make(map[string]BucketReplicationStats, len(ss.Stats))
for bucket, st := range ss.Stats {
rs[bucket] = st.ReplicationStats
}
return rs, nil
}
func (p *ReplicationPool) saveStatsToDisk() {
if p == nil || p.objLayer == nil {
return
}
sTimer := time.NewTimer(replStatsSaveInterval)
defer sTimer.Stop()
for {
select {
case <-sTimer.C:
dui, err := loadDataUsageFromBackend(GlobalContext, newObjectLayerFn())
if err == nil && !dui.LastUpdate.IsZero() {
globalReplicationStats.getAllLatest(dui.BucketsUsage)
}
p.saveStats(p.ctx)
sTimer.Reset(replStatsSaveInterval)
case <-p.ctx.Done():
return
}
}
}
// save replication stats to .minio.sys/buckets/replication/node-name.stats
func (p *ReplicationPool) saveStats(ctx context.Context) error {
bsm := globalReplicationStats.getAllCachedLatest()
if len(bsm.Stats) == 0 {
return nil
}
data := make([]byte, 4, 4+bsm.Msgsize())
// Add the replication stats meta header.
binary.LittleEndian.PutUint16(data[0:2], replStatsMetaFormat)
binary.LittleEndian.PutUint16(data[2:4], replStatsVersion)
// Add data
data, err := bsm.MarshalMsg(data)
if err != nil {
return err
}
return saveConfig(ctx, p.objLayer, getReplicationStatsPath(globalLocalNodeName), data)
}
// SaveState saves replication stats and MRF data before server restart
func (p *ReplicationPool) SaveState(ctx context.Context) error {
go func() {
select {
case p.saveStateCh <- struct{}{}:
case <-p.ctx.Done():
return
}
}()
return p.saveStats(ctx)
}
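
SaveState signals persistMRF through the buffered saveStateCh rather than blocking the shutdown path, then writes the stats synchronously. A minimal sketch of that non-blocking wake-up pattern, using stand-in types rather than the PR's:

package main

import (
	"context"
	"fmt"
	"time"
)

// signalSave nudges a persist loop exactly once; the channel has capacity 1
// (as saveStateCh does in NewReplicationPool), so the signal is retained even
// if the loop is momentarily busy.
func signalSave(ctx context.Context, saveStateCh chan<- struct{}) {
	select {
	case saveStateCh <- struct{}{}:
	case <-ctx.Done(): // server already tearing down
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := make(chan struct{}, 1)
	go signalSave(ctx, ch)

	// the persist loop's select, reduced to the shutdown case
	select {
	case <-ch:
		fmt.Println("flushing queued MRF entries to disk before exit")
	case <-time.After(time.Second):
		fmt.Println("no save signal received")
	}
}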


@@ -18,6 +18,7 @@
package cmd
import (
"fmt"
"time"
)
@@ -52,7 +53,10 @@ func (rl *ReplicationLatency) update(size int64, duration time.Duration) {
}
// BucketStatsMap captures bucket statistics for all buckets
type BucketStatsMap map[string]BucketStats
type BucketStatsMap struct {
Stats map[string]BucketStats
Timestamp time.Time
}
// BucketStats bucket statistics
type BucketStats struct {
@@ -126,3 +130,11 @@ func (bs *BucketReplicationStat) hasReplicationUsage() bool {
bs.PendingCount > 0 ||
bs.PendingSize > 0
}
func (brs BucketReplicationStats) String() string {
s := "ReplicatedSize=" + fmt.Sprintf("%d", brs.ReplicatedSize) + "+\n ReplicaSize=" + fmt.Sprintf("%d", brs.ReplicaSize)
for arn, st := range brs.Stats {
s += "\n arn: " + arn + " ReplicatedSize=" + fmt.Sprintf("%d", st.ReplicatedSize) + " +::ReplicaSize=" + fmt.Sprintf("%d", st.ReplicaSize)
}
return s
}


@@ -794,74 +794,109 @@ func (z *BucketStats) Msgsize() (s int) {
// DecodeMsg implements msgp.Decodable
func (z *BucketStatsMap) DecodeMsg(dc *msgp.Reader) (err error) {
var zb0003 uint32
zb0003, err = dc.ReadMapHeader()
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
if (*z) == nil {
(*z) = make(BucketStatsMap, zb0003)
} else if len((*z)) > 0 {
for key := range *z {
delete((*z), key)
}
}
for zb0003 > 0 {
zb0003--
var zb0001 string
var zb0002 BucketStats
zb0001, err = dc.ReadString()
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
var field []byte
_ = field
var zb0004 uint32
zb0004, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, zb0001)
return
}
for zb0004 > 0 {
zb0004--
field, err = dc.ReadMapKeyPtr()
switch msgp.UnsafeString(field) {
case "Stats":
var zb0002 uint32
zb0002, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, zb0001)
err = msgp.WrapError(err, "Stats")
return
}
switch msgp.UnsafeString(field) {
case "ReplicationStats":
err = zb0002.ReplicationStats.DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, zb0001, "ReplicationStats")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err, zb0001)
return
if z.Stats == nil {
z.Stats = make(map[string]BucketStats, zb0002)
} else if len(z.Stats) > 0 {
for key := range z.Stats {
delete(z.Stats, key)
}
}
for zb0002 > 0 {
zb0002--
var za0001 string
var za0002 BucketStats
za0001, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Stats")
return
}
var zb0003 uint32
zb0003, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, "Stats", za0001)
return
}
for zb0003 > 0 {
zb0003--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err, "Stats", za0001)
return
}
switch msgp.UnsafeString(field) {
case "ReplicationStats":
err = za0002.ReplicationStats.DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "Stats", za0001, "ReplicationStats")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err, "Stats", za0001)
return
}
}
}
z.Stats[za0001] = za0002
}
case "Timestamp":
z.Timestamp, err = dc.ReadTime()
if err != nil {
err = msgp.WrapError(err, "Timestamp")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
(*z)[zb0001] = zb0002
}
return
}
// EncodeMsg implements msgp.Encodable
func (z BucketStatsMap) EncodeMsg(en *msgp.Writer) (err error) {
err = en.WriteMapHeader(uint32(len(z)))
func (z *BucketStatsMap) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2
// write "Stats"
err = en.Append(0x82, 0xa5, 0x53, 0x74, 0x61, 0x74, 0x73)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0005, zb0006 := range z {
err = en.WriteString(zb0005)
err = en.WriteMapHeader(uint32(len(z.Stats)))
if err != nil {
err = msgp.WrapError(err, "Stats")
return
}
for za0001, za0002 := range z.Stats {
err = en.WriteString(za0001)
if err != nil {
err = msgp.WrapError(err)
err = msgp.WrapError(err, "Stats")
return
}
// map header, size 1
@@ -870,102 +905,148 @@ func (z BucketStatsMap) EncodeMsg(en *msgp.Writer) (err error) {
if err != nil {
return
}
err = zb0006.ReplicationStats.EncodeMsg(en)
err = za0002.ReplicationStats.EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, zb0005, "ReplicationStats")
err = msgp.WrapError(err, "Stats", za0001, "ReplicationStats")
return
}
}
// write "Timestamp"
err = en.Append(0xa9, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70)
if err != nil {
return
}
err = en.WriteTime(z.Timestamp)
if err != nil {
err = msgp.WrapError(err, "Timestamp")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z BucketStatsMap) MarshalMsg(b []byte) (o []byte, err error) {
func (z *BucketStatsMap) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
o = msgp.AppendMapHeader(o, uint32(len(z)))
for zb0005, zb0006 := range z {
o = msgp.AppendString(o, zb0005)
// map header, size 2
// string "Stats"
o = append(o, 0x82, 0xa5, 0x53, 0x74, 0x61, 0x74, 0x73)
o = msgp.AppendMapHeader(o, uint32(len(z.Stats)))
for za0001, za0002 := range z.Stats {
o = msgp.AppendString(o, za0001)
// map header, size 1
// string "ReplicationStats"
o = append(o, 0x81, 0xb0, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x73)
o, err = zb0006.ReplicationStats.MarshalMsg(o)
o, err = za0002.ReplicationStats.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, zb0005, "ReplicationStats")
err = msgp.WrapError(err, "Stats", za0001, "ReplicationStats")
return
}
}
// string "Timestamp"
o = append(o, 0xa9, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70)
o = msgp.AppendTime(o, z.Timestamp)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *BucketStatsMap) UnmarshalMsg(bts []byte) (o []byte, err error) {
var zb0003 uint32
zb0003, bts, err = msgp.ReadMapHeaderBytes(bts)
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
if (*z) == nil {
(*z) = make(BucketStatsMap, zb0003)
} else if len((*z)) > 0 {
for key := range *z {
delete((*z), key)
}
}
for zb0003 > 0 {
var zb0001 string
var zb0002 BucketStats
zb0003--
zb0001, bts, err = msgp.ReadStringBytes(bts)
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
var field []byte
_ = field
var zb0004 uint32
zb0004, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, zb0001)
return
}
for zb0004 > 0 {
zb0004--
field, bts, err = msgp.ReadMapKeyZC(bts)
switch msgp.UnsafeString(field) {
case "Stats":
var zb0002 uint32
zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, zb0001)
err = msgp.WrapError(err, "Stats")
return
}
switch msgp.UnsafeString(field) {
case "ReplicationStats":
bts, err = zb0002.ReplicationStats.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, zb0001, "ReplicationStats")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err, zb0001)
return
if z.Stats == nil {
z.Stats = make(map[string]BucketStats, zb0002)
} else if len(z.Stats) > 0 {
for key := range z.Stats {
delete(z.Stats, key)
}
}
for zb0002 > 0 {
var za0001 string
var za0002 BucketStats
zb0002--
za0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Stats")
return
}
var zb0003 uint32
zb0003, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Stats", za0001)
return
}
for zb0003 > 0 {
zb0003--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err, "Stats", za0001)
return
}
switch msgp.UnsafeString(field) {
case "ReplicationStats":
bts, err = za0002.ReplicationStats.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "Stats", za0001, "ReplicationStats")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err, "Stats", za0001)
return
}
}
}
z.Stats[za0001] = za0002
}
case "Timestamp":
z.Timestamp, bts, err = msgp.ReadTimeBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Timestamp")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
(*z)[zb0001] = zb0002
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z BucketStatsMap) Msgsize() (s int) {
s = msgp.MapHeaderSize
if z != nil {
for zb0005, zb0006 := range z {
_ = zb0006
s += msgp.StringPrefixSize + len(zb0005) + 1 + 17 + zb0006.ReplicationStats.Msgsize()
func (z *BucketStatsMap) Msgsize() (s int) {
s = 1 + 6 + msgp.MapHeaderSize
if z.Stats != nil {
for za0001, za0002 := range z.Stats {
_ = za0002
s += msgp.StringPrefixSize + len(za0001) + 1 + 17 + za0002.ReplicationStats.Msgsize()
}
}
s += 10 + msgp.TimeSize
return
}


@@ -1661,7 +1661,7 @@ func getBucketUsageMetrics() *MetricsGroup {
Value: float64(time.Since(dataUsageInfo.LastUpdate)),
})
bucketReplStats := getAllLatestReplicationStats(dataUsageInfo.BucketsUsage)
bucketReplStats := globalReplicationStats.getAllLatest(dataUsageInfo.BucketsUsage)
for bucket, usage := range dataUsageInfo.BucketsUsage {
stats := bucketReplStats[bucket]


@@ -458,7 +458,7 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) {
}
for bucket, usageInfo := range dataUsageInfo.BucketsUsage {
stat := getLatestReplicationStats(bucket, usageInfo)
stat := globalReplicationStats.getLatestReplicationStats(bucket, usageInfo)
// Total space used by bucket
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(

View file

@@ -569,14 +569,17 @@ func (sys *NotificationSys) GetClusterAllBucketStats(ctx context.Context) []Buck
}
replicationStatsList := globalReplicationStats.GetAll()
bucketStatsMap := make(map[string]BucketStats, len(replicationStatsList))
bucketStatsMap := BucketStatsMap{
Stats: make(map[string]BucketStats, len(replicationStatsList)),
Timestamp: UTCNow(),
}
for k, replicationStats := range replicationStatsList {
bucketStatsMap[k] = BucketStats{
bucketStatsMap.Stats[k] = BucketStats{
ReplicationStats: replicationStats,
}
}
replicationStats = append(replicationStats, BucketStatsMap(bucketStatsMap))
replicationStats = append(replicationStats, bucketStatsMap)
return replicationStats
}


@@ -263,7 +263,7 @@ func (client *peerRESTClient) GetAllBucketStats() (BucketStatsMap, error) {
values := make(url.Values)
respBody, err := client.call(peerRESTMethodGetAllBucketStats, values, nil, -1)
if err != nil {
return nil, err
return BucketStatsMap{}, err
}
bsMap := BucketStatsMap{}


@@ -18,7 +18,7 @@
package cmd
const (
peerRESTVersion = "v26" // Add user-type to LoadPolicyMapping
peerRESTVersion = "v27" // change in GetAllBucketStats response.
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
peerRESTPrefix = minioReservedBucketPath + "/peer"
peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix


@@ -549,8 +549,7 @@ func (s *peerRESTServer) GetAllBucketStatsHandler(w http.ResponseWriter, r *http
ReplicationStats: v,
}
}
logger.LogIf(r.Context(), msgp.Encode(w, BucketStatsMap(bucketStatsMap)))
logger.LogIf(r.Context(), msgp.Encode(w, &BucketStatsMap{Stats: bucketStatsMap, Timestamp: UTCNow()}))
}
// GetBucketStatsHandler - fetches current in-memory bucket stats, currently only


@@ -78,9 +78,11 @@ func handleSignals() {
case <-globalHTTPServerErrorCh:
exit(stopProcess())
case osSignal := <-globalOSSignalCh:
globalReplicationPool.SaveState(context.Background())
logger.Info("Exiting on signal: %s", strings.ToUpper(osSignal.String()))
exit(stopProcess())
case signal := <-globalServiceSignalCh:
globalReplicationPool.SaveState(context.Background())
switch signal {
case serviceRestart:
logger.Info("Restarting on service signal")


@@ -138,6 +138,15 @@ func (t Type) Valid() bool {
return t > 0
}
// IsDataReplication returns true if the content is being replicated
func (t Type) IsDataReplication() bool {
switch t {
case ObjectReplicationType, HealReplicationType, ExistingObjectReplicationType:
return true
}
return false
}
// ObjectOpts provides information to deduce whether replication
// can be triggered on the resultant object.
type ObjectOpts struct {
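
As a concrete reading of the new helper: ReplicationStats.Update now adjusts the data counters for object, heal, and existing-object replication alike, where it previously checked only ObjectReplicationType. A self-contained sketch follows; the constant set and its ordering are assumed here, not taken from this diff.

package main

import "fmt"

// Type stands in for the replication operation type; values are assumed.
type Type int

const (
	UnsetReplicationType Type = iota
	ObjectReplicationType
	DeleteReplicationType
	MetadataReplicationType
	HealReplicationType
	ExistingObjectReplicationType
)

// IsDataReplication reports whether the operation replicates object data,
// matching the switch added in this PR.
func (t Type) IsDataReplication() bool {
	switch t {
	case ObjectReplicationType, HealReplicationType, ExistingObjectReplicationType:
		return true
	}
	return false
}

func main() {
	for _, t := range []Type{ObjectReplicationType, DeleteReplicationType, HealReplicationType} {
		fmt.Printf("type=%d data-replication=%v\n", t, t.IsDataReplication())
	}
}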