Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Elasticsearch 8.x #4829

Merged
merged 18 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci-elasticsearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ jobs:
- major: 7.x
image: 7.14.0
distribution: elasticsearch
- major: 8.x
image: 8.8.2
distribution: elasticsearch
name: ${{ matrix.version.distribution }} ${{ matrix.version.major }}
steps:
- name: Harden Runner
Expand Down
33 changes: 21 additions & 12 deletions cmd/es-rollover/app/init/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,16 @@

func (c Action) getMapping(version uint, templateName string) (string, error) {
mappingBuilder := mappings.MappingBuilder{
TemplateBuilder: es.TextTemplateBuilder{},
Shards: int64(c.Config.Shards),
Replicas: int64(c.Config.Replicas),
IndexPrefix: c.Config.IndexPrefix,
UseILM: c.Config.UseILM,
ILMPolicyName: c.Config.ILMPolicyName,
EsVersion: version,
TemplateBuilder: es.TextTemplateBuilder{},
PrioritySpanTemplate: int64(c.Config.PrioritySpanTemplate),
PriorityServiceTemplate: int64(c.Config.PriorityServiceTemplate),
PriorityDependenciesTemplate: int64(c.Config.PriorityDependenciesTemplate),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please make sure PR description reflects all the changes, and explain why they are needed (especially the priority)

Shards: int64(c.Config.Shards),
Replicas: int64(c.Config.Replicas),
IndexPrefix: c.Config.IndexPrefix,
UseILM: c.Config.UseILM,
ILMPolicyName: c.Config.ILMPolicyName,
EsVersion: version,
}
return mappingBuilder.GetMapping(templateName)
}
Expand All @@ -57,7 +60,7 @@
return err
}
if c.Config.UseILM {
if version == ilmVersionSupport {
if version >= ilmVersionSupport {
policyExist, err := c.ILMClient.Exists(c.Config.ILMPolicyName)
if err != nil {
return err
Expand Down Expand Up @@ -109,10 +112,16 @@
if err != nil {
return err
}

err = c.IndicesClient.CreateTemplate(mapping, indexopt.TemplateName())
if err != nil {
return err
if version > 7 {
err = c.IndicesClient.CreateTemplateV8(mapping, indexopt.TemplateName())
if err != nil {
return err
}

Check warning on line 119 in cmd/es-rollover/app/init/action.go

View check run for this annotation

Codecov / codecov/patch

cmd/es-rollover/app/init/action.go#L116-L119

Added lines #L116 - L119 were not covered by tests
} else {
err = c.IndicesClient.CreateTemplate(mapping, indexopt.TemplateName())
if err != nil {
return err
}
}

index := indexopt.InitialRolloverIndex()
Expand Down
20 changes: 16 additions & 4 deletions cmd/es-rollover/app/init/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,37 @@ import (
)

const (
shards = "shards"
replicas = "replicas"
shards = "shards"
replicas = "replicas"
prioritySpanTemplate = "priority-span-template"
priorityServiceTemplate = "priority-service-template"
priorityDependenciesTemplate = "priority-dependencies-template"
)

// Config holds configuration for index cleaner binary.
type Config struct {
app.Config
Shards int
Replicas int
Shards int
Replicas int
PrioritySpanTemplate int
PriorityServiceTemplate int
PriorityDependenciesTemplate int
}

// AddFlags adds flags for TLS to the FlagSet.
func (c *Config) AddFlags(flags *flag.FlagSet) {
flags.Int(shards, 5, "Number of shards")
flags.Int(replicas, 1, "Number of replicas")
flags.Int(prioritySpanTemplate, 500, "Priority of span template")
flags.Int(priorityServiceTemplate, 501, "Prioirty of service template")
flags.Int(priorityDependenciesTemplate, 502, "Prioirty of dependencies template")
}

// InitFromViper initializes config from viper.Viper.
func (c *Config) InitFromViper(v *viper.Viper) {
c.Shards = v.GetInt(shards)
c.Replicas = v.GetInt(replicas)
c.PrioritySpanTemplate = v.GetInt(prioritySpanTemplate)
c.PriorityServiceTemplate = v.GetInt(priorityServiceTemplate)
c.PriorityDependenciesTemplate = v.GetInt(priorityDependenciesTemplate)
}
6 changes: 6 additions & 0 deletions cmd/es-rollover/app/init/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,16 @@ func TestBindFlags(t *testing.T) {
err := command.ParseFlags([]string{
"--shards=8",
"--replicas=16",
"--priority-span-template=300",
"--priority-service-template=301",
"--priority-dependencies-template=302",
})
require.NoError(t, err)

c.InitFromViper(v)
assert.Equal(t, 8, c.Shards)
assert.Equal(t, 16, c.Replicas)
assert.Equal(t, 300, c.PrioritySpanTemplate)
assert.Equal(t, 301, c.PriorityServiceTemplate)
assert.Equal(t, 302, c.PriorityDependenciesTemplate)
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/bsm/sarama-cluster v2.1.13+incompatible
github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b
github.com/dgraph-io/badger/v3 v3.2103.5
github.com/elastic/go-elasticsearch/v8 v8.11.0
github.com/fsnotify/fsnotify v1.7.0
github.com/go-kit/kit v0.13.0
github.com/go-logr/zapr v1.3.0
Expand Down Expand Up @@ -109,6 +110,7 @@ require (
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.3.0 // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-kit/log v0.2.1 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4A
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6 h1:1+44gxLdKRnR/Bx/iAtr+XqNcE4e0oODa63+FABNANI=
github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
github.com/elastic/elastic-transport-go/v8 v8.3.0 h1:DJGxovyQLXGr62e9nDMPSxRyWION0Bh6d9eCFBriiHo=
github.com/elastic/elastic-transport-go/v8 v8.3.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
github.com/elastic/go-elasticsearch/v8 v8.10.0 h1:ALg3DMxSrx07YmeMNcfPf7cFh1Ep2+Qa19EOXTbwr2k=
github.com/elastic/go-elasticsearch/v8 v8.10.0/go.mod h1:NGmpvohKiRHXI0Sw4fuUGn6hYOmAXlyCphKpzVBiqDE=
github.com/elastic/go-elasticsearch/v8 v8.11.0 h1:gUazf443rdYAEAD7JHX5lSXRgTkG4N4IcsV8dcWQPxM=
github.com/elastic/go-elasticsearch/v8 v8.11.0/go.mod h1:GU1BJHO7WeamP7UhuElYwzzHtvf9SDmeVpSSy9+o6Qg=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down
18 changes: 18 additions & 0 deletions pkg/es/client/index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,24 @@
return nil
}

// Create Template for ESV8
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
func (i IndicesClient) CreateTemplateV8(template, name string) error {
_, err := i.request(elasticRequest{
endpoint: fmt.Sprintf("_index_template/%s", name),
method: http.MethodPut,
body: []byte(template),
})
if err != nil {
if responseError, isResponseError := err.(ResponseError); isResponseError {
if responseError.StatusCode != http.StatusOK {
return responseError.prefixMessage(fmt.Sprintf("failed to create template: %s", name))
}

Check warning on line 263 in pkg/es/client/index_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/client/index_client.go#L253-L263

Added lines #L253 - L263 were not covered by tests
}
return fmt.Errorf("failed to create template: %w", err)

Check warning on line 265 in pkg/es/client/index_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/client/index_client.go#L265

Added line #L265 was not covered by tests
}
return nil

Check warning on line 267 in pkg/es/client/index_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/client/index_client.go#L267

Added line #L267 was not covered by tests
}

// Rollover create a rollover for certain index/alias
func (i IndicesClient) Rollover(rolloverTarget string, conditions map[string]interface{}) error {
esReq := elasticRequest{
Expand Down
1 change: 1 addition & 0 deletions pkg/es/client/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type IndexAPI interface {
CreateAlias(aliases []Alias) error
DeleteAlias(aliases []Alias) error
CreateTemplate(template, name string) error
CreateTemplateV8(template, name string) error
Rollover(rolloverTarget string, conditions map[string]interface{}) error
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/es/client/mocks/index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (c *MockIndexAPI) CreateTemplate(template, name string) error {
ret := c.Called(template, name)
return ret.Error(0)
}
func (c *MockIndexAPI) CreateTemplateV8(template, name string) error {
ret := c.Called(template, name)
return ret.Error(0)
}
func (c *MockIndexAPI) Rollover(rolloverTarget string, conditions map[string]interface{}) error {
ret := c.Called(rolloverTarget, conditions)
return ret.Error(0)
Expand Down
32 changes: 32 additions & 0 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sync"
"time"

elasticsearch8 "github.com/elastic/go-elasticsearch/v8"
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
"github.com/olivere/elastic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -57,6 +58,9 @@ type Configuration struct {
MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards" mapstructure:"num_shards"`
NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"`
PrioritySpanTemplate int64 `yaml:"priority_span_template" mapstructure:"priority_span_template"`
PriorityServiceTemplate int64 `yaml:"priority_service_template" mapstructure:"priority_service_template"`
PriorityDependenciesTemplate int64 `yaml:"priority_dependencies_template" mapstructure:"priority_dependencies_template"`
Timeout time.Duration `validate:"min=500" mapstructure:"-"`
BulkSize int `mapstructure:"-"`
BulkWorkers int `mapstructure:"-"`
Expand Down Expand Up @@ -187,6 +191,25 @@ func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Fact
return eswrapper.WrapESClient(rawClient, service, c.Version), nil
}

func NewElasticSearch8Client(c *Configuration, logger *zap.Logger) (*elasticsearch8.Client, error) {
var options elasticsearch8.Config
options.Addresses = c.Servers
options.Username = c.Username
options.Password = c.Password
options.DiscoverNodesOnStart = c.Sniffer
transport, err := GetHTTPRoundTripper(c, logger)
if err != nil {
return nil, err
}
options.Transport = transport

client, err := elasticsearch8.NewClient(options)
if err != nil {
return nil, err
}
return client, nil
}

// ApplyDefaults copies settings from source unless its own value is non-zero.
func (c *Configuration) ApplyDefaults(source *Configuration) {
if len(c.RemoteReadClusters) == 0 {
Expand All @@ -210,6 +233,15 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.NumReplicas == 0 {
c.NumReplicas = source.NumReplicas
}
if c.PrioritySpanTemplate == 0 {
c.PrioritySpanTemplate = source.PrioritySpanTemplate
}
if c.PriorityServiceTemplate == 0 {
c.PriorityServiceTemplate = source.PriorityServiceTemplate
}
if c.PrioritySpanTemplate == 0 {
c.PriorityDependenciesTemplate = source.PriorityDependenciesTemplate
}
if c.BulkSize == 0 {
c.BulkSize = source.BulkSize
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/es/wrapper/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c ClientWrapper) Index() es.IndexService {
// Search calls this function to internal client.
func (c ClientWrapper) Search(indices ...string) es.SearchService {
searchService := c.client.Search(indices...)
if c.esVersion == 7 {
if c.esVersion >= 7 {
searchService = searchService.RestTotalHitsAsInt(true)
}
return WrapESSearchService(searchService)
Expand All @@ -75,7 +75,7 @@ func (c ClientWrapper) Search(indices ...string) es.SearchService {
// MultiSearch calls this function to internal client.
func (c ClientWrapper) MultiSearch() es.MultiSearchService {
multiSearchService := c.client.MultiSearch()
if c.esVersion == 7 {
if c.esVersion >= 7 {
multiSearchService = multiSearchService.RestTotalHitsAsInt(true)
}
return WrapESMultiSearchService(multiSearchService)
Expand Down Expand Up @@ -167,7 +167,7 @@ func (i IndexServiceWrapper) Index(index string) es.IndexService {

// Type calls this function to internal service.
func (i IndexServiceWrapper) Type(typ string) es.IndexService {
if i.esVersion == 7 {
if i.esVersion >= 7 {
return WrapESIndexService(i.bulkIndexReq, i.bulkService, i.esVersion)
}
return WrapESIndexService(i.bulkIndexReq.Type(typ), i.bulkService, i.esVersion)
Expand Down
33 changes: 30 additions & 3 deletions plugin/storage/es/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
"encoding/json"
"errors"
"fmt"
"strings"
"time"

elasticsearch8 "github.com/elastic/go-elasticsearch/v8"
"github.com/olivere/elastic"
"go.uber.org/zap"

Expand All @@ -39,6 +41,7 @@
// DependencyStore handles all queries and insertions to ElasticSearch dependencies
type DependencyStore struct {
client func() es.Client
v8Client *elasticsearch8.Client
logger *zap.Logger
dependencyIndexPrefix string
indexDateLayout string
Expand All @@ -49,6 +52,7 @@
// DependencyStoreParams holds constructor parameters for NewDependencyStore
type DependencyStoreParams struct {
Client func() es.Client
V8Client *elasticsearch8.Client
Logger *zap.Logger
IndexPrefix string
IndexDateLayout string
Expand All @@ -60,6 +64,7 @@
func NewDependencyStore(p DependencyStoreParams) *DependencyStore {
return &DependencyStore{
client: p.Client,
v8Client: p.V8Client,
logger: p.Logger,
dependencyIndexPrefix: prefixIndexName(p.IndexPrefix, dependencyIndex),
indexDateLayout: p.IndexDateLayout,
Expand All @@ -84,11 +89,14 @@

// CreateTemplates creates index templates.
func (s *DependencyStore) CreateTemplates(dependenciesTemplate string) error {
_, err := s.client().CreateTemplate("jaeger-dependencies").Body(dependenciesTemplate).Do(context.Background())
if err != nil {
esVersion := s.client().GetVersion()
if esVersion > 7 {
err := s.createTemplatesV8(dependenciesTemplate)
return err
} else {
err := s.createTemplates(dependenciesTemplate)

Check warning on line 97 in plugin/storage/es/dependencystore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/dependencystore/storage.go#L92-L97

Added lines #L92 - L97 were not covered by tests
return err
}
return nil
}

func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, dependencies []model.DependencyLink) {
Expand Down Expand Up @@ -153,3 +161,22 @@
}
return indexWithDate(s.dependencyIndexPrefix, s.indexDateLayout, ts)
}

func (s *DependencyStore) createTemplates(dependenciesTemplate string) error {
_, err := s.client().CreateTemplate("jaeger-dependencies").Body(dependenciesTemplate).Do(context.Background())
if err != nil {
return err
}
return nil

Check warning on line 170 in plugin/storage/es/dependencystore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/dependencystore/storage.go#L165-L170

Added lines #L165 - L170 were not covered by tests
}

func (s *DependencyStore) createTemplatesV8(dependenciesTemplate string) error {
dependenciesTemplateResponse, err := s.v8Client.Indices.PutIndexTemplate("jaeger-dependencies", strings.NewReader(dependenciesTemplate))
if dependenciesTemplateResponse.StatusCode != 200 {
return fmt.Errorf("Error creating Index templates for Span %s", dependenciesTemplateResponse.String())
}
if err != nil {
return err
}
return nil

Check warning on line 181 in plugin/storage/es/dependencystore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/dependencystore/storage.go#L173-L181

Added lines #L173 - L181 were not covered by tests
}