diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index 93f5d6a65..fe396ab2e 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -86,6 +86,7 @@ import ( // type: "minio" // bucket: "testbucket1" // endpoint: "https://play.min.io" +// path: "on" // credentials: // accessKey: "minioadmin" // secretKey: "minioadmin" @@ -181,6 +182,10 @@ func (t BatchJobReplicateResourceType) Validate() error { return nil } +func (t BatchJobReplicateResourceType) isMinio() bool { + return t == BatchJobReplicateResourceMinIO +} + // Different types of batch jobs.. const ( BatchJobReplicateResourceMinIO BatchJobReplicateResourceType = "minio" @@ -217,9 +222,15 @@ type BatchJobReplicateTarget struct { Bucket string `yaml:"bucket" json:"bucket"` Prefix string `yaml:"prefix" json:"prefix"` Endpoint string `yaml:"endpoint" json:"endpoint"` + Path string `yaml:"path" json:"path"` Creds BatchJobReplicateCredentials `yaml:"credentials" json:"credentials"` } +// ValidPath returns true if path is valid +func (t BatchJobReplicateTarget) ValidPath() bool { + return t.Path == "on" || t.Path == "off" || t.Path == "auto" || t.Path == "" +} + // BatchJobReplicateSource describes source element of the replication job that is // the source of the data for the target type BatchJobReplicateSource struct { @@ -227,9 +238,20 @@ type BatchJobReplicateSource struct { Bucket string `yaml:"bucket" json:"bucket"` Prefix string `yaml:"prefix" json:"prefix"` Endpoint string `yaml:"endpoint" json:"endpoint"` + Path string `yaml:"path" json:"path"` Creds BatchJobReplicateCredentials `yaml:"credentials" json:"credentials"` } +// ValidPath returns true if path is valid +func (s BatchJobReplicateSource) ValidPath() bool { + switch s.Path { + case "on", "off", "auto", "": + return true + default: + return false + } +} + // BatchJobReplicateV1 v1 of batch job replication type BatchJobReplicateV1 struct { APIVersion string `yaml:"apiVersion" json:"apiVersion"` @@ -524,9 +546,10 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay cred := r.Source.Creds c, err := miniogo.New(u.Host, &miniogo.Options{ - Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken), - Secure: u.Scheme == "https", - Transport: getRemoteInstanceTransport, + Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken), + Secure: u.Scheme == "https", + Transport: getRemoteInstanceTransport, + BucketLookup: lookupStyle(r.Source.Path), }) if err != nil { return err @@ -1082,9 +1105,10 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba cred := r.Target.Creds c, err := miniogo.NewCore(u.Host, &miniogo.Options{ - Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken), - Secure: u.Scheme == "https", - Transport: getRemoteInstanceTransport, + Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken), + Secure: u.Scheme == "https", + Transport: getRemoteInstanceTransport, + BucketLookup: lookupStyle(r.Target.Path), }) if err != nil { return err @@ -1256,6 +1280,13 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest, return errInvalidArgument } + if r.Source.Endpoint != "" && !r.Source.Type.isMinio() && !r.Source.ValidPath() { + return errInvalidArgument + } + + if r.Target.Endpoint != "" && !r.Target.Type.isMinio() && !r.Target.ValidPath() { + return errInvalidArgument + } if r.Target.Bucket == "" { return errInvalidArgument } @@ -1293,11 +1324,14 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest, remoteEp := r.Target.Endpoint remoteBkt := r.Target.Bucket cred := r.Target.Creds + pathStyle := r.Target.Path if r.Source.Endpoint != "" { remoteEp = r.Source.Endpoint cred = r.Source.Creds remoteBkt = r.Source.Bucket + pathStyle = r.Source.Path + } u, err := url.Parse(remoteEp) @@ -1306,9 +1340,10 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest, } c, err := miniogo.NewCore(u.Host, &miniogo.Options{ - Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken), - Secure: u.Scheme == "https", - Transport: getRemoteInstanceTransport, + Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken), + Secure: u.Scheme == "https", + Transport: getRemoteInstanceTransport, + BucketLookup: lookupStyle(pathStyle), }) if err != nil { return err @@ -1874,3 +1909,17 @@ func (m *batchJobMetrics) trace(d batchJobMetric, job string, attempts int, info } } } + +func lookupStyle(s string) miniogo.BucketLookupType { + var lookup miniogo.BucketLookupType + switch s { + case "on": + lookup = miniogo.BucketLookupPath + case "off": + lookup = miniogo.BucketLookupDNS + default: + lookup = miniogo.BucketLookupAuto + + } + return lookup +} diff --git a/cmd/batch-handlers_gen.go b/cmd/batch-handlers_gen.go index 0f0342b59..7d9327598 100644 --- a/cmd/batch-handlers_gen.go +++ b/cmd/batch-handlers_gen.go @@ -702,6 +702,12 @@ func (z *BatchJobReplicateSource) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Endpoint") return } + case "Path": + z.Path, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Path") + return + } case "Creds": var zb0003 uint32 zb0003, err = dc.ReadMapHeader() @@ -756,9 +762,9 @@ func (z *BatchJobReplicateSource) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *BatchJobReplicateSource) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 5 + // map header, size 6 // write "Type" - err = en.Append(0x85, 0xa4, 0x54, 0x79, 0x70, 0x65) + err = en.Append(0x86, 0xa4, 0x54, 0x79, 0x70, 0x65) if err != nil { return } @@ -797,6 +803,16 @@ func (z *BatchJobReplicateSource) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Endpoint") return } + // write "Path" + err = en.Append(0xa4, 0x50, 0x61, 0x74, 0x68) + if err != nil { + return + } + err = en.WriteString(z.Path) + if err != nil { + err = msgp.WrapError(err, "Path") + return + } // write "Creds" err = en.Append(0xa5, 0x43, 0x72, 0x65, 0x64, 0x73) if err != nil { @@ -839,9 +855,9 @@ func (z *BatchJobReplicateSource) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *BatchJobReplicateSource) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 5 + // map header, size 6 // string "Type" - o = append(o, 0x85, 0xa4, 0x54, 0x79, 0x70, 0x65) + o = append(o, 0x86, 0xa4, 0x54, 0x79, 0x70, 0x65) o = msgp.AppendString(o, string(z.Type)) // string "Bucket" o = append(o, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) @@ -852,6 +868,9 @@ func (z *BatchJobReplicateSource) MarshalMsg(b []byte) (o []byte, err error) { // string "Endpoint" o = append(o, 0xa8, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74) o = msgp.AppendString(o, z.Endpoint) + // string "Path" + o = append(o, 0xa4, 0x50, 0x61, 0x74, 0x68) + o = msgp.AppendString(o, z.Path) // string "Creds" o = append(o, 0xa5, 0x43, 0x72, 0x65, 0x64, 0x73) // map header, size 3 @@ -913,6 +932,12 @@ func (z *BatchJobReplicateSource) UnmarshalMsg(bts []byte) (o []byte, err error) err = msgp.WrapError(err, "Endpoint") return } + case "Path": + z.Path, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Path") + return + } case "Creds": var zb0003 uint32 zb0003, bts, err = msgp.ReadMapHeaderBytes(bts) @@ -968,7 +993,7 @@ func (z *BatchJobReplicateSource) UnmarshalMsg(bts []byte) (o []byte, err error) // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *BatchJobReplicateSource) Msgsize() (s int) { - s = 1 + 5 + msgp.StringPrefixSize + len(string(z.Type)) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 6 + 1 + 10 + msgp.StringPrefixSize + len(z.Creds.AccessKey) + 10 + msgp.StringPrefixSize + len(z.Creds.SecretKey) + 13 + msgp.StringPrefixSize + len(z.Creds.SessionToken) + s = 1 + 5 + msgp.StringPrefixSize + len(string(z.Type)) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 5 + msgp.StringPrefixSize + len(z.Path) + 6 + 1 + 10 + msgp.StringPrefixSize + len(z.Creds.AccessKey) + 10 + msgp.StringPrefixSize + len(z.Creds.SecretKey) + 13 + msgp.StringPrefixSize + len(z.Creds.SessionToken) return } @@ -1018,6 +1043,12 @@ func (z *BatchJobReplicateTarget) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Endpoint") return } + case "Path": + z.Path, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Path") + return + } case "Creds": var zb0003 uint32 zb0003, err = dc.ReadMapHeader() @@ -1072,9 +1103,9 @@ func (z *BatchJobReplicateTarget) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *BatchJobReplicateTarget) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 5 + // map header, size 6 // write "Type" - err = en.Append(0x85, 0xa4, 0x54, 0x79, 0x70, 0x65) + err = en.Append(0x86, 0xa4, 0x54, 0x79, 0x70, 0x65) if err != nil { return } @@ -1113,6 +1144,16 @@ func (z *BatchJobReplicateTarget) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Endpoint") return } + // write "Path" + err = en.Append(0xa4, 0x50, 0x61, 0x74, 0x68) + if err != nil { + return + } + err = en.WriteString(z.Path) + if err != nil { + err = msgp.WrapError(err, "Path") + return + } // write "Creds" err = en.Append(0xa5, 0x43, 0x72, 0x65, 0x64, 0x73) if err != nil { @@ -1155,9 +1196,9 @@ func (z *BatchJobReplicateTarget) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *BatchJobReplicateTarget) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 5 + // map header, size 6 // string "Type" - o = append(o, 0x85, 0xa4, 0x54, 0x79, 0x70, 0x65) + o = append(o, 0x86, 0xa4, 0x54, 0x79, 0x70, 0x65) o = msgp.AppendString(o, string(z.Type)) // string "Bucket" o = append(o, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) @@ -1168,6 +1209,9 @@ func (z *BatchJobReplicateTarget) MarshalMsg(b []byte) (o []byte, err error) { // string "Endpoint" o = append(o, 0xa8, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74) o = msgp.AppendString(o, z.Endpoint) + // string "Path" + o = append(o, 0xa4, 0x50, 0x61, 0x74, 0x68) + o = msgp.AppendString(o, z.Path) // string "Creds" o = append(o, 0xa5, 0x43, 0x72, 0x65, 0x64, 0x73) // map header, size 3 @@ -1229,6 +1273,12 @@ func (z *BatchJobReplicateTarget) UnmarshalMsg(bts []byte) (o []byte, err error) err = msgp.WrapError(err, "Endpoint") return } + case "Path": + z.Path, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Path") + return + } case "Creds": var zb0003 uint32 zb0003, bts, err = msgp.ReadMapHeaderBytes(bts) @@ -1284,7 +1334,7 @@ func (z *BatchJobReplicateTarget) UnmarshalMsg(bts []byte) (o []byte, err error) // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *BatchJobReplicateTarget) Msgsize() (s int) { - s = 1 + 5 + msgp.StringPrefixSize + len(string(z.Type)) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 6 + 1 + 10 + msgp.StringPrefixSize + len(z.Creds.AccessKey) + 10 + msgp.StringPrefixSize + len(z.Creds.SecretKey) + 13 + msgp.StringPrefixSize + len(z.Creds.SessionToken) + s = 1 + 5 + msgp.StringPrefixSize + len(string(z.Type)) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 5 + msgp.StringPrefixSize + len(z.Path) + 6 + 1 + 10 + msgp.StringPrefixSize + len(z.Creds.AccessKey) + 10 + msgp.StringPrefixSize + len(z.Creds.SecretKey) + 13 + msgp.StringPrefixSize + len(z.Creds.SessionToken) return }