From 05332d73678d73ad38efab77ab658dc427e5b105 Mon Sep 17 00:00:00 2001 From: Yashwanth Reddy <89122324+pmuls99@users.noreply.github.com> Date: Thu, 23 Nov 2023 11:25:00 +0530 Subject: [PATCH] Support Elasticsearch 8.x (#4829) ## Which problem is this PR solving? - Resolves #3571 ## Description of the changes - Added index templates for esv8 - Added a esv8 client to handle the put index template request that is not currently possible by olivere/elastic - After these changes are merged , users no more need to use create.index-templates= false unless they want to add their custom index templates and force the es.version = 7 --------- Signed-off-by: bugslayer-332 Signed-off-by: Yashwanth Reddy <89122324+pmuls99@users.noreply.github.com> Signed-off-by: Yuri Shkuro Co-authored-by: bugslayer-332 Co-authored-by: Yuri Shkuro --- .github/workflows/ci-elasticsearch.yml | 3 + cmd/es-rollover/app/init/action.go | 20 +- cmd/es-rollover/app/init/flags.go | 20 +- cmd/es-rollover/app/init/flags_test.go | 6 + go.mod | 2 + go.sum | 8 + pkg/es/client/client.go | 3 +- pkg/es/client/cluster_client.go | 2 +- pkg/es/client/cluster_client_test.go | 20 +- pkg/es/client/index_client.go | 15 +- pkg/es/client/index_client_test.go | 15 +- .../{cluter_client.go => cluster_client.go} | 0 pkg/es/config/config.go | 39 +++- pkg/es/wrapper/wrapper.go | 54 +++++- plugin/storage/es/factory.go | 16 +- .../fixtures/jaeger-dependencies-8.json | 20 ++ .../mappings/fixtures/jaeger-service-8.json | 51 ++++++ .../es/mappings/fixtures/jaeger-span-8.json | 167 +++++++++++++++++ .../es/mappings/jaeger-dependencies-8.json | 24 +++ .../storage/es/mappings/jaeger-service-8.json | 55 ++++++ plugin/storage/es/mappings/jaeger-span-8.json | 172 ++++++++++++++++++ plugin/storage/es/mappings/mapping.go | 21 ++- plugin/storage/es/mappings/mapping_test.go | 37 ++-- plugin/storage/es/options.go | 41 ++++- plugin/storage/es/spanstore/writer.go | 5 +- .../storage/integration/elasticsearch_test.go | 101 +++++++--- .../integration/es_index_cleaner_test.go | 30 ++- .../integration/es_index_rollover_test.go | 30 ++- scripts/es-integration-test.sh | 13 +- 29 files changed, 892 insertions(+), 98 deletions(-) rename pkg/es/client/mocks/{cluter_client.go => cluster_client.go} (100%) create mode 100644 plugin/storage/es/mappings/fixtures/jaeger-dependencies-8.json create mode 100644 plugin/storage/es/mappings/fixtures/jaeger-service-8.json create mode 100644 plugin/storage/es/mappings/fixtures/jaeger-span-8.json create mode 100644 plugin/storage/es/mappings/jaeger-dependencies-8.json create mode 100644 plugin/storage/es/mappings/jaeger-service-8.json create mode 100644 plugin/storage/es/mappings/jaeger-span-8.json diff --git a/.github/workflows/ci-elasticsearch.yml b/.github/workflows/ci-elasticsearch.yml index fda2a9a7435..f52e823a7b7 100644 --- a/.github/workflows/ci-elasticsearch.yml +++ b/.github/workflows/ci-elasticsearch.yml @@ -30,6 +30,9 @@ jobs: - major: 7.x image: 7.14.0 distribution: elasticsearch + - major: 8.x + image: 8.8.2 + distribution: elasticsearch name: ${{ matrix.version.distribution }} ${{ matrix.version.major }} steps: - name: Harden Runner diff --git a/cmd/es-rollover/app/init/action.go b/cmd/es-rollover/app/init/action.go index abdc26c64da..6dcea2f36d4 100644 --- a/cmd/es-rollover/app/init/action.go +++ b/cmd/es-rollover/app/init/action.go @@ -39,13 +39,16 @@ type Action struct { func (c Action) getMapping(version uint, templateName string) (string, error) { mappingBuilder := mappings.MappingBuilder{ - TemplateBuilder: es.TextTemplateBuilder{}, - Shards: int64(c.Config.Shards), - Replicas: int64(c.Config.Replicas), - IndexPrefix: c.Config.IndexPrefix, - UseILM: c.Config.UseILM, - ILMPolicyName: c.Config.ILMPolicyName, - EsVersion: version, + TemplateBuilder: es.TextTemplateBuilder{}, + PrioritySpanTemplate: int64(c.Config.PrioritySpanTemplate), + PriorityServiceTemplate: int64(c.Config.PriorityServiceTemplate), + PriorityDependenciesTemplate: int64(c.Config.PriorityDependenciesTemplate), + Shards: int64(c.Config.Shards), + Replicas: int64(c.Config.Replicas), + IndexPrefix: c.Config.IndexPrefix, + UseILM: c.Config.UseILM, + ILMPolicyName: c.Config.ILMPolicyName, + EsVersion: version, } return mappingBuilder.GetMapping(templateName) } @@ -57,7 +60,7 @@ func (c Action) Do() error { return err } if c.Config.UseILM { - if version == ilmVersionSupport { + if version >= ilmVersionSupport { policyExist, err := c.ILMClient.Exists(c.Config.ILMPolicyName) if err != nil { return err @@ -109,7 +112,6 @@ func (c Action) init(version uint, indexopt app.IndexOption) error { if err != nil { return err } - err = c.IndicesClient.CreateTemplate(mapping, indexopt.TemplateName()) if err != nil { return err diff --git a/cmd/es-rollover/app/init/flags.go b/cmd/es-rollover/app/init/flags.go index 763656a7b3c..27fe910e569 100644 --- a/cmd/es-rollover/app/init/flags.go +++ b/cmd/es-rollover/app/init/flags.go @@ -23,25 +23,37 @@ import ( ) const ( - shards = "shards" - replicas = "replicas" + shards = "shards" + replicas = "replicas" + prioritySpanTemplate = "priority-span-template" + priorityServiceTemplate = "priority-service-template" + priorityDependenciesTemplate = "priority-dependencies-template" ) // Config holds configuration for index cleaner binary. type Config struct { app.Config - Shards int - Replicas int + Shards int + Replicas int + PrioritySpanTemplate int + PriorityServiceTemplate int + PriorityDependenciesTemplate int } // AddFlags adds flags for TLS to the FlagSet. func (c *Config) AddFlags(flags *flag.FlagSet) { flags.Int(shards, 5, "Number of shards") flags.Int(replicas, 1, "Number of replicas") + flags.Int(prioritySpanTemplate, 0, "Priority of jaeger-span index template (ESv8 only)") + flags.Int(priorityServiceTemplate, 0, "Priority of jaeger-service index template (ESv8 only)") + flags.Int(priorityDependenciesTemplate, 0, "Priority of jaeger-dependecies index template (ESv8 only)") } // InitFromViper initializes config from viper.Viper. func (c *Config) InitFromViper(v *viper.Viper) { c.Shards = v.GetInt(shards) c.Replicas = v.GetInt(replicas) + c.PrioritySpanTemplate = v.GetInt(prioritySpanTemplate) + c.PriorityServiceTemplate = v.GetInt(priorityServiceTemplate) + c.PriorityDependenciesTemplate = v.GetInt(priorityDependenciesTemplate) } diff --git a/cmd/es-rollover/app/init/flags_test.go b/cmd/es-rollover/app/init/flags_test.go index 483a3ce9690..4104f71e5ff 100644 --- a/cmd/es-rollover/app/init/flags_test.go +++ b/cmd/es-rollover/app/init/flags_test.go @@ -36,10 +36,16 @@ func TestBindFlags(t *testing.T) { err := command.ParseFlags([]string{ "--shards=8", "--replicas=16", + "--priority-span-template=300", + "--priority-service-template=301", + "--priority-dependencies-template=302", }) require.NoError(t, err) c.InitFromViper(v) assert.Equal(t, 8, c.Shards) assert.Equal(t, 16, c.Replicas) + assert.Equal(t, 300, c.PrioritySpanTemplate) + assert.Equal(t, 301, c.PriorityServiceTemplate) + assert.Equal(t, 302, c.PriorityDependenciesTemplate) } diff --git a/go.mod b/go.mod index dc79f46c3a5..f0de4952e30 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/bsm/sarama-cluster v2.1.13+incompatible github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b github.com/dgraph-io/badger/v3 v3.2103.5 + github.com/elastic/go-elasticsearch/v8 v8.11.0 github.com/fsnotify/fsnotify v1.7.0 github.com/go-kit/kit v0.13.0 github.com/go-logr/zapr v1.3.0 @@ -109,6 +110,7 @@ require ( github.com/eapache/go-resiliency v1.4.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect + github.com/elastic/elastic-transport-go/v8 v8.3.0 // indirect github.com/fatih/color v1.14.1 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-kit/log v0.2.1 // indirect diff --git a/go.sum b/go.sum index 7ddb073479d..43864534bee 100644 --- a/go.sum +++ b/go.sum @@ -130,6 +130,14 @@ github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4A github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6 h1:1+44gxLdKRnR/Bx/iAtr+XqNcE4e0oODa63+FABNANI= +github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI= +github.com/elastic/elastic-transport-go/v8 v8.3.0 h1:DJGxovyQLXGr62e9nDMPSxRyWION0Bh6d9eCFBriiHo= +github.com/elastic/elastic-transport-go/v8 v8.3.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI= +github.com/elastic/go-elasticsearch/v8 v8.10.0 h1:ALg3DMxSrx07YmeMNcfPf7cFh1Ep2+Qa19EOXTbwr2k= +github.com/elastic/go-elasticsearch/v8 v8.10.0/go.mod h1:NGmpvohKiRHXI0Sw4fuUGn6hYOmAXlyCphKpzVBiqDE= +github.com/elastic/go-elasticsearch/v8 v8.11.0 h1:gUazf443rdYAEAD7JHX5lSXRgTkG4N4IcsV8dcWQPxM= +github.com/elastic/go-elasticsearch/v8 v8.11.0/go.mod h1:GU1BJHO7WeamP7UhuElYwzzHtvf9SDmeVpSSy9+o6Qg= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= diff --git a/pkg/es/client/client.go b/pkg/es/client/client.go index 7ea944e2862..ad46cf15077 100644 --- a/pkg/es/client/client.go +++ b/pkg/es/client/client.go @@ -52,7 +52,8 @@ func newResponseError(err error, code int, body []byte) ResponseError { } } -// Client is a generic client to make requests to ES +// Client executes requests against Elasticsearch using direct HTTP calls, +// without using the official Go client for ES. type Client struct { // Http client. Client *http.Client diff --git a/pkg/es/client/cluster_client.go b/pkg/es/client/cluster_client.go index 7d05e29497c..0f90acab421 100644 --- a/pkg/es/client/cluster_client.go +++ b/pkg/es/client/cluster_client.go @@ -36,7 +36,7 @@ func (c *ClusterClient) Version() (uint, error) { TagLine string `json:"tagline"` } body, err := c.request(elasticRequest{ - endpoint: "/", + endpoint: "", method: http.MethodGet, }) if err != nil { diff --git a/pkg/es/client/cluster_client_test.go b/pkg/es/client/cluster_client_test.go index c0ff770a427..57d0b65801e 100644 --- a/pkg/es/client/cluster_client_test.go +++ b/pkg/es/client/cluster_client_test.go @@ -104,7 +104,6 @@ const opensearch2 = ` ` const elasticsearch7 = ` - { "name" : "elasticsearch-0", "cluster_name" : "clustername", @@ -124,8 +123,17 @@ const elasticsearch7 = ` } ` -const elasticsearch6 = ` +const elasticsearch8 = ` +{ + "name" : "elasticsearch-0", + "version" : { + "number" : "8.0.0" + }, + "tagline" : "You Know, for Search" + } +` +const elasticsearch6 = ` { "name" : "elasticsearch-0", "cluster_name" : "clustername", @@ -165,6 +173,12 @@ func TestVersion(t *testing.T) { response: elasticsearch7, expectedResult: 7, }, + { + name: "success with elasticsearch 8", + responseCode: http.StatusOK, + response: elasticsearch8, + expectedResult: 8, + }, { name: "success with opensearch 1", responseCode: http.StatusOK, @@ -223,7 +237,9 @@ func TestVersion(t *testing.T) { if test.errContains != "" { require.Error(t, err) assert.Contains(t, err.Error(), test.errContains) + return } + require.NoError(t, err) assert.Equal(t, test.expectedResult, result) }) } diff --git a/pkg/es/client/index_client.go b/pkg/es/client/index_client.go index 93892e80708..9fdaa89c308 100644 --- a/pkg/es/client/index_client.go +++ b/pkg/es/client/index_client.go @@ -231,10 +231,23 @@ func (i *IndicesClient) aliasAction(action string, aliases []Alias) error { return err } +func (i IndicesClient) version() (uint, error) { + cl := ClusterClient{Client: i.Client} + return cl.Version() +} + // CreateTemplate an ES index template func (i IndicesClient) CreateTemplate(template, name string) error { + endpointFmt := "_template/%s" + if v, err := i.version(); err == nil { + if v >= 8 { + endpointFmt = "_index_template/%s" + } + } else { + return err + } _, err := i.request(elasticRequest{ - endpoint: fmt.Sprintf("_template/%s", name), + endpoint: fmt.Sprintf(endpointFmt, name), method: http.MethodPut, body: []byte(template), }) diff --git a/pkg/es/client/index_client_test.go b/pkg/es/client/index_client_test.go index fc4db17f5fe..6d3b51f6420 100644 --- a/pkg/es/client/index_client_test.go +++ b/pkg/es/client/index_client_test.go @@ -484,16 +484,24 @@ func TestClientCreateTemplate(t *testing.T) { templateContent := "template content" tests := []struct { name string + versionResp string responseCode int response string errContains string }{ { - name: "success", + name: "success/v7", + versionResp: elasticsearch7, + responseCode: http.StatusOK, + }, + { + name: "success/v8", + versionResp: elasticsearch8, responseCode: http.StatusOK, }, { name: "client error", + versionResp: elasticsearch7, responseCode: http.StatusBadRequest, response: esErrResponse, errContains: "failed to create template: jaeger-template", @@ -502,6 +510,11 @@ func TestClientCreateTemplate(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + if req.URL.String() == "/" { // ES version check + res.WriteHeader(http.StatusOK) + res.Write([]byte(test.versionResp)) + return + } assert.True(t, strings.HasSuffix(req.URL.String(), "_template/jaeger-template")) assert.Equal(t, http.MethodPut, req.Method) assert.Equal(t, "Basic foobar", req.Header.Get("Authorization")) diff --git a/pkg/es/client/mocks/cluter_client.go b/pkg/es/client/mocks/cluster_client.go similarity index 100% rename from pkg/es/client/mocks/cluter_client.go rename to pkg/es/client/mocks/cluster_client.go diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index d54b9be0c2e..bee13a8a34e 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -29,6 +29,7 @@ import ( "sync" "time" + esV8 "github.com/elastic/go-elasticsearch/v8" "github.com/olivere/elastic" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -57,6 +58,9 @@ type Configuration struct { MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads NumShards int64 `yaml:"shards" mapstructure:"num_shards"` NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"` + PrioritySpanTemplate int64 `yaml:"priority_span_template" mapstructure:"priority_span_template"` + PriorityServiceTemplate int64 `yaml:"priority_service_template" mapstructure:"priority_service_template"` + PriorityDependenciesTemplate int64 `yaml:"priority_dependencies_template" mapstructure:"priority_dependencies_template"` Timeout time.Duration `validate:"min=500" mapstructure:"-"` BulkSize int `mapstructure:"-"` BulkWorkers int `mapstructure:"-"` @@ -111,7 +115,7 @@ func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Fact sm := storageMetrics.NewWriteMetrics(metricsFactory, "bulk_index") m := sync.Map{} - service, err := rawClient.BulkProcessor(). + bulkProc, err := rawClient.BulkProcessor(). Before(func(id int64, requests []elastic.BulkableRequest) { m.Store(id, time.Now()) }). @@ -184,7 +188,29 @@ func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Fact c.Version = uint(esVersion) } - return eswrapper.WrapESClient(rawClient, service, c.Version), nil + var rawClientV8 *esV8.Client + if c.Version >= 8 { + rawClientV8, err = newElasticsearchV8(c, logger) + if err != nil { + return nil, fmt.Errorf("error creating v8 client: %v", err) + } + } + + return eswrapper.WrapESClient(rawClient, bulkProc, c.Version, rawClientV8), nil +} + +func newElasticsearchV8(c *Configuration, logger *zap.Logger) (*esV8.Client, error) { + var options esV8.Config + options.Addresses = c.Servers + options.Username = c.Username + options.Password = c.Password + options.DiscoverNodesOnStart = c.Sniffer + transport, err := GetHTTPRoundTripper(c, logger) + if err != nil { + return nil, err + } + options.Transport = transport + return esV8.NewClient(options) } // ApplyDefaults copies settings from source unless its own value is non-zero. @@ -210,6 +236,15 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { if c.NumReplicas == 0 { c.NumReplicas = source.NumReplicas } + if c.PrioritySpanTemplate == 0 { + c.PrioritySpanTemplate = source.PrioritySpanTemplate + } + if c.PriorityServiceTemplate == 0 { + c.PriorityServiceTemplate = source.PriorityServiceTemplate + } + if c.PrioritySpanTemplate == 0 { + c.PriorityDependenciesTemplate = source.PriorityDependenciesTemplate + } if c.BulkSize == 0 { c.BulkSize = source.BulkSize } diff --git a/pkg/es/wrapper/wrapper.go b/pkg/es/wrapper/wrapper.go index 9df3a251857..72621340712 100644 --- a/pkg/es/wrapper/wrapper.go +++ b/pkg/es/wrapper/wrapper.go @@ -17,7 +17,11 @@ package eswrapper import ( "context" + "fmt" + "strings" + esV8 "github.com/elastic/go-elasticsearch/v8" + esV8api "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/olivere/elastic" "github.com/jaegertracing/jaeger/pkg/es" @@ -30,6 +34,7 @@ type ClientWrapper struct { client *elastic.Client bulkService *elastic.BulkProcessor esVersion uint + clientV8 *esV8.Client } // GetVersion returns the ElasticSearch Version @@ -38,8 +43,13 @@ func (c ClientWrapper) GetVersion() uint { } // WrapESClient creates a ESClient out of *elastic.Client. -func WrapESClient(client *elastic.Client, s *elastic.BulkProcessor, esVersion uint) ClientWrapper { - return ClientWrapper{client: client, bulkService: s, esVersion: esVersion} +func WrapESClient(client *elastic.Client, s *elastic.BulkProcessor, esVersion uint, clientV8 *esV8.Client) ClientWrapper { + return ClientWrapper{ + client: client, + bulkService: s, + esVersion: esVersion, + clientV8: clientV8, + } } // IndexExists calls this function to internal client. @@ -54,6 +64,12 @@ func (c ClientWrapper) CreateIndex(index string) es.IndicesCreateService { // CreateTemplate calls this function to internal client. func (c ClientWrapper) CreateTemplate(ttype string) es.TemplateCreateService { + if c.esVersion >= 8 { + return TemplateCreatorWrapperV8{ + indicesV8: c.clientV8.Indices, + templateName: ttype, + } + } return WrapESTemplateCreateService(c.client.IndexPutTemplate(ttype)) } @@ -66,7 +82,7 @@ func (c ClientWrapper) Index() es.IndexService { // Search calls this function to internal client. func (c ClientWrapper) Search(indices ...string) es.SearchService { searchService := c.client.Search(indices...) - if c.esVersion == 7 { + if c.esVersion >= 7 { searchService = searchService.RestTotalHitsAsInt(true) } return WrapESSearchService(searchService) @@ -75,7 +91,7 @@ func (c ClientWrapper) Search(indices ...string) es.SearchService { // MultiSearch calls this function to internal client. func (c ClientWrapper) MultiSearch() es.MultiSearchService { multiSearchService := c.client.MultiSearch() - if c.esVersion == 7 { + if c.esVersion >= 7 { multiSearchService = multiSearchService.RestTotalHitsAsInt(true) } return WrapESMultiSearchService(multiSearchService) @@ -147,6 +163,34 @@ func (c TemplateCreateServiceWrapper) Do(ctx context.Context) (*elastic.IndicesP // --- +// TemplateCreatorWrapperV8 implements es.TemplateCreateService. +type TemplateCreatorWrapperV8 struct { + indicesV8 *esV8api.Indices + templateName string + templateMapping string +} + +// Body adds mapping to the future request. +func (c TemplateCreatorWrapperV8) Body(mapping string) es.TemplateCreateService { + cc := c // clone + cc.templateMapping = mapping + return cc +} + +// Do executes Put Template command. +func (c TemplateCreatorWrapperV8) Do(ctx context.Context) (*elastic.IndicesPutTemplateResponse, error) { + resp, err := c.indicesV8.PutIndexTemplate(c.templateName, strings.NewReader(c.templateMapping)) + if err != nil { + return nil, fmt.Errorf("error creating index template %s: %w", c.templateName, err) + } + if resp.StatusCode != 200 { + return nil, fmt.Errorf("error creating index template %s: %s", c.templateName, resp) + } + return nil, nil // no response expected by span writer +} + +// --- + // IndexServiceWrapper is a wrapper around elastic.ESIndexService. // See wrapper_nolint.go for more functions. type IndexServiceWrapper struct { @@ -167,7 +211,7 @@ func (i IndexServiceWrapper) Index(index string) es.IndexService { // Type calls this function to internal service. func (i IndexServiceWrapper) Type(typ string) es.IndexService { - if i.esVersion == 7 { + if i.esVersion >= 7 { return WrapESIndexService(i.bulkIndexReq, i.bulkService, i.esVersion) } return WrapESIndexService(i.bulkIndexReq.Type(typ), i.bulkService, i.esVersion) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index e22e133a871..ae3f20b96a8 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -229,18 +229,22 @@ func createSpanWriter( } mappingBuilder := mappings.MappingBuilder{ - TemplateBuilder: es.TextTemplateBuilder{}, - Shards: cfg.NumShards, - Replicas: cfg.NumReplicas, - EsVersion: cfg.Version, - IndexPrefix: cfg.IndexPrefix, - UseILM: cfg.UseILM, + TemplateBuilder: es.TextTemplateBuilder{}, + Shards: cfg.NumShards, + Replicas: cfg.NumReplicas, + EsVersion: cfg.Version, + IndexPrefix: cfg.IndexPrefix, + UseILM: cfg.UseILM, + PrioritySpanTemplate: cfg.PrioritySpanTemplate, + PriorityServiceTemplate: cfg.PriorityServiceTemplate, + PriorityDependenciesTemplate: cfg.PriorityDependenciesTemplate, } spanMapping, serviceMapping, err := mappingBuilder.GetSpanServiceMappings() if err != nil { return nil, err } + writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{ Client: clientFn, IndexPrefix: cfg.IndexPrefix, diff --git a/plugin/storage/es/mappings/fixtures/jaeger-dependencies-8.json b/plugin/storage/es/mappings/fixtures/jaeger-dependencies-8.json new file mode 100644 index 00000000000..2e607b035a9 --- /dev/null +++ b/plugin/storage/es/mappings/fixtures/jaeger-dependencies-8.json @@ -0,0 +1,20 @@ +{ + "priority": 502, + "index_patterns": "test-jaeger-dependencies-*", + "template": { + "aliases": { + "test-jaeger-dependencies-read": {} + }, + "settings": { + "index.number_of_shards": 3, + "index.number_of_replicas": 3, + "index.mapping.nested_fields.limit": 50, + "index.requests.cache.enable": true, + "lifecycle": { + "name": "jaeger-test-policy", + "rollover_alias": "test-jaeger-dependencies-write" + } + }, + "mappings": {} + } +} diff --git a/plugin/storage/es/mappings/fixtures/jaeger-service-8.json b/plugin/storage/es/mappings/fixtures/jaeger-service-8.json new file mode 100644 index 00000000000..39ba82012e9 --- /dev/null +++ b/plugin/storage/es/mappings/fixtures/jaeger-service-8.json @@ -0,0 +1,51 @@ +{ + "priority": 501, + "index_patterns": "test-jaeger-service-*", + "template": { + "aliases": { + "test-jaeger-service-read": {} + }, + "settings": { + "index.number_of_shards": 3, + "index.number_of_replicas": 3, + "index.mapping.nested_fields.limit": 50, + "index.requests.cache.enable": true, + "lifecycle": { + "name": "jaeger-test-policy", + "rollover_alias": "test-jaeger-service-write" + } + }, + "mappings": { + "dynamic_templates": [ + { + "span_tags_map": { + "mapping": { + "type": "keyword", + "ignore_above": 256 + }, + "path_match": "tag.*" + } + }, + { + "process_tags_map": { + "mapping": { + "type": "keyword", + "ignore_above": 256 + }, + "path_match": "process.tag.*" + } + } + ], + "properties": { + "serviceName": { + "type": "keyword", + "ignore_above": 256 + }, + "operationName": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } +} diff --git a/plugin/storage/es/mappings/fixtures/jaeger-span-8.json b/plugin/storage/es/mappings/fixtures/jaeger-span-8.json new file mode 100644 index 00000000000..51dc701d4a5 --- /dev/null +++ b/plugin/storage/es/mappings/fixtures/jaeger-span-8.json @@ -0,0 +1,167 @@ +{ + "priority": 500, + "index_patterns": "test-jaeger-span-*", + "template": { + "aliases": { + "test-jaeger-span-read": {} + }, + "settings": { + "index.number_of_shards": 3, + "index.number_of_replicas": 3, + "index.mapping.nested_fields.limit": 50, + "index.requests.cache.enable": true, + "lifecycle": { + "name": "jaeger-test-policy", + "rollover_alias": "test-jaeger-span-write" + } + }, + "mappings": { + "dynamic_templates": [ + { + "span_tags_map": { + "mapping": { + "type": "keyword", + "ignore_above": 256 + }, + "path_match": "tag.*" + } + }, + { + "process_tags_map": { + "mapping": { + "type": "keyword", + "ignore_above": 256 + }, + "path_match": "process.tag.*" + } + } + ], + "properties": { + "traceID": { + "type": "keyword", + "ignore_above": 256 + }, + "parentSpanID": { + "type": "keyword", + "ignore_above": 256 + }, + "spanID": { + "type": "keyword", + "ignore_above": 256 + }, + "operationName": { + "type": "keyword", + "ignore_above": 256 + }, + "startTime": { + "type": "long" + }, + "startTimeMillis": { + "type": "date", + "format": "epoch_millis" + }, + "duration": { + "type": "long" + }, + "flags": { + "type": "integer" + }, + "logs": { + "type": "nested", + "dynamic": false, + "properties": { + "timestamp": { + "type": "long" + }, + "fields": { + "type": "nested", + "dynamic": false, + "properties": { + "key": { + "type": "keyword", + "ignore_above": 256 + }, + "value": { + "type": "keyword", + "ignore_above": 256 + }, + "tagType": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "process": { + "properties": { + "serviceName": { + "type": "keyword", + "ignore_above": 256 + }, + "tag": { + "type": "object" + }, + "tags": { + "type": "nested", + "dynamic": false, + "properties": { + "key": { + "type": "keyword", + "ignore_above": 256 + }, + "value": { + "type": "keyword", + "ignore_above": 256 + }, + "tagType": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "references": { + "type": "nested", + "dynamic": false, + "properties": { + "refType": { + "type": "keyword", + "ignore_above": 256 + }, + "traceID": { + "type": "keyword", + "ignore_above": 256 + }, + "spanID": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "tag": { + "type": "object" + }, + "tags": { + "type": "nested", + "dynamic": false, + "properties": { + "key": { + "type": "keyword", + "ignore_above": 256 + }, + "value": { + "type": "keyword", + "ignore_above": 256 + }, + "tagType": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } + } +} diff --git a/plugin/storage/es/mappings/jaeger-dependencies-8.json b/plugin/storage/es/mappings/jaeger-dependencies-8.json new file mode 100644 index 00000000000..57767866284 --- /dev/null +++ b/plugin/storage/es/mappings/jaeger-dependencies-8.json @@ -0,0 +1,24 @@ +{ + "priority": {{ .PriorityDependenciesTemplate }}, + "index_patterns": "{{ .IndexPrefix }}jaeger-dependencies-*", + "template": { + {{- if .UseILM }} + "aliases": { + "{{ .IndexPrefix }}jaeger-dependencies-read": {} + }, + {{- end }} + "settings": { + "index.number_of_shards": {{ .Shards }}, + "index.number_of_replicas": {{ .Replicas }}, + "index.mapping.nested_fields.limit": 50, + "index.requests.cache.enable": true + {{- if .UseILM }}, + "lifecycle": { + "name": "{{ .ILMPolicyName }}", + "rollover_alias": "{{ .IndexPrefix }}jaeger-dependencies-write" + } + {{- end }} + }, + "mappings": {} + } +} diff --git a/plugin/storage/es/mappings/jaeger-service-8.json b/plugin/storage/es/mappings/jaeger-service-8.json new file mode 100644 index 00000000000..97ab02f573d --- /dev/null +++ b/plugin/storage/es/mappings/jaeger-service-8.json @@ -0,0 +1,55 @@ +{ + "priority": {{ .PriorityServiceTemplate}}, + "index_patterns": "{{ .IndexPrefix }}jaeger-service-*", + "template": { + {{- if .UseILM }} + "aliases": { + "{{ .IndexPrefix }}jaeger-service-read": {} + }, + {{- end }} + "settings": { + "index.number_of_shards": {{ .Shards }}, + "index.number_of_replicas": {{ .Replicas }}, + "index.mapping.nested_fields.limit": 50, + "index.requests.cache.enable": true + {{- if .UseILM }}, + "lifecycle": { + "name": "{{ .ILMPolicyName }}", + "rollover_alias": "{{ .IndexPrefix }}jaeger-service-write" + } + {{- end }} + }, + "mappings": { + "dynamic_templates": [ + { + "span_tags_map": { + "mapping": { + "type": "keyword", + "ignore_above": 256 + }, + "path_match": "tag.*" + } + }, + { + "process_tags_map": { + "mapping": { + "type": "keyword", + "ignore_above": 256 + }, + "path_match": "process.tag.*" + } + } + ], + "properties": { + "serviceName": { + "type": "keyword", + "ignore_above": 256 + }, + "operationName": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } +} diff --git a/plugin/storage/es/mappings/jaeger-span-8.json b/plugin/storage/es/mappings/jaeger-span-8.json new file mode 100644 index 00000000000..60bc5eaa910 --- /dev/null +++ b/plugin/storage/es/mappings/jaeger-span-8.json @@ -0,0 +1,172 @@ +{ + "priority": {{ .PrioritySpanTemplate}}, + "index_patterns": "{{ .IndexPrefix }}jaeger-span-*", + "template": { + + {{- if .UseILM}} + "aliases": { + "{{ .IndexPrefix }}jaeger-span-read": {} + }, + {{- end}} + "settings": { + "index.number_of_shards": {{ .Shards }}, + "index.number_of_replicas": {{ .Replicas }}, + "index.mapping.nested_fields.limit": 50, + "index.requests.cache.enable": true + {{- if .UseILM }}, + "lifecycle": { + "name": "{{ .ILMPolicyName }}", + "rollover_alias": "{{ .IndexPrefix }}jaeger-span-write" + } + {{- end }} + }, + "mappings": { + "dynamic_templates": [ + { + "span_tags_map": { + "mapping": { + "type": "keyword", + "ignore_above": 256 + }, + "path_match": "tag.*" + } + }, + { + "process_tags_map": { + "mapping": { + "type": "keyword", + "ignore_above": 256 + }, + "path_match": "process.tag.*" + } + } + ], + "properties": { + "traceID": { + "type": "keyword", + "ignore_above": 256 + }, + "parentSpanID": { + "type": "keyword", + "ignore_above": 256 + }, + "spanID": { + "type": "keyword", + "ignore_above": 256 + }, + "operationName": { + "type": "keyword", + "ignore_above": 256 + }, + "startTime": { + "type": "long" + }, + "startTimeMillis": { + "type": "date", + "format": "epoch_millis" + }, + "duration": { + "type": "long" + }, + "flags": { + "type": "integer" + }, + "logs": { + "type": "nested", + "dynamic": false, + "properties": { + "timestamp": { + "type": "long" + }, + "fields": { + "type": "nested", + "dynamic": false, + "properties": { + "key": { + "type": "keyword", + "ignore_above": 256 + }, + "value": { + "type": "keyword", + "ignore_above": 256 + }, + "tagType": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "process": { + "properties": { + "serviceName": { + "type": "keyword", + "ignore_above": 256 + }, + "tag": { + "type": "object" + }, + "tags": { + "type": "nested", + "dynamic": false, + "properties": { + "key": { + "type": "keyword", + "ignore_above": 256 + }, + "value": { + "type": "keyword", + "ignore_above": 256 + }, + "tagType": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "references": { + "type": "nested", + "dynamic": false, + "properties": { + "refType": { + "type": "keyword", + "ignore_above": 256 + }, + "traceID": { + "type": "keyword", + "ignore_above": 256 + }, + "spanID": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "tag": { + "type": "object" + }, + "tags": { + "type": "nested", + "dynamic": false, + "properties": { + "key": { + "type": "keyword", + "ignore_above": 256 + }, + "value": { + "type": "keyword", + "ignore_above": 256 + }, + "tagType": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } + } +} diff --git a/plugin/storage/es/mappings/mapping.go b/plugin/storage/es/mappings/mapping.go index d76efd0bbbe..9ee624120db 100644 --- a/plugin/storage/es/mappings/mapping.go +++ b/plugin/storage/es/mappings/mapping.go @@ -29,18 +29,23 @@ var MAPPINGS embed.FS // MappingBuilder holds parameters required to render an elasticsearch index template type MappingBuilder struct { - TemplateBuilder es.TemplateBuilder - Shards int64 - Replicas int64 - EsVersion uint - IndexPrefix string - UseILM bool - ILMPolicyName string + TemplateBuilder es.TemplateBuilder + Shards int64 + Replicas int64 + PrioritySpanTemplate int64 + PriorityServiceTemplate int64 + PriorityDependenciesTemplate int64 + EsVersion uint + IndexPrefix string + UseILM bool + ILMPolicyName string } // GetMapping returns the rendered mapping based on elasticsearch version func (mb *MappingBuilder) GetMapping(mapping string) (string, error) { - if mb.EsVersion == 7 { + if mb.EsVersion == 8 { + return mb.fixMapping(mapping + "-8.json") + } else if mb.EsVersion == 7 { return mb.fixMapping(mapping + "-7.json") } return mb.fixMapping(mapping + ".json") diff --git a/plugin/storage/es/mappings/mapping_test.go b/plugin/storage/es/mappings/mapping_test.go index 8ecf0b558b2..a1ee8d3b7dd 100644 --- a/plugin/storage/es/mappings/mapping_test.go +++ b/plugin/storage/es/mappings/mapping_test.go @@ -17,6 +17,7 @@ package mappings import ( "embed" "errors" + "fmt" "io" "os" "testing" @@ -38,34 +39,39 @@ func TestMappingBuilder_GetMapping(t *testing.T) { mapping string esVersion uint }{ + {mapping: "jaeger-span", esVersion: 8}, {mapping: "jaeger-span", esVersion: 7}, {mapping: "jaeger-span", esVersion: 6}, + {mapping: "jaeger-service", esVersion: 8}, {mapping: "jaeger-service", esVersion: 7}, {mapping: "jaeger-service", esVersion: 6}, + {mapping: "jaeger-dependencies", esVersion: 8}, {mapping: "jaeger-dependencies", esVersion: 7}, {mapping: "jaeger-dependencies", esVersion: 6}, } for _, tt := range tests { t.Run(tt.mapping, func(t *testing.T) { mb := &MappingBuilder{ - TemplateBuilder: es.TextTemplateBuilder{}, - Shards: 3, - Replicas: 3, - EsVersion: tt.esVersion, - IndexPrefix: "test-", - UseILM: true, - ILMPolicyName: "jaeger-test-policy", + TemplateBuilder: es.TextTemplateBuilder{}, + Shards: 3, + Replicas: 3, + PrioritySpanTemplate: 500, + PriorityServiceTemplate: 501, + PriorityDependenciesTemplate: 502, + EsVersion: tt.esVersion, + IndexPrefix: "test-", + UseILM: true, + ILMPolicyName: "jaeger-test-policy", } got, err := mb.GetMapping(tt.mapping) require.NoError(t, err) var wantbytes []byte - if tt.esVersion == 7 { - wantbytes, err = FIXTURES.ReadFile("fixtures/" + tt.mapping + "-7.json") - require.NoError(t, err) - } else { - wantbytes, err = FIXTURES.ReadFile("fixtures/" + tt.mapping + ".json") - require.NoError(t, err) + fileSuffix := "" + if tt.esVersion >= 7 { + fileSuffix = fmt.Sprintf("-%d", tt.esVersion) } + wantbytes, err = FIXTURES.ReadFile("fixtures/" + tt.mapping + fileSuffix + ".json") + require.NoError(t, err) want := string(wantbytes) assert.Equal(t, got, want) }) @@ -77,11 +83,14 @@ func TestMappingBuilder_loadMapping(t *testing.T) { name string }{ {name: "jaeger-span.json"}, - {name: "jaeger-service.json"}, {name: "jaeger-span-7.json"}, + {name: "jaeger-span-8.json"}, + {name: "jaeger-service.json"}, {name: "jaeger-service-7.json"}, + {name: "jaeger-service-8.json"}, {name: "jaeger-dependencies.json"}, {name: "jaeger-dependencies-7.json"}, + {name: "jaeger-dependencies-8.json"}, } for _, test := range tests { mapping := loadMapping(test.name) diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 91fcfc38a15..406da2c313e 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -40,6 +40,9 @@ const ( suffixMaxSpanAge = ".max-span-age" suffixNumShards = ".num-shards" suffixNumReplicas = ".num-replicas" + suffixPrioritySpanTemplate = ".prioirity-span-template" + suffixPriorityServiceTemplate = ".prioirity-service-template" + suffixPriorityDependenciesTemplate = ".prioirity-dependencies-template" suffixBulkSize = ".bulk.size" suffixBulkWorkers = ".bulk.workers" suffixBulkActions = ".bulk.actions" @@ -94,16 +97,19 @@ type namespaceConfig struct { func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { // TODO all default values should be defined via cobra flags defaultConfig := config.Configuration{ - Username: "", - Password: "", - Sniffer: false, - MaxSpanAge: 72 * time.Hour, - NumShards: 5, - NumReplicas: 1, - BulkSize: 5 * 1000 * 1000, - BulkWorkers: 1, - BulkActions: 1000, - BulkFlushInterval: time.Millisecond * 200, + Username: "", + Password: "", + Sniffer: false, + MaxSpanAge: 72 * time.Hour, + NumShards: 5, + NumReplicas: 1, + PrioritySpanTemplate: 0, + PriorityServiceTemplate: 0, + PriorityDependenciesTemplate: 0, + BulkSize: 5 * 1000 * 1000, + BulkWorkers: 1, + BulkActions: 1000, + BulkFlushInterval: time.Millisecond * 200, Tags: config.TagsAsFields{ DotReplacement: "@", }, @@ -192,6 +198,18 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixNumReplicas, nsConfig.NumReplicas, "The number of replicas per index in Elasticsearch") + flagSet.Int64( + nsConfig.namespace+suffixPrioritySpanTemplate, + nsConfig.PrioritySpanTemplate, + "Priority of jaeger-span index template (ESv8 only)") + flagSet.Int64( + nsConfig.namespace+suffixPriorityServiceTemplate, + nsConfig.PriorityServiceTemplate, + "Priority of jaeger-service index template (ESv8 only)") + flagSet.Int64( + nsConfig.namespace+suffixPriorityDependenciesTemplate, + nsConfig.PriorityDependenciesTemplate, + "Priority of jaeger-dependecies index template (ESv8 only)") flagSet.Int( nsConfig.namespace+suffixBulkSize, nsConfig.BulkSize, @@ -314,6 +332,9 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.MaxSpanAge = v.GetDuration(cfg.namespace + suffixMaxSpanAge) cfg.NumShards = v.GetInt64(cfg.namespace + suffixNumShards) cfg.NumReplicas = v.GetInt64(cfg.namespace + suffixNumReplicas) + cfg.PrioritySpanTemplate = v.GetInt64(cfg.namespace + suffixPrioritySpanTemplate) + cfg.PriorityServiceTemplate = v.GetInt64(cfg.namespace + suffixPriorityServiceTemplate) + cfg.PriorityDependenciesTemplate = v.GetInt64(cfg.namespace + suffixPriorityDependenciesTemplate) cfg.BulkSize = v.GetInt(cfg.namespace + suffixBulkSize) cfg.BulkWorkers = v.GetInt(cfg.namespace + suffixBulkWorkers) cfg.BulkActions = v.GetInt(cfg.namespace + suffixBulkActions) diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index c9c584a6615..06470b0ad19 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -17,6 +17,7 @@ package spanstore import ( "context" + "fmt" "strings" "time" @@ -109,11 +110,11 @@ func (s *SpanWriter) CreateTemplates(spanTemplate, serviceTemplate, indexPrefix } _, err := s.client().CreateTemplate(indexPrefix + "jaeger-span").Body(spanTemplate).Do(context.Background()) if err != nil { - return err + return fmt.Errorf("failed to create template %q: %w", indexPrefix+"jaeger-span", err) } _, err = s.client().CreateTemplate(indexPrefix + "jaeger-service").Body(serviceTemplate).Do(context.Background()) if err != nil { - return err + return fmt.Errorf("failed to create template %q: %w", indexPrefix+"jaeger-service", err) } return nil } diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 589869e9069..fcc8f9cc32a 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + elasticsearch8 "github.com/elastic/go-elasticsearch/v8" "github.com/olivere/elastic" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -44,21 +45,25 @@ import ( ) const ( - host = "0.0.0.0" - queryPort = "9200" - queryHostPort = host + ":" + queryPort - queryURL = "http://" + queryHostPort - indexPrefix = "integration-test" - indexDateLayout = "2006-01-02" - tagKeyDeDotChar = "@" - maxSpanAge = time.Hour * 72 - defaultMaxDocCount = 10_000 + host = "0.0.0.0" + queryPort = "9200" + queryHostPort = host + ":" + queryPort + queryURL = "http://" + queryHostPort + indexPrefix = "integration-test" + indexDateLayout = "2006-01-02" + tagKeyDeDotChar = "@" + maxSpanAge = time.Hour * 72 + defaultMaxDocCount = 10_000 + spanTemplateName = "jaeger-span" + serviceTemplateName = "jaeger-service" + dependenciesTemplateName = "jaeger-dependencies" ) type ESStorageIntegration struct { StorageIntegration client *elastic.Client + v8Client *elasticsearch8.Client bulkProcessor *elastic.BulkProcessor logger *zap.Logger } @@ -105,7 +110,17 @@ func (s *ESStorageIntegration) initializeES(allTagsAsFields, archive bool) error s.logger, _ = testutils.NewLogger() s.client = rawClient - s.initSpanstore(allTagsAsFields, archive) + s.v8Client, err = elasticsearch8.NewClient(elasticsearch8.Config{ + Addresses: []string{queryURL}, + DiscoverNodesOnStart: false, + }) + if err != nil { + return err + } + + if err := s.initSpanstore(allTagsAsFields, archive); err != nil { + return err + } s.CleanUp = func() error { return s.esCleanUp(allTagsAsFields, archive) } @@ -131,20 +146,25 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro if err != nil { return err } - client := eswrapper.WrapESClient(s.client, bp, esVersion) + client := eswrapper.WrapESClient(s.client, bp, esVersion, s.v8Client) mappingBuilder := mappings.MappingBuilder{ - TemplateBuilder: estemplate.TextTemplateBuilder{}, - Shards: 5, - Replicas: 1, - EsVersion: client.GetVersion(), - IndexPrefix: indexPrefix, - UseILM: false, + TemplateBuilder: estemplate.TextTemplateBuilder{}, + Shards: 5, + Replicas: 1, + PrioritySpanTemplate: 500, + PriorityServiceTemplate: 501, + PriorityDependenciesTemplate: 502, + EsVersion: client.GetVersion(), + IndexPrefix: indexPrefix, + UseILM: false, } spanMapping, serviceMapping, err := mappingBuilder.GetSpanServiceMappings() if err != nil { return err } + clientFn := func() estemplate.Client { return client } + w := spanstore.NewSpanWriter( spanstore.SpanWriterParams{ Client: clientFn, @@ -253,13 +273,29 @@ func TestElasticsearchStorage_IndexTemplates(t *testing.T) { } s := &ESStorageIntegration{} require.NoError(t, s.initializeES(true, false)) - serviceTemplateExists, _ := s.client.IndexTemplateExists(indexPrefix + "-jaeger-service").Do(context.Background()) - spanTemplateExists, _ := s.client.IndexTemplateExists(indexPrefix + "-jaeger-span").Do(context.Background()) - assert.True(t, serviceTemplateExists) - assert.True(t, spanTemplateExists) + esVersion, err := s.getVersion() + require.NoError(t, err) + // TODO abstract this into pkg/es/client.IndexManagementLifecycleAPI + if esVersion <= 7 { + serviceTemplateExists, err := s.client.IndexTemplateExists(indexPrefix + "-jaeger-service").Do(context.Background()) + require.NoError(t, err) + assert.True(t, serviceTemplateExists) + spanTemplateExists, err := s.client.IndexTemplateExists(indexPrefix + "-jaeger-span").Do(context.Background()) + require.NoError(t, err) + assert.True(t, spanTemplateExists) + } else { + serviceTemplateExistsResponse, err := s.v8Client.API.Indices.ExistsIndexTemplate(indexPrefix + "-jaeger-service") + require.NoError(t, err) + assert.Equal(t, 200, serviceTemplateExistsResponse.StatusCode) + spanTemplateExistsResponse, err := s.v8Client.API.Indices.ExistsIndexTemplate(indexPrefix + "-jaeger-span") + require.NoError(t, err) + assert.Equal(t, 200, spanTemplateExistsResponse.StatusCode) + } + err = s.cleanESIndexTemplates(t, indexPrefix) + require.NoError(t, err) } -func (s *StorageIntegration) testArchiveTrace(t *testing.T) { +func (s *ESStorageIntegration) testArchiveTrace(t *testing.T) { defer s.cleanUp(t) tID := model.NewTraceID(uint64(11), uint64(22)) expected := &model.Span{ @@ -284,3 +320,24 @@ func (s *StorageIntegration) testArchiveTrace(t *testing.T) { CompareTraces(t, &model.Trace{Spans: []*model.Span{expected}}, actual) } } + +func (s *ESStorageIntegration) cleanESIndexTemplates(t *testing.T, prefix string) error { + version, err := s.getVersion() + require.NoError(t, err) + if version > 7 { + prefixWithSeparator := prefix + if prefix != "" { + prefixWithSeparator += "-" + } + _, err := s.v8Client.Indices.DeleteIndexTemplate(prefixWithSeparator + spanTemplateName) + require.NoError(t, err) + _, err = s.v8Client.Indices.DeleteIndexTemplate(prefixWithSeparator + serviceTemplateName) + require.NoError(t, err) + _, err = s.v8Client.Indices.DeleteIndexTemplate(prefixWithSeparator + dependenciesTemplateName) + require.NoError(t, err) + } else { + _, err := s.client.IndexDeleteTemplate("*").Do(context.Background()) + require.NoError(t, err) + } + return nil +} diff --git a/plugin/storage/integration/es_index_cleaner_test.go b/plugin/storage/integration/es_index_cleaner_test.go index 728570c399b..6805c99a684 100644 --- a/plugin/storage/integration/es_index_cleaner_test.go +++ b/plugin/storage/integration/es_index_cleaner_test.go @@ -23,9 +23,12 @@ import ( "os/exec" "testing" + elasticsearch8 "github.com/elastic/go-elasticsearch/v8" "github.com/olivere/elastic" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger/pkg/testutils" ) const ( @@ -80,6 +83,8 @@ func TestIndexCleaner_doNotFailOnFullStorage(t *testing.T) { func TestIndexCleaner(t *testing.T) { client, err := createESClient() require.NoError(t, err) + v8Client, err := createESV8Client() + require.NoError(t, err) tests := []struct { name string @@ -116,24 +121,23 @@ func TestIndexCleaner(t *testing.T) { } for _, test := range tests { t.Run(fmt.Sprintf("%s_no_prefix, %s", test.name, test.envVars), func(t *testing.T) { - runIndexCleanerTest(t, client, "", test.expectedIndices, test.envVars) + runIndexCleanerTest(t, client, v8Client, "", test.expectedIndices, test.envVars) }) t.Run(fmt.Sprintf("%s_prefix, %s", test.name, test.envVars), func(t *testing.T) { - runIndexCleanerTest(t, client, indexPrefix, test.expectedIndices, append(test.envVars, "INDEX_PREFIX="+indexPrefix)) + runIndexCleanerTest(t, client, v8Client, indexPrefix, test.expectedIndices, append(test.envVars, "INDEX_PREFIX="+indexPrefix)) }) } } -func runIndexCleanerTest(t *testing.T, client *elastic.Client, prefix string, expectedIndices, envVars []string) { +func runIndexCleanerTest(t *testing.T, client *elastic.Client, v8Client *elasticsearch8.Client, prefix string, expectedIndices, envVars []string) { // make sure ES is clean _, err := client.DeleteIndex("*").Do(context.Background()) require.NoError(t, err) - + defer cleanESIndexTemplates(t, client, v8Client, prefix) err = createAllIndices(client, prefix) require.NoError(t, err) err = runEsCleaner(0, envVars) require.NoError(t, err) - indices, err := client.IndexNames() require.NoError(t, err) if prefix != "" { @@ -218,3 +222,19 @@ func createESClient() (*elastic.Client, error) { elastic.SetURL(queryURL), elastic.SetSniff(false)) } + +func createESV8Client() (*elasticsearch8.Client, error) { + return elasticsearch8.NewClient(elasticsearch8.Config{ + Addresses: []string{queryURL}, + DiscoverNodesOnStart: false, + }) +} + +func cleanESIndexTemplates(t *testing.T, client *elastic.Client, v8Client *elasticsearch8.Client, prefix string) { + s := &ESStorageIntegration{ + client: client, + v8Client: v8Client, + } + s.logger, _ = testutils.NewLogger() + s.cleanESIndexTemplates(t, prefix) +} diff --git a/plugin/storage/integration/es_index_rollover_test.go b/plugin/storage/integration/es_index_rollover_test.go index 0c1140ebe9c..715a1be2031 100644 --- a/plugin/storage/integration/es_index_rollover_test.go +++ b/plugin/storage/integration/es_index_rollover_test.go @@ -24,9 +24,12 @@ import ( "strconv" "testing" + elasticsearch8 "github.com/elastic/go-elasticsearch/v8" "github.com/olivere/elastic" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger/pkg/testutils" ) const ( @@ -39,7 +42,7 @@ func TestIndexRollover_FailIfILMNotPresent(t *testing.T) { require.NoError(t, err) esVersion, err := getVersion(client) require.NoError(t, err) - if esVersion != 7 { + if esVersion < 7 { t.Skip("Integration test - " + t.Name() + " against ElasticSearch skipped for ES version " + fmt.Sprint(esVersion)) } // make sure ES is clean @@ -79,7 +82,7 @@ func runCreateIndicesWithILM(t *testing.T, ilmPolicyName string) { envVars = append(envVars, "ES_ILM_POLICY_NAME="+ilmPolicyName) } - if esVersion != 7 { + if esVersion < 7 { cleanES(t, client, "") err := runEsRollover("init", envVars) assert.EqualError(t, err, "exit status 1") @@ -103,9 +106,12 @@ func runIndexRolloverWithILMTest(t *testing.T, client *elastic.Client, prefix st // make sure ES is cleaned before test cleanES(t, client, ilmPolicyName) + v8Client, err := createESV8Client() + require.NoError(t, err) // make sure ES is cleaned after test defer cleanES(t, client, ilmPolicyName) - err := createILMPolicy(client, ilmPolicyName) + defer cleanESIndexTemplates(t, client, v8Client, prefix) + err = createILMPolicy(client, ilmPolicyName) require.NoError(t, err) if prefix != "" { @@ -146,6 +152,13 @@ func createESClient() (*elastic.Client, error) { elastic.SetSniff(false)) } +func createESV8Client() (*elasticsearch8.Client, error) { + return elasticsearch8.NewClient(elasticsearch8.Config{ + Addresses: []string{queryURL}, + DiscoverNodesOnStart: false, + }) +} + func runEsRollover(action string, envs []string) error { var dockerEnv string for _, e := range envs { @@ -180,7 +193,7 @@ func cleanES(t *testing.T, client *elastic.Client, policyName string) { require.NoError(t, err) esVersion, err := getVersion(client) require.NoError(t, err) - if esVersion == 7 { + if esVersion >= 7 { _, err = client.XPackIlmDeleteLifecycle().Policy(policyName).Do(context.Background()) if err != nil && !elastic.IsNotFound(err) { assert.Fail(t, "Not able to clean up ILM Policy") @@ -189,3 +202,12 @@ func cleanES(t *testing.T, client *elastic.Client, policyName string) { _, err = client.IndexDeleteTemplate("*").Do(context.Background()) require.NoError(t, err) } + +func cleanESIndexTemplates(t *testing.T, client *elastic.Client, v8Client *elasticsearch8.Client, prefix string) { + s := &ESStorageIntegration{ + client: client, + v8Client: v8Client, + } + s.logger, _ = testutils.NewLogger() + s.cleanESIndexTemplates(t, prefix) +} diff --git a/scripts/es-integration-test.sh b/scripts/es-integration-test.sh index 4450c3373ca..0de9ffd9e4c 100755 --- a/scripts/es-integration-test.sh +++ b/scripts/es-integration-test.sh @@ -27,8 +27,19 @@ setup_es() { --env "http.host=0.0.0.0" --env "transport.host=127.0.0.1" --env "xpack.security.enabled=false" - --env "xpack.monitoring.enabled=false" ) + local major_version=${tag%%.*} + if (( major_version < 8 )); then + params+=(--env "xpack.monitoring.enabled=false") + else + params+=(--env "xpack.monitoring.collection.enabled=false") + fi + if (( major_version > 7 )); then + params+=( + --env "action.destructive_requires_name=false" + ) + fi + local cid cid=$(docker run "${params[@]}" "${image}:${tag}") echo "${cid}"