Elasticsearch audit log support (#16191)

This commit is contained in:
Krzysztof Skrzętnicki 2022-09-23 13:46:15 +02:00 committed by GitHub
parent cbfd90601d
commit 684956a06c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 1982 additions and 583 deletions

View file

@ -2956,6 +2956,7 @@ message OneOf {
events.KubernetesClusterUpdate KubernetesClusterUpdate = 97;
events.KubernetesClusterDelete KubernetesClusterDelete = 98;
events.SSMRun SSMRun = 99;
events.ElasticsearchRequest ElasticsearchRequest = 100;
}
}
@ -3622,6 +3623,71 @@ message DatabaseSessionMalformedPacket {
bytes Payload = 5 [(gogoproto.jsontag) = "payload,omitempty"];
}
// ElasticsearchCategory specifies Elasticsearch request category.
enum ElasticsearchCategory {
// GENERAL is for otherwise uncategorized calls.
GENERAL = 0;
// SECURITY is for _security and _ssl APIs.
SECURITY = 1;
// SEARCH is for search-related APIs.
SEARCH = 2;
// SQL covers _sql API.
SQL = 3;
}
// ElasticsearchRequest is emitted when user executes an Elasticsearch request, which isn't
// covered by API-specific events.
message ElasticsearchRequest {
// Metadata is a common event metadata.
Metadata Metadata = 1 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// User is a common user event metadata.
UserMetadata User = 2 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// SessionMetadata is a common event session metadata.
SessionMetadata Session = 3 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// Database contains database related metadata.
DatabaseMetadata Database = 4 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// Path is relative path in the URL.
string Path = 5 [(gogoproto.jsontag) = "path"];
// RawQuery are the encoded query values.
string RawQuery = 6 [(gogoproto.jsontag) = "raw_query"];
// Method is the request HTTP method, like GET/POST/DELETE/etc.
string Method = 7 [(gogoproto.jsontag) = "method"];
// Body is the request HTTP body.
bytes Body = 8 [(gogoproto.jsontag) = "body"];
// Headers are the HTTP request headers.
wrappers.LabelValues Headers = 9 [
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "headers,omitempty",
(gogoproto.customtype) = "github.com/gravitational/teleport/api/types/wrappers.Traits"
];
// Category represents the category if API being accessed in a given request.
ElasticsearchCategory Category = 10;
// Target is an optional field indicating the target index or set of indices used as a subject of request.
string Target = 11;
// Query is an optional text of query (e.g. an SQL select statement for _sql API), if a request includes it.
string Query = 12;
}
// UpgradeWindowStartMetadata contains common upgrade window information.
message UpgradeWindowStartMetadata {
// UpgradeWindowStart is the upgrade window time.

File diff suppressed because it is too large Load diff

View file

@ -391,6 +391,10 @@ func ToOneOf(in AuditEvent) (*OneOf, error) {
out.Event = &OneOf_SQLServerRPCRequest{
SQLServerRPCRequest: e,
}
case *ElasticsearchRequest:
out.Event = &OneOf_ElasticsearchRequest{
ElasticsearchRequest: e,
}
case *DatabaseSessionMalformedPacket:
out.Event = &OneOf_DatabaseSessionMalformedPacket{
DatabaseSessionMalformedPacket: e,

View file

@ -25,11 +25,12 @@ import (
"strings"
"time"
"github.com/jonboulle/clockwork"
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/lib/limiter"
"github.com/gravitational/teleport/lib/utils"
"github.com/jonboulle/clockwork"
"github.com/gravitational/trace"
"gopkg.in/square/go-jose.v2"
@ -515,6 +516,8 @@ func ReadableDatabaseProtocol(p string) string {
return "Redis"
case ProtocolSnowflake:
return "Snowflake"
case ProtocolElasticsearch:
return "Elasticsearch"
case ProtocolSQLServer:
return "Microsoft SQL Server"
default:

View file

@ -481,6 +481,10 @@ const (
// RPC request command.
DatabaseSessionSQLServerRPCRequestEvent = "db.session.sqlserver.rpc_request"
// DatabaseSessionElasticsearchRequestEvent is emitted when Elasticsearch client sends
// a generic request.
DatabaseSessionElasticsearchRequestEvent = "db.session.elasticsearch.request"
// DatabaseSessionMalformedPacketEvent is emitted when SQL packet is malformed.
DatabaseSessionMalformedPacketEvent = "db.session.malformed_packet"

View file

@ -172,6 +172,9 @@ const (
// SQLServerRPCRequestCode is the db.session.sqlserver.rpc_request event code.
SQLServerRPCRequestCode = "TMS00I"
// ElasticsearchRequestCode is the db.session.elasticsearch.request event code.
ElasticsearchRequestCode = "TES00I"
// DatabaseCreateCode is the db.create event code.
DatabaseCreateCode = "TDB03I"
// DatabaseUpdateCode is the db.update event code.

View file

@ -211,6 +211,8 @@ func FromEventFields(fields EventFields) (events.AuditEvent, error) {
e = &events.MySQLRefresh{}
case DatabaseSessionSQLServerRPCRequestEvent:
e = &events.SQLServerRPCRequest{}
case DatabaseSessionElasticsearchRequestEvent:
e = &events.ElasticsearchRequest{}
case KubeRequestEvent:
e = &events.KubeRequest{}
case MFADeviceAddEvent:

View file

@ -169,7 +169,7 @@ func (l *FileLog) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent
default:
fields := log.Fields{"event_type": event.GetType(), "event_size": len(line)}
l.WithFields(fields).Warnf("Got a event that exeeded max allowed size.")
return trace.BadParameter("event size %q exceeds max entry size %q", len(line), l.MaxScanTokenSize)
return trace.BadParameter("event size %v exceeds max entry size %v", len(line), l.MaxScanTokenSize)
}
}

View file

@ -27,14 +27,21 @@ import (
"net"
"net/http"
"strconv"
"strings"
elastic "github.com/elastic/go-elasticsearch/v8/typedapi/types"
"github.com/gravitational/trace"
"github.com/gravitational/teleport"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/api/types/wrappers"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/srv/db/common"
"github.com/gravitational/teleport/lib/srv/db/common/role"
"github.com/gravitational/teleport/lib/utils"
"github.com/ghodss/yaml"
)
func init() {
@ -79,7 +86,6 @@ func (e *Engine) SendError(err error) {
if trace.IsAccessDenied(err) {
statusCode = http.StatusUnauthorized
cause.Type = "access_denied_exception"
}
jsonBody, err := json.Marshal(cause)
@ -141,8 +147,8 @@ func (e *Engine) HandleConnection(ctx context.Context, sessionCtx *common.Sessio
}
}
func copyRequest(ctx context.Context, req *http.Request) (*http.Request, error) {
reqCopy, err := http.NewRequestWithContext(ctx, req.Method, req.URL.String(), req.Body)
func copyRequest(ctx context.Context, req *http.Request, body io.Reader) (*http.Request, error) {
reqCopy, err := http.NewRequestWithContext(ctx, req.Method, req.URL.String(), body)
if err != nil {
return nil, trace.Wrap(err)
}
@ -155,11 +161,19 @@ func copyRequest(ctx context.Context, req *http.Request) (*http.Request, error)
// process reads request from connected elasticsearch client, processes the requests/responses and send data back
// to the client.
func (e *Engine) process(ctx context.Context, sessionCtx *common.Session, req *http.Request, client *http.Client) error {
reqCopy, err := copyRequest(ctx, req)
body, err := io.ReadAll(io.LimitReader(req.Body, teleport.MaxHTTPRequestSize))
if err != nil {
return trace.Wrap(err)
}
reqCopy, err := copyRequest(ctx, req, bytes.NewReader(body))
if err != nil {
return trace.Wrap(err)
}
defer req.Body.Close()
e.emitAuditEvent(reqCopy, body)
// force HTTPS, set host URL.
reqCopy.URL.Scheme = "https"
reqCopy.URL.Host = sessionCtx.Database.GetURI()
@ -174,6 +188,185 @@ func (e *Engine) process(ctx context.Context, sessionCtx *common.Session, req *h
return trace.Wrap(e.sendResponse(resp))
}
// parsePath returns (optional) target of query as well as the event category.
func parsePath(path string) (string, apievents.ElasticsearchCategory) {
parts := strings.Split(path, "/")
if len(parts) < 2 {
return "", apievents.ElasticsearchCategory_GENERAL
}
// first term starts with _
switch parts[1] {
case "_security", "_ssl":
return "", apievents.ElasticsearchCategory_SECURITY
case
"_search", // https://www.elastic.co/guide/en/elasticsearch/reference/master/search-search.html
"_async_search", // https://www.elastic.co/guide/en/elasticsearch/reference/master/async-search.html
"_pit", // https://www.elastic.co/guide/en/elasticsearch/reference/master/point-in-time-api.html
"_msearch", // https://www.elastic.co/guide/en/elasticsearch/reference/master/multi-search-template.html, https://www.elastic.co/guide/en/elasticsearch/reference/master/search-multi-search.html
"_render", // https://www.elastic.co/guide/en/elasticsearch/reference/master/render-search-template-api.html
"_field_caps": // https://www.elastic.co/guide/en/elasticsearch/reference/master/search-field-caps.html
return "", apievents.ElasticsearchCategory_SEARCH
case "_sql":
return "", apievents.ElasticsearchCategory_SQL
}
// starts with _, but we don't handle it explicitly
if strings.HasPrefix("_", parts[1]) {
return "", apievents.ElasticsearchCategory_GENERAL
}
if len(parts) < 3 {
return "", apievents.ElasticsearchCategory_GENERAL
}
// a number of APIs are invoked by providing a target first, e.g. /<target>/_search, where <target> is an index or expression matching a group of indices.
switch parts[2] {
case
"_search", // https://www.elastic.co/guide/en/elasticsearch/reference/master/search-search.html
"_async_search", // https://www.elastic.co/guide/en/elasticsearch/reference/master/async-search.html
"_pit", // https://www.elastic.co/guide/en/elasticsearch/reference/master/point-in-time-api.html
"_knn_search", // https://www.elastic.co/guide/en/elasticsearch/reference/master/knn-search-api.html
"_msearch", // https://www.elastic.co/guide/en/elasticsearch/reference/master/multi-search-template.html, https://www.elastic.co/guide/en/elasticsearch/reference/master/search-multi-search.html
"_search_shards", // https://www.elastic.co/guide/en/elasticsearch/reference/master/search-shards.html
"_count", // https://www.elastic.co/guide/en/elasticsearch/reference/master/search-count.html
"_validate", // https://www.elastic.co/guide/en/elasticsearch/reference/master/search-validate.html
"_terms_enum", // https://www.elastic.co/guide/en/elasticsearch/reference/master/search-terms-enum.html
"_explain", // https://www.elastic.co/guide/en/elasticsearch/reference/master/search-explain.html
"_field_caps", // https://www.elastic.co/guide/en/elasticsearch/reference/master/search-field-caps.html
"_rank_eval", // https://www.elastic.co/guide/en/elasticsearch/reference/master/search-rank-eval.html
"_mvt": // https://www.elastic.co/guide/en/elasticsearch/reference/master/search-vector-tile-api.html
return parts[1], apievents.ElasticsearchCategory_SEARCH
}
return "", apievents.ElasticsearchCategory_GENERAL
}
// getQueryFromRequestBody attempts to find the actual query from the request body, to be shown to the interested user.
func (e *Engine) getQueryFromRequestBody(contentType string, body []byte) string {
// Elasticsearch APIs have no shared schema, but the ones we support have the query either
// as 'query' or as 'knn'.
// We will attempt to deserialize the query as 'q' to discover these fields.
// The type for those is 'any': both strings and objects can be found.
var q struct {
Query any `json:"query" yaml:"query"`
Knn any `json:"knn" yaml:"knn"`
}
switch contentType {
// CBOR and Smile are officially supported by Elasticsearch:
// https://www.elastic.co/guide/en/elasticsearch/reference/master/api-conventions.html#_content_type_requirements
// We don't support introspection of these content types, at least for now.
case "application/cbor":
e.Log.Warnf("Content type not supported: %q.", contentType)
return ""
case "application/smile":
e.Log.Warnf("Content type not supported: %q.", contentType)
return ""
case "application/yaml":
if len(body) == 0 {
e.Log.WithField("content-type", contentType).Infof("Empty request body.")
return ""
}
err := yaml.Unmarshal(body, &q)
if err != nil {
e.Log.WithError(err).Warnf("Error decoding request body as %q.", contentType)
return ""
}
case "application/json":
if len(body) == 0 {
e.Log.WithField("content-type", contentType).Infof("Empty request body.")
return ""
}
err := json.Unmarshal(body, &q)
if err != nil {
e.Log.WithError(err).Warnf("Error decoding request body as %q.", contentType)
return ""
}
default:
e.Log.Warnf("Unknown or missing 'Content-Type': %q, assuming 'application/json'.", contentType)
if len(body) == 0 {
e.Log.WithField("content-type", contentType).Infof("Empty request body.")
return ""
}
err := json.Unmarshal(body, &q)
if err != nil {
e.Log.WithError(err).Warnf("Error decoding request body as %q.", contentType)
return ""
}
}
result := q.Query
if result == nil {
result = q.Knn
}
if result == nil {
return ""
}
switch qt := result.(type) {
case string:
return qt
default:
marshal, err := json.Marshal(result)
if err != nil {
e.Log.WithError(err).Warnf("Error encoding query to json; body: %x, content type: %v.", body, contentType)
return ""
}
return string(marshal)
}
}
// emitAuditEvent writes the request and response to audit stream.
func (e *Engine) emitAuditEvent(req *http.Request, body []byte) {
contentType := req.Header.Get("Content-Type")
source := req.URL.Query().Get("source")
if len(source) > 0 {
e.Log.Infof("'source' parameter found, overriding request body.")
body = []byte(source)
contentType = req.URL.Query().Get("source_content_type")
}
target, category := parsePath(req.URL.Path)
// Heuristic to calculate the query field.
// The priority is given to 'q' URL param. If not found, we look at the request body.
// This is not guaranteed to give us actual query, for example:
// - we may not support given API
// - we may not support given content encoding
query := req.URL.Query().Get("q")
if query == "" {
query = e.getQueryFromRequestBody(contentType, body)
}
ev := &apievents.ElasticsearchRequest{
Metadata: common.MakeEventMetadata(e.sessionCtx,
events.DatabaseSessionElasticsearchRequestEvent,
events.ElasticsearchRequestCode),
UserMetadata: common.MakeUserMetadata(e.sessionCtx),
SessionMetadata: common.MakeSessionMetadata(e.sessionCtx),
DatabaseMetadata: common.MakeDatabaseMetadata(e.sessionCtx),
Method: req.Method,
Path: req.URL.Path,
RawQuery: req.URL.RawQuery,
Body: body,
Headers: wrappers.Traits(req.Header),
Category: category,
Target: target,
Query: query,
}
e.Audit.EmitEvent(req.Context(), ev)
}
// sendResponse sends the response back to the elasticsearch client.
func (e *Engine) sendResponse(resp *http.Response) error {
if err := resp.Write(e.clientConn); err != nil {

View file

@ -0,0 +1,243 @@
// Copyright 2022 Gravitational, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package elasticsearch
import (
"testing"
"github.com/ghodss/yaml"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"github.com/gravitational/teleport/api/types/events"
)
func Test_parsePath(t *testing.T) {
tests := []struct {
// name string
path string
wantTarget string
wantCategory events.ElasticsearchCategory
}{
{
path: "",
wantTarget: "",
wantCategory: events.ElasticsearchCategory_GENERAL,
},
{
path: "/",
wantTarget: "",
wantCategory: events.ElasticsearchCategory_GENERAL,
},
{
path: "/bah",
wantTarget: "",
wantCategory: events.ElasticsearchCategory_GENERAL,
},
{
path: "/foo/bar/baz",
wantTarget: "",
wantCategory: events.ElasticsearchCategory_GENERAL,
},
{
path: "/_security",
wantTarget: "",
wantCategory: events.ElasticsearchCategory_SECURITY,
},
{
path: "/_security/foo",
wantTarget: "",
wantCategory: events.ElasticsearchCategory_SECURITY,
},
{
path: "/_search",
wantTarget: "",
wantCategory: events.ElasticsearchCategory_SEARCH,
},
{
path: "/_search/",
wantTarget: "",
wantCategory: events.ElasticsearchCategory_SEARCH,
},
{
path: "/_search/asd",
wantTarget: "",
wantCategory: events.ElasticsearchCategory_SEARCH,
},
{
path: "/blah/_search/asd",
wantTarget: "blah",
wantCategory: events.ElasticsearchCategory_SEARCH,
},
{
path: "/_async_search/",
wantTarget: "",
wantCategory: events.ElasticsearchCategory_SEARCH,
},
{
path: "/_async_search/asd",
wantTarget: "",
wantCategory: events.ElasticsearchCategory_SEARCH,
},
{
path: "/blah/_async_search/asd",
wantTarget: "blah",
wantCategory: events.ElasticsearchCategory_SEARCH,
},
{
path: "/_sql/",
wantTarget: "",
wantCategory: events.ElasticsearchCategory_SQL,
},
{
path: "/_sql",
wantTarget: "",
wantCategory: events.ElasticsearchCategory_SQL,
},
{
path: "/_sql/asd",
wantTarget: "",
wantCategory: events.ElasticsearchCategory_SQL,
},
}
for _, tt := range tests {
t.Run(tt.path, func(t *testing.T) {
target, category := parsePath(tt.path)
require.Equal(t, tt.wantTarget, target)
require.Equal(t, tt.wantCategory.String(), category.String())
})
}
}
func TestEngine_getQueryFromRequestBody(t *testing.T) {
const jsonSearchAPIQuery = `{
"query": {
"bool" : {
"must" : {
"term" : { "user.id" : "pam" }
},
"filter": {
"term" : { "tags" : "production" }
}
}
}
}`
const jsonSearchAPIJustQuery = `{"bool":{"filter":{"term":{"tags":"production"}},"must":{"term":{"user.id":"pam"}}}}`
const jsonKNNSearchAPIQuery = `{
"knn": {
"field": "image_vector",
"query_vector": [0.3, 0.1, 1.2],
"k": 10,
"num_candidates": 100
},
"_source": ["name", "file_type"]
}`
const jsonKNNSearchAPIJustQuery = `{"field":"image_vector","k":10,"num_candidates":100,"query_vector":[0.3,0.1,1.2]}`
const jsonSQLSearchAPIQuery = `{
"query": "SELECT * FROM library ORDER BY page_count DESC LIMIT 5"
}`
const jsonSQLSearchAPIJustQuery = `SELECT * FROM library ORDER BY page_count DESC LIMIT 5`
toYAML := func(js string) string {
yamlBytes, err := yaml.JSONToYAML([]byte(js))
require.NoError(t, err)
return string(yamlBytes)
}
tests := []struct {
name string
contentType string
body string
want string
}{
{
name: "empty",
contentType: "",
body: "",
want: "",
},
// json
{
name: "json query search api",
contentType: "application/json",
body: jsonSearchAPIQuery,
want: jsonSearchAPIJustQuery,
},
{
name: "json query knn",
contentType: "application/json",
body: jsonKNNSearchAPIQuery,
want: jsonKNNSearchAPIJustQuery,
},
{
name: "json query sql",
contentType: "application/json",
body: jsonSQLSearchAPIQuery,
want: jsonSQLSearchAPIJustQuery,
},
{
name: "json bad encoding",
contentType: "application/json",
body: "",
want: "",
},
// yaml
{
name: "yaml query search api",
contentType: "application/yaml",
body: toYAML(jsonSearchAPIQuery),
want: jsonSearchAPIJustQuery,
},
{
name: "yaml query knn",
contentType: "application/yaml",
body: toYAML(jsonKNNSearchAPIQuery),
want: jsonKNNSearchAPIJustQuery,
},
{
name: "yaml query sql",
contentType: "application/yaml",
body: toYAML(jsonSQLSearchAPIQuery),
want: jsonSQLSearchAPIJustQuery,
},
{
name: "yaml bad encoding",
contentType: "application/yaml",
body: "",
want: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := &Engine{}
e.Log = logrus.StandardLogger()
result := e.getQueryFromRequestBody(tt.contentType, []byte(tt.body))
t.Log(result)
require.Equal(t, tt.want, result)
})
}
}

View file

@ -0,0 +1,75 @@
/*
Copyright 2022 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package elasticsearch
import (
"io"
"testing"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
)
func FuzzGetQueryFromRequestBody(f *testing.F) {
mkEngine := func() *Engine {
e := &Engine{}
log := logrus.New()
log.SetOutput(io.Discard)
e.Log = log
return e
}
f.Fuzz(func(t *testing.T, contentType string, body []byte) {
require.NotPanics(t, func() {
e := mkEngine()
e.getQueryFromRequestBody(contentType, body)
})
})
}
func FuzzPathToMatcher(f *testing.F) {
f.Add("/_security/foo")
f.Add("/_ssl/asd")
f.Add("/_search/")
f.Add("/_async_search/")
f.Add("/_pit/")
f.Add("/_msearch/")
f.Add("/_render/")
f.Add("/_field_caps/")
f.Add("/_sql/")
f.Add("/_eql/")
f.Add("/target/_search")
f.Add("/target/_async_search")
f.Add("/target/_pit")
f.Add("/target/_knn_search")
f.Add("/target/_msearch")
f.Add("/target/_search_shards")
f.Add("/target/_count")
f.Add("/target/_validate")
f.Add("/target/_terms_enum")
f.Add("/target/_explain")
f.Add("/target/_field_caps")
f.Add("/target/_rank_eval")
f.Add("/target/_mvt")
f.Fuzz(func(t *testing.T, path string) {
require.NotPanics(t, func() {
parsePath(path)
})
})
}

View file

@ -26,13 +26,12 @@ import (
"strings"
"testing"
elastic "github.com/elastic/go-elasticsearch/v8"
"github.com/stretchr/testify/require"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/defaults"
libevents "github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/srv/alpnproxy"
"github.com/gravitational/teleport/lib/srv/db/common"
"github.com/gravitational/teleport/lib/srv/db/elasticsearch"
)
@ -158,8 +157,9 @@ func TestAuditElasticsearch(t *testing.T) {
proxy.Close()
})
var dbConn *elastic.Client
var proxy *alpnproxy.LocalProxy
dbConn, proxy, err := testCtx.elasticsearchClient(ctx, "alice", "Elasticsearch", "admin")
require.NoError(t, err)
t.Cleanup(func() {
if proxy != nil {
proxy.Close()
@ -168,15 +168,25 @@ func TestAuditElasticsearch(t *testing.T) {
t.Run("session starts event", func(t *testing.T) {
// Connect should trigger successful session start event.
var err error
dbConn, proxy, err = testCtx.elasticsearchClient(ctx, "alice", "Elasticsearch", "admin")
require.NoError(t, err)
resp, err := dbConn.Ping()
require.NoError(t, err)
require.False(t, resp.IsError())
waitForEvent(t, testCtx, libevents.DatabaseSessionStartCode)
})
t.Run("command sends", func(t *testing.T) {
// should trigger Query event.
result, err := dbConn.SQL.Query(strings.NewReader(`{ "query": "SELECT 42" }`))
require.NoError(t, err)
require.Equal(t, `[200 OK] {"columns":[{"name":"42","type":"integer"}],"rows":[[42]]}`, result.String())
_ = waitForEvent(t, testCtx, libevents.ElasticsearchRequestCode)
// actual query
ev := waitForEvent(t, testCtx, libevents.ElasticsearchRequestCode)
require.Equal(t, "/_sql", ev.(*events.ElasticsearchRequest).Path)
require.Equal(t, []byte(`{ "query": "SELECT 42" }`), ev.(*events.ElasticsearchRequest).Body)
})
}
func withElasticsearch(name string, opts ...elasticsearch.TestServerOption) withDatabaseOption {

View file

@ -337,10 +337,6 @@ func onProxyCommandDB(cf *CLIConf) error {
return trace.BadParameter("Snowflake proxy works only in the tunnel mode. Please add --tunnel flag to enable it")
}
if routeToDatabase.Protocol == defaults.ProtocolElasticsearch && !cf.LocalProxyTunnel {
return trace.BadParameter("Elasticsearch proxy works only in the tunnel mode. Please add --tunnel flag to enable it")
}
rootCluster, err := client.RootClusterName(cf.Context)
if err != nil {
return trace.Wrap(err)