Skip to content

Commit

Permalink
Support Elasticsearch 8.x (#4829)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Resolves #3571 

## Description of the changes
- Added index templates for esv8
- Added a esv8 client to handle the put index template request that is
not currently possible by olivere/elastic
- After these changes are merged , users no more need to use
create.index-templates= false unless they want to add their custom index
templates and force the es.version = 7

---------

Signed-off-by: bugslayer-332 <ayashwanth9503@gmail.com>
Signed-off-by: Yashwanth Reddy  <89122324+pmuls99@users.noreply.github.com>
Signed-off-by: Yuri Shkuro <github@ysh.us>
Co-authored-by: bugslayer-332 <ayashwanth9503@gmail.com>
Co-authored-by: Yuri Shkuro <github@ysh.us>
  • Loading branch information
3 people committed Nov 23, 2023
1 parent 9cd9fdb commit 05332d7
Show file tree
Hide file tree
Showing 29 changed files with 892 additions and 98 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci-elasticsearch.yml
Expand Up @@ -30,6 +30,9 @@ jobs:
- major: 7.x
image: 7.14.0
distribution: elasticsearch
- major: 8.x
image: 8.8.2
distribution: elasticsearch
name: ${{ matrix.version.distribution }} ${{ matrix.version.major }}
steps:
- name: Harden Runner
Expand Down
20 changes: 11 additions & 9 deletions cmd/es-rollover/app/init/action.go
Expand Up @@ -39,13 +39,16 @@ type Action struct {

func (c Action) getMapping(version uint, templateName string) (string, error) {
mappingBuilder := mappings.MappingBuilder{
TemplateBuilder: es.TextTemplateBuilder{},
Shards: int64(c.Config.Shards),
Replicas: int64(c.Config.Replicas),
IndexPrefix: c.Config.IndexPrefix,
UseILM: c.Config.UseILM,
ILMPolicyName: c.Config.ILMPolicyName,
EsVersion: version,
TemplateBuilder: es.TextTemplateBuilder{},
PrioritySpanTemplate: int64(c.Config.PrioritySpanTemplate),
PriorityServiceTemplate: int64(c.Config.PriorityServiceTemplate),
PriorityDependenciesTemplate: int64(c.Config.PriorityDependenciesTemplate),
Shards: int64(c.Config.Shards),
Replicas: int64(c.Config.Replicas),
IndexPrefix: c.Config.IndexPrefix,
UseILM: c.Config.UseILM,
ILMPolicyName: c.Config.ILMPolicyName,
EsVersion: version,
}
return mappingBuilder.GetMapping(templateName)
}
Expand All @@ -57,7 +60,7 @@ func (c Action) Do() error {
return err
}
if c.Config.UseILM {
if version == ilmVersionSupport {
if version >= ilmVersionSupport {
policyExist, err := c.ILMClient.Exists(c.Config.ILMPolicyName)
if err != nil {
return err
Expand Down Expand Up @@ -109,7 +112,6 @@ func (c Action) init(version uint, indexopt app.IndexOption) error {
if err != nil {
return err
}

err = c.IndicesClient.CreateTemplate(mapping, indexopt.TemplateName())
if err != nil {
return err
Expand Down
20 changes: 16 additions & 4 deletions cmd/es-rollover/app/init/flags.go
Expand Up @@ -23,25 +23,37 @@ import (
)

const (
shards = "shards"
replicas = "replicas"
shards = "shards"
replicas = "replicas"
prioritySpanTemplate = "priority-span-template"
priorityServiceTemplate = "priority-service-template"
priorityDependenciesTemplate = "priority-dependencies-template"
)

// Config holds configuration for index cleaner binary.
type Config struct {
app.Config
Shards int
Replicas int
Shards int
Replicas int
PrioritySpanTemplate int
PriorityServiceTemplate int
PriorityDependenciesTemplate int
}

// AddFlags adds flags for TLS to the FlagSet.
func (c *Config) AddFlags(flags *flag.FlagSet) {
flags.Int(shards, 5, "Number of shards")
flags.Int(replicas, 1, "Number of replicas")
flags.Int(prioritySpanTemplate, 0, "Priority of jaeger-span index template (ESv8 only)")
flags.Int(priorityServiceTemplate, 0, "Priority of jaeger-service index template (ESv8 only)")
flags.Int(priorityDependenciesTemplate, 0, "Priority of jaeger-dependecies index template (ESv8 only)")
}

// InitFromViper initializes config from viper.Viper.
func (c *Config) InitFromViper(v *viper.Viper) {
c.Shards = v.GetInt(shards)
c.Replicas = v.GetInt(replicas)
c.PrioritySpanTemplate = v.GetInt(prioritySpanTemplate)
c.PriorityServiceTemplate = v.GetInt(priorityServiceTemplate)
c.PriorityDependenciesTemplate = v.GetInt(priorityDependenciesTemplate)
}
6 changes: 6 additions & 0 deletions cmd/es-rollover/app/init/flags_test.go
Expand Up @@ -36,10 +36,16 @@ func TestBindFlags(t *testing.T) {
err := command.ParseFlags([]string{
"--shards=8",
"--replicas=16",
"--priority-span-template=300",
"--priority-service-template=301",
"--priority-dependencies-template=302",
})
require.NoError(t, err)

c.InitFromViper(v)
assert.Equal(t, 8, c.Shards)
assert.Equal(t, 16, c.Replicas)
assert.Equal(t, 300, c.PrioritySpanTemplate)
assert.Equal(t, 301, c.PriorityServiceTemplate)
assert.Equal(t, 302, c.PriorityDependenciesTemplate)
}
2 changes: 2 additions & 0 deletions go.mod
Expand Up @@ -10,6 +10,7 @@ require (
github.com/bsm/sarama-cluster v2.1.13+incompatible
github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b
github.com/dgraph-io/badger/v3 v3.2103.5
github.com/elastic/go-elasticsearch/v8 v8.11.0
github.com/fsnotify/fsnotify v1.7.0
github.com/go-kit/kit v0.13.0
github.com/go-logr/zapr v1.3.0
Expand Down Expand Up @@ -109,6 +110,7 @@ require (
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.3.0 // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-kit/log v0.2.1 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Expand Up @@ -130,6 +130,14 @@ github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4A
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6 h1:1+44gxLdKRnR/Bx/iAtr+XqNcE4e0oODa63+FABNANI=
github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
github.com/elastic/elastic-transport-go/v8 v8.3.0 h1:DJGxovyQLXGr62e9nDMPSxRyWION0Bh6d9eCFBriiHo=
github.com/elastic/elastic-transport-go/v8 v8.3.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
github.com/elastic/go-elasticsearch/v8 v8.10.0 h1:ALg3DMxSrx07YmeMNcfPf7cFh1Ep2+Qa19EOXTbwr2k=
github.com/elastic/go-elasticsearch/v8 v8.10.0/go.mod h1:NGmpvohKiRHXI0Sw4fuUGn6hYOmAXlyCphKpzVBiqDE=
github.com/elastic/go-elasticsearch/v8 v8.11.0 h1:gUazf443rdYAEAD7JHX5lSXRgTkG4N4IcsV8dcWQPxM=
github.com/elastic/go-elasticsearch/v8 v8.11.0/go.mod h1:GU1BJHO7WeamP7UhuElYwzzHtvf9SDmeVpSSy9+o6Qg=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down
3 changes: 2 additions & 1 deletion pkg/es/client/client.go
Expand Up @@ -52,7 +52,8 @@ func newResponseError(err error, code int, body []byte) ResponseError {
}
}

// Client is a generic client to make requests to ES
// Client executes requests against Elasticsearch using direct HTTP calls,
// without using the official Go client for ES.
type Client struct {
// Http client.
Client *http.Client
Expand Down
2 changes: 1 addition & 1 deletion pkg/es/client/cluster_client.go
Expand Up @@ -36,7 +36,7 @@ func (c *ClusterClient) Version() (uint, error) {
TagLine string `json:"tagline"`
}
body, err := c.request(elasticRequest{
endpoint: "/",
endpoint: "",
method: http.MethodGet,
})
if err != nil {
Expand Down
20 changes: 18 additions & 2 deletions pkg/es/client/cluster_client_test.go
Expand Up @@ -104,7 +104,6 @@ const opensearch2 = `
`

const elasticsearch7 = `
{
"name" : "elasticsearch-0",
"cluster_name" : "clustername",
Expand All @@ -124,8 +123,17 @@ const elasticsearch7 = `
}
`

const elasticsearch6 = `
const elasticsearch8 = `
{
"name" : "elasticsearch-0",
"version" : {
"number" : "8.0.0"
},
"tagline" : "You Know, for Search"
}
`

const elasticsearch6 = `
{
"name" : "elasticsearch-0",
"cluster_name" : "clustername",
Expand Down Expand Up @@ -165,6 +173,12 @@ func TestVersion(t *testing.T) {
response: elasticsearch7,
expectedResult: 7,
},
{
name: "success with elasticsearch 8",
responseCode: http.StatusOK,
response: elasticsearch8,
expectedResult: 8,
},
{
name: "success with opensearch 1",
responseCode: http.StatusOK,
Expand Down Expand Up @@ -223,7 +237,9 @@ func TestVersion(t *testing.T) {
if test.errContains != "" {
require.Error(t, err)
assert.Contains(t, err.Error(), test.errContains)
return
}
require.NoError(t, err)
assert.Equal(t, test.expectedResult, result)
})
}
Expand Down
15 changes: 14 additions & 1 deletion pkg/es/client/index_client.go
Expand Up @@ -231,10 +231,23 @@ func (i *IndicesClient) aliasAction(action string, aliases []Alias) error {
return err
}

func (i IndicesClient) version() (uint, error) {
cl := ClusterClient{Client: i.Client}
return cl.Version()
}

// CreateTemplate an ES index template
func (i IndicesClient) CreateTemplate(template, name string) error {
endpointFmt := "_template/%s"
if v, err := i.version(); err == nil {
if v >= 8 {
endpointFmt = "_index_template/%s"
}
} else {
return err
}
_, err := i.request(elasticRequest{
endpoint: fmt.Sprintf("_template/%s", name),
endpoint: fmt.Sprintf(endpointFmt, name),
method: http.MethodPut,
body: []byte(template),
})
Expand Down
15 changes: 14 additions & 1 deletion pkg/es/client/index_client_test.go
Expand Up @@ -484,16 +484,24 @@ func TestClientCreateTemplate(t *testing.T) {
templateContent := "template content"
tests := []struct {
name string
versionResp string
responseCode int
response string
errContains string
}{
{
name: "success",
name: "success/v7",
versionResp: elasticsearch7,
responseCode: http.StatusOK,
},
{
name: "success/v8",
versionResp: elasticsearch8,
responseCode: http.StatusOK,
},
{
name: "client error",
versionResp: elasticsearch7,
responseCode: http.StatusBadRequest,
response: esErrResponse,
errContains: "failed to create template: jaeger-template",
Expand All @@ -502,6 +510,11 @@ func TestClientCreateTemplate(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
if req.URL.String() == "/" { // ES version check
res.WriteHeader(http.StatusOK)
res.Write([]byte(test.versionResp))
return
}
assert.True(t, strings.HasSuffix(req.URL.String(), "_template/jaeger-template"))
assert.Equal(t, http.MethodPut, req.Method)
assert.Equal(t, "Basic foobar", req.Header.Get("Authorization"))
Expand Down
File renamed without changes.
39 changes: 37 additions & 2 deletions pkg/es/config/config.go
Expand Up @@ -29,6 +29,7 @@ import (
"sync"
"time"

esV8 "github.com/elastic/go-elasticsearch/v8"
"github.com/olivere/elastic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -57,6 +58,9 @@ type Configuration struct {
MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards" mapstructure:"num_shards"`
NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"`
PrioritySpanTemplate int64 `yaml:"priority_span_template" mapstructure:"priority_span_template"`
PriorityServiceTemplate int64 `yaml:"priority_service_template" mapstructure:"priority_service_template"`
PriorityDependenciesTemplate int64 `yaml:"priority_dependencies_template" mapstructure:"priority_dependencies_template"`
Timeout time.Duration `validate:"min=500" mapstructure:"-"`
BulkSize int `mapstructure:"-"`
BulkWorkers int `mapstructure:"-"`
Expand Down Expand Up @@ -111,7 +115,7 @@ func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Fact
sm := storageMetrics.NewWriteMetrics(metricsFactory, "bulk_index")
m := sync.Map{}

service, err := rawClient.BulkProcessor().
bulkProc, err := rawClient.BulkProcessor().
Before(func(id int64, requests []elastic.BulkableRequest) {
m.Store(id, time.Now())
}).
Expand Down Expand Up @@ -184,7 +188,29 @@ func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Fact
c.Version = uint(esVersion)
}

return eswrapper.WrapESClient(rawClient, service, c.Version), nil
var rawClientV8 *esV8.Client
if c.Version >= 8 {
rawClientV8, err = newElasticsearchV8(c, logger)
if err != nil {
return nil, fmt.Errorf("error creating v8 client: %v", err)
}
}

return eswrapper.WrapESClient(rawClient, bulkProc, c.Version, rawClientV8), nil
}

func newElasticsearchV8(c *Configuration, logger *zap.Logger) (*esV8.Client, error) {
var options esV8.Config
options.Addresses = c.Servers
options.Username = c.Username
options.Password = c.Password
options.DiscoverNodesOnStart = c.Sniffer
transport, err := GetHTTPRoundTripper(c, logger)
if err != nil {
return nil, err
}
options.Transport = transport
return esV8.NewClient(options)
}

// ApplyDefaults copies settings from source unless its own value is non-zero.
Expand All @@ -210,6 +236,15 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.NumReplicas == 0 {
c.NumReplicas = source.NumReplicas
}
if c.PrioritySpanTemplate == 0 {
c.PrioritySpanTemplate = source.PrioritySpanTemplate
}
if c.PriorityServiceTemplate == 0 {
c.PriorityServiceTemplate = source.PriorityServiceTemplate
}
if c.PrioritySpanTemplate == 0 {
c.PriorityDependenciesTemplate = source.PriorityDependenciesTemplate
}
if c.BulkSize == 0 {
c.BulkSize = source.BulkSize
}
Expand Down

0 comments on commit 05332d7

Please sign in to comment.