allow specification of path/virtual style bucket lookup in batch replication (#17201)

This commit is contained in:
Poorna 2023-05-21 15:16:31 -07:00 committed by GitHub
parent a30a55f3b1
commit 2920b0fc6d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 118 additions and 19 deletions

View file

@ -86,6 +86,7 @@ import (
// type: "minio"
// bucket: "testbucket1"
// endpoint: "https://play.min.io"
// path: "on"
// credentials:
// accessKey: "minioadmin"
// secretKey: "minioadmin"
@ -181,6 +182,10 @@ func (t BatchJobReplicateResourceType) Validate() error {
return nil
}
func (t BatchJobReplicateResourceType) isMinio() bool {
return t == BatchJobReplicateResourceMinIO
}
// Different types of batch jobs..
const (
BatchJobReplicateResourceMinIO BatchJobReplicateResourceType = "minio"
@ -217,9 +222,15 @@ type BatchJobReplicateTarget struct {
Bucket string `yaml:"bucket" json:"bucket"`
Prefix string `yaml:"prefix" json:"prefix"`
Endpoint string `yaml:"endpoint" json:"endpoint"`
Path string `yaml:"path" json:"path"`
Creds BatchJobReplicateCredentials `yaml:"credentials" json:"credentials"`
}
// ValidPath returns true if path is valid
func (t BatchJobReplicateTarget) ValidPath() bool {
return t.Path == "on" || t.Path == "off" || t.Path == "auto" || t.Path == ""
}
// BatchJobReplicateSource describes source element of the replication job that is
// the source of the data for the target
type BatchJobReplicateSource struct {
@ -227,9 +238,20 @@ type BatchJobReplicateSource struct {
Bucket string `yaml:"bucket" json:"bucket"`
Prefix string `yaml:"prefix" json:"prefix"`
Endpoint string `yaml:"endpoint" json:"endpoint"`
Path string `yaml:"path" json:"path"`
Creds BatchJobReplicateCredentials `yaml:"credentials" json:"credentials"`
}
// ValidPath returns true if path is valid
func (s BatchJobReplicateSource) ValidPath() bool {
switch s.Path {
case "on", "off", "auto", "":
return true
default:
return false
}
}
// BatchJobReplicateV1 v1 of batch job replication
type BatchJobReplicateV1 struct {
APIVersion string `yaml:"apiVersion" json:"apiVersion"`
@ -524,9 +546,10 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay
cred := r.Source.Creds
c, err := miniogo.New(u.Host, &miniogo.Options{
Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken),
Secure: u.Scheme == "https",
Transport: getRemoteInstanceTransport,
Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken),
Secure: u.Scheme == "https",
Transport: getRemoteInstanceTransport,
BucketLookup: lookupStyle(r.Source.Path),
})
if err != nil {
return err
@ -1082,9 +1105,10 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
cred := r.Target.Creds
c, err := miniogo.NewCore(u.Host, &miniogo.Options{
Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken),
Secure: u.Scheme == "https",
Transport: getRemoteInstanceTransport,
Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken),
Secure: u.Scheme == "https",
Transport: getRemoteInstanceTransport,
BucketLookup: lookupStyle(r.Target.Path),
})
if err != nil {
return err
@ -1256,6 +1280,13 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest,
return errInvalidArgument
}
if r.Source.Endpoint != "" && !r.Source.Type.isMinio() && !r.Source.ValidPath() {
return errInvalidArgument
}
if r.Target.Endpoint != "" && !r.Target.Type.isMinio() && !r.Target.ValidPath() {
return errInvalidArgument
}
if r.Target.Bucket == "" {
return errInvalidArgument
}
@ -1293,11 +1324,14 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest,
remoteEp := r.Target.Endpoint
remoteBkt := r.Target.Bucket
cred := r.Target.Creds
pathStyle := r.Target.Path
if r.Source.Endpoint != "" {
remoteEp = r.Source.Endpoint
cred = r.Source.Creds
remoteBkt = r.Source.Bucket
pathStyle = r.Source.Path
}
u, err := url.Parse(remoteEp)
@ -1306,9 +1340,10 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest,
}
c, err := miniogo.NewCore(u.Host, &miniogo.Options{
Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken),
Secure: u.Scheme == "https",
Transport: getRemoteInstanceTransport,
Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken),
Secure: u.Scheme == "https",
Transport: getRemoteInstanceTransport,
BucketLookup: lookupStyle(pathStyle),
})
if err != nil {
return err
@ -1874,3 +1909,17 @@ func (m *batchJobMetrics) trace(d batchJobMetric, job string, attempts int, info
}
}
}
func lookupStyle(s string) miniogo.BucketLookupType {
var lookup miniogo.BucketLookupType
switch s {
case "on":
lookup = miniogo.BucketLookupPath
case "off":
lookup = miniogo.BucketLookupDNS
default:
lookup = miniogo.BucketLookupAuto
}
return lookup
}

View file

@ -702,6 +702,12 @@ func (z *BatchJobReplicateSource) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "Endpoint")
return
}
case "Path":
z.Path, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Path")
return
}
case "Creds":
var zb0003 uint32
zb0003, err = dc.ReadMapHeader()
@ -756,9 +762,9 @@ func (z *BatchJobReplicateSource) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *BatchJobReplicateSource) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 5
// map header, size 6
// write "Type"
err = en.Append(0x85, 0xa4, 0x54, 0x79, 0x70, 0x65)
err = en.Append(0x86, 0xa4, 0x54, 0x79, 0x70, 0x65)
if err != nil {
return
}
@ -797,6 +803,16 @@ func (z *BatchJobReplicateSource) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "Endpoint")
return
}
// write "Path"
err = en.Append(0xa4, 0x50, 0x61, 0x74, 0x68)
if err != nil {
return
}
err = en.WriteString(z.Path)
if err != nil {
err = msgp.WrapError(err, "Path")
return
}
// write "Creds"
err = en.Append(0xa5, 0x43, 0x72, 0x65, 0x64, 0x73)
if err != nil {
@ -839,9 +855,9 @@ func (z *BatchJobReplicateSource) EncodeMsg(en *msgp.Writer) (err error) {
// MarshalMsg implements msgp.Marshaler
func (z *BatchJobReplicateSource) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 5
// map header, size 6
// string "Type"
o = append(o, 0x85, 0xa4, 0x54, 0x79, 0x70, 0x65)
o = append(o, 0x86, 0xa4, 0x54, 0x79, 0x70, 0x65)
o = msgp.AppendString(o, string(z.Type))
// string "Bucket"
o = append(o, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74)
@ -852,6 +868,9 @@ func (z *BatchJobReplicateSource) MarshalMsg(b []byte) (o []byte, err error) {
// string "Endpoint"
o = append(o, 0xa8, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74)
o = msgp.AppendString(o, z.Endpoint)
// string "Path"
o = append(o, 0xa4, 0x50, 0x61, 0x74, 0x68)
o = msgp.AppendString(o, z.Path)
// string "Creds"
o = append(o, 0xa5, 0x43, 0x72, 0x65, 0x64, 0x73)
// map header, size 3
@ -913,6 +932,12 @@ func (z *BatchJobReplicateSource) UnmarshalMsg(bts []byte) (o []byte, err error)
err = msgp.WrapError(err, "Endpoint")
return
}
case "Path":
z.Path, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Path")
return
}
case "Creds":
var zb0003 uint32
zb0003, bts, err = msgp.ReadMapHeaderBytes(bts)
@ -968,7 +993,7 @@ func (z *BatchJobReplicateSource) UnmarshalMsg(bts []byte) (o []byte, err error)
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *BatchJobReplicateSource) Msgsize() (s int) {
s = 1 + 5 + msgp.StringPrefixSize + len(string(z.Type)) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 6 + 1 + 10 + msgp.StringPrefixSize + len(z.Creds.AccessKey) + 10 + msgp.StringPrefixSize + len(z.Creds.SecretKey) + 13 + msgp.StringPrefixSize + len(z.Creds.SessionToken)
s = 1 + 5 + msgp.StringPrefixSize + len(string(z.Type)) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 5 + msgp.StringPrefixSize + len(z.Path) + 6 + 1 + 10 + msgp.StringPrefixSize + len(z.Creds.AccessKey) + 10 + msgp.StringPrefixSize + len(z.Creds.SecretKey) + 13 + msgp.StringPrefixSize + len(z.Creds.SessionToken)
return
}
@ -1018,6 +1043,12 @@ func (z *BatchJobReplicateTarget) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "Endpoint")
return
}
case "Path":
z.Path, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Path")
return
}
case "Creds":
var zb0003 uint32
zb0003, err = dc.ReadMapHeader()
@ -1072,9 +1103,9 @@ func (z *BatchJobReplicateTarget) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *BatchJobReplicateTarget) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 5
// map header, size 6
// write "Type"
err = en.Append(0x85, 0xa4, 0x54, 0x79, 0x70, 0x65)
err = en.Append(0x86, 0xa4, 0x54, 0x79, 0x70, 0x65)
if err != nil {
return
}
@ -1113,6 +1144,16 @@ func (z *BatchJobReplicateTarget) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "Endpoint")
return
}
// write "Path"
err = en.Append(0xa4, 0x50, 0x61, 0x74, 0x68)
if err != nil {
return
}
err = en.WriteString(z.Path)
if err != nil {
err = msgp.WrapError(err, "Path")
return
}
// write "Creds"
err = en.Append(0xa5, 0x43, 0x72, 0x65, 0x64, 0x73)
if err != nil {
@ -1155,9 +1196,9 @@ func (z *BatchJobReplicateTarget) EncodeMsg(en *msgp.Writer) (err error) {
// MarshalMsg implements msgp.Marshaler
func (z *BatchJobReplicateTarget) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 5
// map header, size 6
// string "Type"
o = append(o, 0x85, 0xa4, 0x54, 0x79, 0x70, 0x65)
o = append(o, 0x86, 0xa4, 0x54, 0x79, 0x70, 0x65)
o = msgp.AppendString(o, string(z.Type))
// string "Bucket"
o = append(o, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74)
@ -1168,6 +1209,9 @@ func (z *BatchJobReplicateTarget) MarshalMsg(b []byte) (o []byte, err error) {
// string "Endpoint"
o = append(o, 0xa8, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74)
o = msgp.AppendString(o, z.Endpoint)
// string "Path"
o = append(o, 0xa4, 0x50, 0x61, 0x74, 0x68)
o = msgp.AppendString(o, z.Path)
// string "Creds"
o = append(o, 0xa5, 0x43, 0x72, 0x65, 0x64, 0x73)
// map header, size 3
@ -1229,6 +1273,12 @@ func (z *BatchJobReplicateTarget) UnmarshalMsg(bts []byte) (o []byte, err error)
err = msgp.WrapError(err, "Endpoint")
return
}
case "Path":
z.Path, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Path")
return
}
case "Creds":
var zb0003 uint32
zb0003, bts, err = msgp.ReadMapHeaderBytes(bts)
@ -1284,7 +1334,7 @@ func (z *BatchJobReplicateTarget) UnmarshalMsg(bts []byte) (o []byte, err error)
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *BatchJobReplicateTarget) Msgsize() (s int) {
s = 1 + 5 + msgp.StringPrefixSize + len(string(z.Type)) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 6 + 1 + 10 + msgp.StringPrefixSize + len(z.Creds.AccessKey) + 10 + msgp.StringPrefixSize + len(z.Creds.SecretKey) + 13 + msgp.StringPrefixSize + len(z.Creds.SessionToken)
s = 1 + 5 + msgp.StringPrefixSize + len(string(z.Type)) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 5 + msgp.StringPrefixSize + len(z.Path) + 6 + 1 + 10 + msgp.StringPrefixSize + len(z.Creds.AccessKey) + 10 + msgp.StringPrefixSize + len(z.Creds.SecretKey) + 13 + msgp.StringPrefixSize + len(z.Creds.SessionToken)
return
}