minio/cmd/bucket-replication-utils.go
Krishnan Parthasarathi ad8e611098
feat: implement prefix-level versioning exclusion (#14828)
Spark/Hadoop workloads that use the Hadoop MR
Committer v1/v2 algorithm upload objects to a
temporary prefix in a bucket. These objects are
'renamed' to a different prefix on job commit.
Object storage admins are then forced to configure
separate ILM policies to expire these objects
and their versions in order to reclaim space.

Our solution:

This can be avoided by simply marking objects 
under these prefixes to be excluded from versioning, 
as shown below. Consequently, these objects are 
excluded from replication, and don't require ILM 
policies to prune unnecessary versions.

-  MinIO Extension to Bucket Version Configuration
```xml
<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> 
        <Status>Enabled</Status>
        <ExcludeFolders>true</ExcludeFolders>
        <ExcludedPrefixes>
          <Prefix>app1-jobs/*/_temporary/</Prefix>
        </ExcludedPrefixes>
        <ExcludedPrefixes>
          <Prefix>app2-jobs/*/__magic/</Prefix>
        </ExcludedPrefixes>

        <!-- .. up to 10 prefixes in all -->     
</VersioningConfiguration>
```
Note: `ExcludeFolders` excludes all folders in a bucket 
from versioning. This is required to prevent the parent 
folders from accumulating delete markers, especially
those which are shared across spark workloads 
spanning projects/teams.

- To enable version exclusion on a list of prefixes

```
mc version enable --excluded-prefixes "app1-jobs/*/_temporary/,app2-jobs/*/__magic/" --exclude-prefix-marker myminio/test
```
2022-05-06 19:05:28 -07:00

727 lines
23 KiB
Go

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"bytes"
"fmt"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/minio/minio/internal/bucket/replication"
xhttp "github.com/minio/minio/internal/http"
)
//go:generate msgp -file=$GOFILE
// replicatedTargetInfo describes the replication outcome recorded for one
// remote target, identified by its ARN.
type replicatedTargetInfo struct {
	Arn                   string                 // ARN of the target; empty means "no target"
	Size                  int64                  // size counted by replicatedInfos.CompletedSize
	Duration              time.Duration          // duration recorded for the operation
	ReplicationAction     replicationAction      // full or metadata only
	OpType                replication.Type       // whether incoming replication, existing object, healing etc..
	ReplicationStatus     replication.StatusType // status after the current attempt
	PrevReplicationStatus replication.StatusType // status before the current attempt
	VersionPurgeStatus    VersionPurgeStatusType // purge status for this target
	ResyncTimestamp       string                 // stringified resync marker, empty when unset
	ReplicationResynced   bool                   // true only if resync attempted for this target
}

// Empty reports whether the entry refers to no target, i.e. its ARN is unset.
func (rt replicatedTargetInfo) Empty() bool {
	return len(rt.Arn) == 0
}
// replicatedInfos groups the per-target replication results of one
// replication operation together with the timestamp of that operation.
type replicatedInfos struct {
// ReplicationTimeStamp is copied into ReplicationState.ReplicationTimeStamp
// by getReplicationState.
ReplicationTimeStamp time.Time
// Targets holds one entry per target; entries with an empty ARN are skipped
// by most of the aggregate helpers on this type.
Targets []replicatedTargetInfo
}
// CompletedSize sums Size over every non-empty target whose status is
// Completed now but was not Completed before this attempt.
func (ri replicatedInfos) CompletedSize() (sz int64) {
	for i := range ri.Targets {
		tgt := &ri.Targets[i]
		newlyCompleted := tgt.ReplicationStatus == replication.Completed &&
			tgt.PrevReplicationStatus != replication.Completed
		if !tgt.Empty() && newlyCompleted {
			sz += tgt.Size
		}
	}
	return sz
}
// ReplicationResynced returns true if a resync was attempted on at least one
// non-empty target of the queued object version.
func (ri replicatedInfos) ReplicationResynced() bool {
	for _, tgt := range ri.Targets {
		if !tgt.Empty() && tgt.ReplicationResynced {
			return true
		}
	}
	return false
}
// ReplicationStatusInternal serializes the replication status of every
// non-empty target as a sequence of "arn=status;" entries, in Targets order.
func (ri replicatedInfos) ReplicationStatusInternal() string {
	var buf bytes.Buffer
	for i := range ri.Targets {
		tgt := ri.Targets[i]
		if !tgt.Empty() {
			fmt.Fprintf(&buf, "%s=%s;", tgt.Arn, tgt.ReplicationStatus.String())
		}
	}
	return buf.String()
}
// ReplicationStatus folds the per-target replication statuses into one value:
// an empty status when there are no targets, Failed as soon as any target
// failed, Completed when every target completed, and Pending otherwise.
func (ri replicatedInfos) ReplicationStatus() replication.StatusType {
	total := len(ri.Targets)
	if total == 0 {
		return replication.StatusType("")
	}
	done := 0
	for i := range ri.Targets {
		st := ri.Targets[i].ReplicationStatus
		if st == replication.Failed {
			return replication.Failed
		}
		if st == replication.Completed {
			done++
		}
	}
	if done != total {
		return replication.Pending
	}
	return replication.Completed
}
// VersionPurgeStatus folds the per-target version purge statuses into one
// value: an empty status when there are no targets, Failed as soon as any
// target failed, Complete when every target is complete, Pending otherwise.
func (ri replicatedInfos) VersionPurgeStatus() VersionPurgeStatusType {
	total := len(ri.Targets)
	if total == 0 {
		return VersionPurgeStatusType("")
	}
	done := 0
	for i := range ri.Targets {
		st := ri.Targets[i].VersionPurgeStatus
		if st == Failed {
			return Failed
		}
		if st == Complete {
			done++
		}
	}
	if done != total {
		return Pending
	}
	return Complete
}
// VersionPurgeStatusInternal serializes the version purge status of every
// target that has both an ARN and a purge status as "arn=status;" entries.
func (ri replicatedInfos) VersionPurgeStatusInternal() string {
	var buf bytes.Buffer
	for i := range ri.Targets {
		tgt := ri.Targets[i]
		if tgt.Empty() || tgt.VersionPurgeStatus.Empty() {
			continue
		}
		fmt.Fprintf(&buf, "%s=%s;", tgt.Arn, tgt.VersionPurgeStatus)
	}
	return buf.String()
}
// Action returns the replication action of the first non-empty target that
// was not already Completed before this attempt, i.e. a target that actually
// performed replication now. It returns replicateNone if no such target exists.
func (ri replicatedInfos) Action() replicationAction {
	for i := range ri.Targets {
		tgt := ri.Targets[i]
		if !tgt.Empty() && tgt.PrevReplicationStatus != replication.Completed {
			return tgt.ReplicationAction
		}
	}
	return replicateNone
}
// replStatusRegex matches one "key=value;" entry; callers in this file read
// capture group 1 as the target ARN and capture group 2 as its status.
var replStatusRegex = regexp.MustCompile(`([^=].*?)=([^,].*?);`)

// TargetReplicationStatus looks up the replication status recorded for the
// given target ARN in o.ReplicationStatusInternal. It returns the empty
// status when the ARN is not present.
func (o *ObjectInfo) TargetReplicationStatus(arn string) (status replication.StatusType) {
	for _, m := range replStatusRegex.FindAllStringSubmatch(o.ReplicationStatusInternal, -1) {
		if len(m) != 3 {
			return status
		}
		if m[1] != arn {
			continue
		}
		return replication.StatusType(m[2])
	}
	return status
}
// replicateTargetDecision is the replication decision taken for one target.
type replicateTargetDecision struct {
	Replicate   bool   // Replicate to this target
	Synchronous bool   // Synchronous replication configured.
	Arn         string // ARN of replication target
	ID          string
}

// String renders the decision as "replicate;synchronous;arn;id", the
// per-target format that parseReplicateDecision splits on ';'.
func (t *replicateTargetDecision) String() string {
	const layout = "%t;%t;%s;%s"
	return fmt.Sprintf(layout, t.Replicate, t.Synchronous, t.Arn, t.ID)
}

// newReplicateTargetDecision builds a decision for the target arn with the
// given replicate and sync flags; the ID field is left empty.
func newReplicateTargetDecision(arn string, replicate bool, sync bool) replicateTargetDecision {
	return replicateTargetDecision{
		Arn:         arn,
		Replicate:   replicate,
		Synchronous: sync,
	}
}
// ReplicateDecision represents replication decision for each target
type ReplicateDecision struct {
// targetsMap is keyed by target ARN (see Set); it may be nil until the first
// call to Set.
targetsMap map[string]replicateTargetDecision
}
// ReplicateAny returns true if at least one target qualifies for replication.
func (d *ReplicateDecision) ReplicateAny() bool {
	for arn := range d.targetsMap {
		if d.targetsMap[arn].Replicate {
			return true
		}
	}
	return false
}
// Synchronous returns true if at least one target is marked for synchronous
// replication.
func (d *ReplicateDecision) Synchronous() bool {
	for arn := range d.targetsMap {
		if d.targetsMap[arn].Synchronous {
			return true
		}
	}
	return false
}
// String renders the decision as comma-separated "arn=decision" pairs, the
// format consumed by parseReplicateDecision. The order of the pairs follows
// map iteration order and is therefore not stable.
func (d *ReplicateDecision) String() string {
	pairs := make([]string, 0, len(d.targetsMap))
	for arn, tgt := range d.targetsMap {
		pairs = append(pairs, arn+"="+tgt.String())
	}
	return strings.Join(pairs, ",")
}
// Set records the decision t under its ARN, replacing any previous decision
// for the same target. The backing map is created on first use.
func (d *ReplicateDecision) Set(t replicateTargetDecision) {
	if d.targetsMap == nil {
		d.targetsMap = map[string]replicateTargetDecision{t.Arn: t}
		return
	}
	d.targetsMap[t.Arn] = t
}
// PendingStatus returns the internal replication status string in which every
// target that is marked for replication appears as "arn=<pending>;". Targets
// that are not marked for replication are omitted.
func (d *ReplicateDecision) PendingStatus() string {
	var buf bytes.Buffer
	for _, tgt := range d.targetsMap {
		if !tgt.Replicate {
			continue
		}
		fmt.Fprintf(&buf, "%s=%s;", tgt.Arn, replication.Pending.String())
	}
	return buf.String()
}
// ResyncDecision holds the resync decision of each target, keyed by the
// target's ARN.
type ResyncDecision struct {
	targets map[string]ResyncTargetDecision
}

// Empty returns true if no resync decisions were recorded at all, i.e. the
// targets map is nil. A non-nil map without entries is not considered empty.
func (r *ResyncDecision) Empty() bool {
	return r.targets == nil
}

// mustResync reports whether at least one target needs to be resynced.
func (r *ResyncDecision) mustResync() bool {
	for arn := range r.targets {
		if r.targets[arn].Replicate {
			return true
		}
	}
	return false
}

// mustResyncTarget reports whether the target tgtArn needs to be resynced.
// Unknown targets, and a nil targets map, yield false.
func (r *ResyncDecision) mustResyncTarget(tgtArn string) bool {
	// Indexing a nil map or a missing key yields the zero decision, whose
	// Replicate flag is false.
	return r.targets[tgtArn].Replicate
}

// ResyncTargetDecision is the resync decision for a single target.
type ResyncTargetDecision struct {
	Replicate       bool
	ResetID         string
	ResetBeforeDate time.Time
}
// errInvalidReplicateDecisionFormat is returned by parseReplicateDecision when
// the input does not follow the "arn=replicate;sync;arn;id" pair layout.
var errInvalidReplicateDecisionFormat = fmt.Errorf("ReplicateDecision has invalid format")

// parseReplicateDecision decodes a ','-delimited list of
// `arn=replicate;synchronous;arn;id` pairs into a ReplicateDecision.
//
// The value part of each pair may be wrapped in double quotes. An empty input
// yields an empty decision and no error. A pair that does not split into
// exactly one key and one value, or a value without exactly four ';'-separated
// fields, yields errInvalidReplicateDecisionFormat; a malformed boolean yields
// the strconv error. On error the partially filled decision is returned
// alongside it.
func parseReplicateDecision(s string) (r ReplicateDecision, err error) {
	r = ReplicateDecision{targetsMap: make(map[string]replicateTargetDecision)}
	if s == "" {
		return r, nil
	}
	for _, pair := range strings.Split(s, ",") {
		kv := strings.Split(pair, "=")
		if len(kv) != 2 {
			return r, errInvalidReplicateDecisionFormat
		}
		unquoted := strings.TrimSuffix(strings.TrimPrefix(kv[1], "\""), "\"")
		fields := strings.Split(unquoted, ";")
		if len(fields) != 4 {
			return r, errInvalidReplicateDecisionFormat
		}
		replicate, perr := strconv.ParseBool(fields[0])
		if perr != nil {
			return r, perr
		}
		synchronous, perr := strconv.ParseBool(fields[1])
		if perr != nil {
			return r, perr
		}
		r.targetsMap[kv[0]] = replicateTargetDecision{
			Replicate:   replicate,
			Synchronous: synchronous,
			Arn:         fields[2],
			ID:          fields[3],
		}
	}
	return r, nil
}
// ReplicationState represents internal replication state
type ReplicationState struct {
ReplicaTimeStamp time.Time // timestamp when last replica update was received
ReplicaStatus replication.StatusType // replica status
DeleteMarker bool // represents DeleteMarker replication state
ReplicationTimeStamp time.Time // timestamp when last replication activity happened
ReplicationStatusInternal string // stringified representation of all replication activity
// VersionPurgeStatusInternal is internally in the format "arn1=PENDING;arn2=COMPLETE;"
VersionPurgeStatusInternal string // stringified representation of all version purge statuses
ReplicateDecisionStr string // stringified representation of replication decision for each target
Targets map[string]replication.StatusType // map of ARN->replication status for ongoing replication activity
PurgeTargets map[string]VersionPurgeStatusType // map of ARN->VersionPurgeStatus for all the targets
ResetStatusesMap map[string]string // map of ARN-> stringified reset id and timestamp for all the targets
}
// Equal returns true if rs and o agree on the replica status, both
// timestamps, and the stringified replication and version purge statuses.
// The remaining fields are not compared.
func (rs *ReplicationState) Equal(o ReplicationState) bool {
	if rs.ReplicaStatus != o.ReplicaStatus {
		return false
	}
	if !rs.ReplicaTimeStamp.Equal(o.ReplicaTimeStamp) {
		return false
	}
	if !rs.ReplicationTimeStamp.Equal(o.ReplicationTimeStamp) {
		return false
	}
	if rs.ReplicationStatusInternal != o.ReplicationStatusInternal {
		return false
	}
	return rs.VersionPurgeStatusInternal == o.VersionPurgeStatusInternal
}
// CompositeReplicationStatus returns overall replication status for the
// object version being replicated.
//
// Without an internal status string the replica status is returned when it is
// set, and the empty status otherwise. A legacy internal string that is itself
// a plain status is returned unchanged. Otherwise the per-target statuses are
// combined, except that the replica status wins when all targets completed and
// a replica update arrived after the last replication activity.
func (rs *ReplicationState) CompositeReplicationStatus() (st replication.StatusType) {
	if rs.ReplicationStatusInternal == "" {
		if !rs.ReplicaStatus.Empty() {
			return rs.ReplicaStatus
		}
		return st
	}

	legacy := replication.StatusType(rs.ReplicationStatusInternal)
	if legacy == replication.Pending || legacy == replication.Completed ||
		legacy == replication.Failed || legacy == replication.Replica {
		// for backward compatibility
		return legacy
	}

	combined := getCompositeReplicationStatus(rs.Targets)
	// return REPLICA status if replica received timestamp is later than replication timestamp
	// provided object replication completed for all targets.
	replicaIsNewer := !rs.ReplicaTimeStamp.Equal(timeSentinel) &&
		combined == replication.Completed &&
		rs.ReplicaTimeStamp.After(rs.ReplicationTimeStamp)
	if replicaIsNewer {
		return rs.ReplicaStatus
	}
	return combined
}
// CompositeVersionPurgeStatus returns overall replication purge status for
// the permanent delete being replicated. A legacy internal string that is
// itself a plain status is returned unchanged; otherwise the per-target purge
// statuses are combined.
func (rs *ReplicationState) CompositeVersionPurgeStatus() VersionPurgeStatusType {
	legacy := VersionPurgeStatusType(rs.VersionPurgeStatusInternal)
	if legacy == Pending || legacy == Complete || legacy == Failed {
		// for backward compatibility
		return legacy
	}
	return getCompositeVersionPurgeStatus(rs.PurgeTargets)
}
// targetState returns a replicatedTargetInfo for arn seeded with the previous
// state known for that target: its last replication status, its version purge
// status and its resync timestamp. Unknown targets yield zero values for
// those fields.
func (rs *ReplicationState) targetState(arn string) (r replicatedTargetInfo) {
	r.Arn = arn
	r.PrevReplicationStatus = rs.Targets[arn]
	r.VersionPurgeStatus = rs.PurgeTargets[arn]
	r.ResyncTimestamp = rs.ResetStatusesMap[arn]
	return r
}
// getReplicationState builds the new ReplicationState from the per-target
// results in rinfos, carrying over the replicate decision, reset statuses and
// replica status/timestamp from prevState.
//
// The replication and version purge status strings (and their parsed maps)
// are derived from rinfos. For every target that reports a resync timestamp,
// the timestamp is stored in ResetStatusesMap under targetResetHeader(arn).
//
// When prevState.ResetStatusesMap is non-nil it is reused rather than copied,
// so those writes are visible through prevState as well. When it is nil, a new
// map is allocated on first write instead of panicking. vID is currently
// unused.
func getReplicationState(rinfos replicatedInfos, prevState ReplicationState, vID string) ReplicationState {
	rs := ReplicationState{
		ReplicateDecisionStr: prevState.ReplicateDecisionStr,
		ResetStatusesMap:     prevState.ResetStatusesMap,
		ReplicaTimeStamp:     prevState.ReplicaTimeStamp,
		ReplicaStatus:        prevState.ReplicaStatus,
	}

	replStatuses := rinfos.ReplicationStatusInternal()
	rs.Targets = replicationStatusesMap(replStatuses)
	rs.ReplicationStatusInternal = replStatuses
	rs.ReplicationTimeStamp = rinfos.ReplicationTimeStamp

	vpurgeStatuses := rinfos.VersionPurgeStatusInternal()
	rs.VersionPurgeStatusInternal = vpurgeStatuses
	rs.PurgeTargets = versionPurgeStatusesMap(vpurgeStatuses)

	for _, rinfo := range rinfos.Targets {
		if rinfo.ResyncTimestamp == "" {
			continue
		}
		// Assigning into a nil map panics; allocate lazily so a previous
		// state without reset statuses is handled.
		if rs.ResetStatusesMap == nil {
			rs.ResetStatusesMap = make(map[string]string)
		}
		rs.ResetStatusesMap[targetResetHeader(rinfo.Arn)] = rinfo.ResyncTimestamp
	}
	return rs
}
// replicationStatusesMap parses a string of "arn=status;" entries into a map
// of ARN to replication status. The result is never nil; input that contains
// no entries yields an empty map.
func replicationStatusesMap(s string) map[string]replication.StatusType {
	matches := replStatusRegex.FindAllStringSubmatch(s, -1)
	statuses := make(map[string]replication.StatusType)
	for _, m := range matches {
		if len(m) == 3 {
			statuses[m[1]] = replication.StatusType(m[2])
		}
	}
	return statuses
}
// versionPurgeStatusesMap parses a string of "arn=status;" entries into a map
// of ARN to version purge status. The result is never nil; input that
// contains no entries yields an empty map.
func versionPurgeStatusesMap(s string) map[string]VersionPurgeStatusType {
	matches := replStatusRegex.FindAllStringSubmatch(s, -1)
	statuses := make(map[string]VersionPurgeStatusType)
	for _, m := range matches {
		if len(m) == 3 {
			statuses[m[1]] = VersionPurgeStatusType(m[2])
		}
	}
	return statuses
}
// getCompositeReplicationStatus folds the per-target statuses in m into one
// status: empty for an empty map, Failed if any target failed, Completed if
// every target completed, and Pending otherwise.
func getCompositeReplicationStatus(m map[string]replication.StatusType) replication.StatusType {
	total := len(m)
	if total == 0 {
		return replication.StatusType("")
	}
	done := 0
	for _, st := range m {
		if st == replication.Failed {
			return replication.Failed
		}
		if st == replication.Completed {
			done++
		}
	}
	if done != total {
		return replication.Pending
	}
	return replication.Completed
}
// getCompositeVersionPurgeStatus folds the per-target purge statuses in m
// into one status: empty for an empty map, Failed if any target failed,
// Complete if every target is complete, and Pending otherwise.
func getCompositeVersionPurgeStatus(m map[string]VersionPurgeStatusType) VersionPurgeStatusType {
	total := len(m)
	if total == 0 {
		return VersionPurgeStatusType("")
	}
	done := 0
	for _, st := range m {
		if st == Failed {
			return Failed
		}
		if st == Complete {
			done++
		}
	}
	if done != total {
		return Pending
	}
	return Complete
}
// getHealReplicateObjectInfo returns info needed by heal replication in ReplicateObjectInfo
//
// It works on a clone of objInfo. When the replication config carries a
// RoleArn, legacy single-target metadata is first rewritten into the
// per-target representation keyed by that ARN. The replication decision is
// then computed (delete path or regular path), the per-target statuses are
// parsed, and the resync decision is obtained from rcfg.Resync.
func getHealReplicateObjectInfo(objInfo ObjectInfo, rcfg replicationConfig) ReplicateObjectInfo {
// Operate on a clone; the metadata rewrites below are applied to oi, not objInfo.
oi := objInfo.Clone()
if rcfg.Config != nil && rcfg.Config.RoleArn != "" {
// For backward compatibility of objects pending/failed replication.
// Save replication related statuses in the new internal representation for
// compatible behavior.
if !oi.ReplicationStatus.Empty() {
oi.ReplicationStatusInternal = fmt.Sprintf("%s=%s;", rcfg.Config.RoleArn, oi.ReplicationStatus)
}
if !oi.VersionPurgeStatus.Empty() {
oi.VersionPurgeStatusInternal = fmt.Sprintf("%s=%s;", rcfg.Config.RoleArn, oi.VersionPurgeStatus)
}
// Move a reset entry stored under the bare reset key (matched
// case-insensitively) to the key returned by targetResetHeader for the
// RoleArn. The map is modified while it is being ranged over.
for k, v := range oi.UserDefined {
switch {
case strings.EqualFold(k, ReservedMetadataPrefixLower+ReplicationReset):
delete(oi.UserDefined, k)
oi.UserDefined[targetResetHeader(rcfg.Config.RoleArn)] = v
}
}
}
var dsc ReplicateDecision
var tgtStatuses map[string]replication.StatusType
// Delete markers and versions with a purge status take the delete
// replication path; everything else takes the regular object path.
if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() {
dsc = checkReplicateDelete(GlobalContext, oi.Bucket, ObjectToDelete{
ObjectV: ObjectV{
ObjectName: oi.Name,
VersionID: oi.VersionID,
},
}, oi, ObjectOptions{VersionSuspended: globalBucketVersioningSys.PrefixSuspended(oi.Bucket, oi.Name)}, nil)
} else {
// Only the user-defined metadata of the object is handed to
// getMustReplicateOptions here.
dsc = mustReplicate(GlobalContext, oi.Bucket, oi.Name, getMustReplicateOptions(ObjectInfo{
UserDefined: oi.UserDefined,
}, replication.HealReplicationType, ObjectOptions{}))
}
tgtStatuses = replicationStatusesMap(oi.ReplicationStatusInternal)
existingObjResync := rcfg.Resync(GlobalContext, oi, &dsc, tgtStatuses)
return ReplicateObjectInfo{
ObjectInfo: oi,
OpType: replication.HealReplicationType,
Dsc: dsc,
ExistingObjResync: existingObjResync,
TargetStatuses: tgtStatuses,
}
}
// getReplicationState derives the ReplicationState of the object from its
// internal status strings, the given replicate decision string dsc, and the
// reset entries found in its user-defined metadata.
//
// Every metadata key that starts with the reserved replication-reset prefix
// contributes an entry to ResetStatusesMap; the entry is keyed by what remains
// of the metadata key after stripping "<prefix>-".
//
// vID here represents the versionID client specified in request - need to
// distinguish between delete marker and delete marker deletion. Neither vID
// nor heal is read by the current implementation.
func (o *ObjectInfo) getReplicationState(dsc string, vID string, heal bool) ReplicationState {
	resetPrefix := ReservedMetadataPrefixLower + ReplicationReset

	resets := make(map[string]string)
	for key, val := range o.UserDefined {
		if !strings.HasPrefix(key, resetPrefix) {
			continue
		}
		resets[strings.TrimPrefix(key, fmt.Sprintf("%s-", resetPrefix))] = val
	}

	return ReplicationState{
		ReplicationStatusInternal:  o.ReplicationStatusInternal,
		VersionPurgeStatusInternal: o.VersionPurgeStatusInternal,
		ReplicateDecisionStr:       dsc,
		Targets:                    replicationStatusesMap(o.ReplicationStatusInternal),
		PurgeTargets:               versionPurgeStatusesMap(o.VersionPurgeStatusInternal),
		ResetStatusesMap:           resets,
	}
}
// ReplicationState builds a ReplicationState from the replication metadata
// carried by the ObjectToDelete: the delete marker replication status, the
// version purge statuses and the replicate decision string. ResetStatusesMap
// is left nil.
func (o *ObjectToDelete) ReplicationState() ReplicationState {
	return ReplicationState{
		ReplicationStatusInternal:  o.DeleteMarkerReplicationStatus,
		VersionPurgeStatusInternal: o.VersionPurgeStatuses,
		ReplicateDecisionStr:       o.ReplicateDecisionStr,
		Targets:                    replicationStatusesMap(o.DeleteMarkerReplicationStatus),
		PurgeTargets:               versionPurgeStatusesMap(o.VersionPurgeStatuses),
	}
}
// VersionPurgeStatus returns a composite version purge status across targets.
// It delegates to CompositeVersionPurgeStatus on the object's ReplicationState.
func (d *DeletedObject) VersionPurgeStatus() VersionPurgeStatusType {
return d.ReplicationState.CompositeVersionPurgeStatus()
}
// DeleteMarkerReplicationStatus returns the composite replication status of
// the delete marker across targets. It delegates to CompositeReplicationStatus
// on the object's ReplicationState.
func (d *DeletedObject) DeleteMarkerReplicationStatus() replication.StatusType {
return d.ReplicationState.CompositeReplicationStatus()
}
// ResyncTargetsInfo holds a slice of targets with resync info per target
type ResyncTargetsInfo struct {
// Targets is serialized under the JSON key "target" and omitted when empty.
Targets []ResyncTarget `json:"target,omitempty"`
}
// ResyncTarget is a struct representing the Target reset ID where target is identified by its Arn
type ResyncTarget struct {
// ARN identifying the target
Arn string `json:"arn"`
// Reset ID of the resync operation
ResetID string `json:"resetid"`
// Start and end time of the resync operation
StartTime time.Time `json:"startTime"`
EndTime time.Time `json:"endTime"`
// Status of resync operation
ResyncStatus string `json:"resyncStatus,omitempty"`
// Completed size in bytes
ReplicatedSize int64 `json:"completedReplicationSize"`
// Failed size in bytes
FailedSize int64 `json:"failedReplicationSize"`
// Total number of failed operations
FailedCount int64 `json:"failedReplicationCount"`
// Total number of replicated operations
ReplicatedCount int64 `json:"replicationCount"`
// Last bucket/object replicated.
Bucket string `json:"bucket,omitempty"`
Object string `json:"object,omitempty"`
}
// VersionPurgeStatusType is the status of a versioned delete or permanent
// delete with respect to bucket replication.
type VersionPurgeStatusType string

const (
	// Pending - versioned delete replication is pending.
	Pending VersionPurgeStatusType = "PENDING"

	// Complete - versioned delete replication is now complete, erase version on disk.
	Complete VersionPurgeStatusType = "COMPLETE"

	// Failed - versioned delete replication failed.
	Failed VersionPurgeStatusType = "FAILED"
)

// Empty returns true if purge status was not set.
func (v VersionPurgeStatusType) Empty() bool {
	return len(v) == 0
}

// Pending returns true if the version still has to be purged, which is the
// case for both the Pending and the Failed status.
func (v VersionPurgeStatusType) Pending() bool {
	switch v {
	case Pending, Failed:
		return true
	}
	return false
}
// replicationResyncState holds the resync status of buckets together with an
// embedded RWMutex.
// NOTE(review): presumably the mutex guards statusMap - confirm against the
// code that uses this type.
type replicationResyncState struct {
// map of bucket to their resync status
statusMap map[string]BucketReplicationResyncStatus
sync.RWMutex
}
// Names and format/version numbers for resync metadata.
// NOTE(review): replicationDir and resyncFileName look like the location of
// the persisted resync state - confirm against the code that reads/writes it.
const (
replicationDir = "replication"
resyncFileName = "resync.bin"
resyncMetaFormat = 1
resyncMetaVersionV1 = 1
// resyncMetaVersion is the version stamped on new statuses by
// newBucketResyncStatus.
resyncMetaVersion = resyncMetaVersionV1
)
// ResyncStatusType is the status of a resync operation.
type ResyncStatusType int

const (
	// NoResync - no resync in progress
	NoResync ResyncStatusType = iota

	// ResyncStarted - resync in progress
	ResyncStarted

	// ResyncCompleted - resync finished
	ResyncCompleted

	// ResyncFailed - resync failed
	ResyncFailed
)

// String returns the display name of the status: "Ongoing", "Completed" or
// "Failed". NoResync and any unknown value map to the empty string.
func (rt ResyncStatusType) String() string {
	if rt == ResyncStarted {
		return "Ongoing"
	}
	if rt == ResyncCompleted {
		return "Completed"
	}
	if rt == ResyncFailed {
		return "Failed"
	}
	return ""
}
// TargetReplicationResyncStatus status of resync of bucket for a specific target
type TargetReplicationResyncStatus struct {
// Start and end time of the resync operation
StartTime time.Time `json:"startTime" msg:"st"`
EndTime time.Time `json:"endTime" msg:"et"`
// Resync ID assigned to this reset
ResyncID string `json:"resyncID" msg:"id"`
// ResyncBeforeDate - resync all objects created prior to this date
ResyncBeforeDate time.Time `json:"resyncBeforeDate" msg:"rdt"`
// Status of resync operation
ResyncStatus ResyncStatusType `json:"resyncStatus" msg:"rst"`
// Failed size in bytes
FailedSize int64 `json:"failedReplicationSize" msg:"fs"`
// Total number of failed operations
FailedCount int64 `json:"failedReplicationCount" msg:"frc"`
// Completed size in bytes
ReplicatedSize int64 `json:"completedReplicationSize" msg:"rs"`
// Total number of replicated operations
ReplicatedCount int64 `json:"replicationCount" msg:"rrc"`
// Last bucket/object replicated. Excluded from JSON output (tag "-").
Bucket string `json:"-" msg:"bkt"`
Object string `json:"-" msg:"obj"`
}
// BucketReplicationResyncStatus captures current replication resync status
type BucketReplicationResyncStatus struct {
// Version of the status; set to resyncMetaVersion by newBucketResyncStatus
Version int `json:"version" msg:"v"`
// map of remote arn to their resync status for a bucket
TargetsMap map[string]TargetReplicationResyncStatus `json:"resyncMap,omitempty" msg:"brs"`
ID int `json:"id" msg:"id"`
LastUpdate time.Time `json:"lastUpdate" msg:"lu"`
}
// newBucketResyncStatus returns a resync status stamped with the current
// resync metadata version and an initialized, empty targets map. The bucket
// argument is not used.
func newBucketResyncStatus(bucket string) BucketReplicationResyncStatus {
	var st BucketReplicationResyncStatus
	st.Version = resyncMetaVersion
	st.TargetsMap = make(map[string]TargetReplicationResyncStatus)
	return st
}
// contentRangeRegexp matches a "bytes <first>-<last>/<total>" Content-Range
// value, where <total> is either a decimal number or a literal "*".
//
// The pattern is a raw string, so the star is escaped with a single backslash;
// a doubled backslash would instead mean "zero or more backslash characters"
// and a "*" total would never be captured.
var contentRangeRegexp = regexp.MustCompile(`bytes ([0-9]+)-([0-9]+)/([0-9]+|\*)`)

// parseSizeFromContentRange extracts the total object size from the
// Content-Range header of h.
//
// It returns -1 with a nil error when the total is reported as "*". It returns
// an error when the header is missing, does not match the expected
// "bytes <first>-<last>/<total>" layout, or carries a total that does not fit
// in an int64.
func parseSizeFromContentRange(h http.Header) (sz int64, err error) {
	cr := h.Get(xhttp.ContentRange)
	if cr == "" {
		return sz, fmt.Errorf("Content-Range not set")
	}
	parts := contentRangeRegexp.FindStringSubmatch(cr)
	if len(parts) != 4 {
		return sz, fmt.Errorf("invalid Content-Range header %s", cr)
	}
	if parts[3] == "*" {
		return -1, nil
	}
	// The capture holds decimal digits only. ParseInt rejects totals above
	// MaxInt64 instead of letting an unsigned value wrap to a negative size.
	sz, err = strconv.ParseInt(parts[3], 10, 64)
	if err != nil {
		return 0, err
	}
	return sz, nil
}