Validate if replication config being added is self referential (#17142)

This commit is contained in:
Poorna 2023-05-06 13:35:43 -07:00 committed by GitHub
parent be18d435a2
commit c5c1426262
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 63 additions and 2 deletions

View file

@ -26,6 +26,7 @@ import (
"math"
"math/rand"
"net/http"
"net/url"
"path"
"reflect"
"strings"
@ -138,6 +139,13 @@ func validateReplicationDestination(ctx context.Context, bucket string, rCfg *re
// validate replication ARN against target endpoint
c := globalBucketTargetSys.GetRemoteTargetClient(ctx, arnStr)
if c != nil {
if err := checkRemoteEndpoint(ctx, c.EndpointURL()); err != nil {
switch err.(type) {
case BucketRemoteIdenticalToSource:
return true, errorCodes.ToAPIErrWithErr(ErrBucketRemoteIdenticalToSource, fmt.Errorf("remote target endpoint %s is self referential", c.EndpointURL().String()))
default:
}
}
if c.EndpointURL().String() == clnt.EndpointURL().String() {
selfTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort)
if !sameTarget {
@ -154,6 +162,45 @@ func validateReplicationDestination(ctx context.Context, bucket string, rCfg *re
return sameTarget, toAPIError(ctx, nil)
}
// performs a http request to remote endpoint to check if deployment id of remote endpoint is same as
// local cluster deployment id. This is to prevent replication to self, especially in case of a loadbalancer
// in front of MinIO.
func checkRemoteEndpoint(ctx context.Context, epURL *url.URL) error {
reqURL := &url.URL{
Scheme: epURL.Scheme,
Host: epURL.Host,
Path: healthCheckReadinessPath,
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL.String(), nil)
if err != nil {
return err
}
client := &http.Client{
Transport: NewHTTPTransport(),
Timeout: 10 * time.Second,
}
resp, err := client.Do(req)
if err != nil {
return err
}
if err == nil {
// Drain the connection.
xhttp.DrainBody(resp.Body)
}
if resp != nil {
amzid := resp.Header.Get(xhttp.AmzRequestHostID)
if _, ok := globalNodeNamesHex[amzid]; ok {
return BucketRemoteIdenticalToSource{
Endpoint: epURL.String(),
}
}
}
return nil
}
type mustReplicateOptions struct {
meta map[string]string
status replication.StatusType

View file

@ -232,6 +232,7 @@ var (
// The name of this local node, fetched from arguments
globalLocalNodeName string
globalLocalNodeNameHex string
globalNodeNamesHex map[string]struct{}
// The global subnet config
globalSubnetConfig subnet.Config

View file

@ -437,6 +437,16 @@ func (e RemoteTargetConnectionErr) Error() string {
return fmt.Sprintf("Remote service endpoint %s not available\n\t%s", e.Endpoint, e.Err.Error())
}
// BucketRemoteIdenticalToSource remote already exists for this target type.
type BucketRemoteIdenticalToSource struct {
GenericError
Endpoint string
}
func (e BucketRemoteIdenticalToSource) Error() string {
return fmt.Sprintf("Remote service endpoint %s is self referential to current cluster", e.Endpoint)
}
// BucketRemoteAlreadyExists remote already exists for this target type.
type BucketRemoteAlreadyExists GenericError

View file

@ -247,9 +247,9 @@ func serverHandleCmdArgs(ctx *cli.Context) {
logger.FatalIf(err, "Invalid command line arguments")
globalLocalNodeName = GetLocalPeer(globalEndpoints, globalMinioHost, globalMinioPort)
nodeNameSum := sha256.Sum256([]byte(globalLocalNodeNameHex))
nodeNameSum := sha256.Sum256([]byte(globalLocalNodeName))
globalLocalNodeNameHex = hex.EncodeToString(nodeNameSum[:])
globalNodeNamesHex = make(map[string]struct{})
globalRemoteEndpoints = make(map[string]Endpoint)
for _, z := range globalEndpoints {
for _, ep := range z.Endpoints {
@ -258,6 +258,9 @@ func serverHandleCmdArgs(ctx *cli.Context) {
} else {
globalRemoteEndpoints[ep.Host] = ep
}
nodeNameSum := sha256.Sum256([]byte(ep.Host))
globalNodeNamesHex[hex.EncodeToString(nodeNameSum[:])] = struct{}{}
}
}