Skip to content

Commit

Permalink
Make ES index name configurable (#1009)
Browse files Browse the repository at this point in the history
* Make ES index name configurable

It allow simple multitenancy - index per tenant or k8s namespace.

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* docs

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Fix comment

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* index prefix

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Fix index cleaner and add prefix delimiter

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Fix dependency storage

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Fmt and help message

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* increase converage

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* review fixes

Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay committed Aug 22, 2018
1 parent 40a3cc9 commit 75af597
Show file tree
Hide file tree
Showing 11 changed files with 177 additions and 78 deletions.
7 changes: 7 additions & 0 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Configuration struct {
BulkWorkers int
BulkActions int
BulkFlushInterval time.Duration
IndexPrefix string
}

// ClientBuilder creates new es.Client
Expand All @@ -49,6 +50,7 @@ type ClientBuilder interface {
GetNumShards() int64
GetNumReplicas() int64
GetMaxSpanAge() time.Duration
GetIndexPrefix() string
}

// NewClient creates a new ElasticSearch client
Expand Down Expand Up @@ -158,6 +160,11 @@ func (c *Configuration) GetMaxSpanAge() time.Duration {
return c.MaxSpanAge
}

// GetIndexPrefix returns index prefix
func (c *Configuration) GetIndexPrefix() string {
return c.IndexPrefix
}

// GetConfigs wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) GetConfigs() []elastic.ClientOptionFunc {
options := make([]elastic.ClientOptionFunc, 3)
Expand Down
39 changes: 22 additions & 17 deletions plugin/storage/es/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
)

const (
dependencyType = "dependencies"
dependencyIndexPrefix = "jaeger-dependencies-"
dependencyType = "dependencies"
dependencyIndex = "jaeger-dependencies-"
)

type timeToDependencies struct {
Expand All @@ -39,23 +39,28 @@ type timeToDependencies struct {

// DependencyStore handles all queries and insertions to ElasticSearch dependencies
type DependencyStore struct {
ctx context.Context
client es.Client
logger *zap.Logger
ctx context.Context
client es.Client
logger *zap.Logger
dependencyIndexPrefix string
}

// NewDependencyStore returns a DependencyStore
func NewDependencyStore(client es.Client, logger *zap.Logger) *DependencyStore {
func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string) *DependencyStore {
if indexPrefix != "" {
indexPrefix += ":"
}
return &DependencyStore{
ctx: context.Background(),
client: client,
logger: logger,
ctx: context.Background(),
client: client,
logger: logger,
dependencyIndexPrefix: indexPrefix + dependencyIndex,
}
}

// WriteDependencies implements dependencystore.Writer#WriteDependencies.
func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.DependencyLink) error {
indexName := indexName(ts)
indexName := indexWithDate(s.dependencyIndexPrefix, ts)
if err := s.createIndex(indexName); err != nil {
return err
}
Expand All @@ -80,7 +85,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
searchResult, err := s.client.Search(getIndices(endTs, lookback)...).
searchResult, err := s.client.Search(getIndices(s.dependencyIndexPrefix, endTs, lookback)...).
Type(dependencyType).
Size(10000). // the default elasticsearch allowed limit
Query(buildTSQuery(endTs, lookback)).
Expand All @@ -107,18 +112,18 @@ func buildTSQuery(endTs time.Time, lookback time.Duration) elastic.Query {
return elastic.NewRangeQuery("timestamp").Gte(endTs.Add(-lookback)).Lte(endTs)
}

func getIndices(ts time.Time, lookback time.Duration) []string {
func getIndices(prefix string, ts time.Time, lookback time.Duration) []string {
var indices []string
firstIndex := indexName(ts.Add(-lookback))
currentIndex := indexName(ts)
firstIndex := indexWithDate(prefix, ts.Add(-lookback))
currentIndex := indexWithDate(prefix, ts)
for currentIndex != firstIndex {
indices = append(indices, currentIndex)
ts = ts.Add(-24 * time.Hour)
currentIndex = indexName(ts)
currentIndex = indexWithDate(prefix, ts)
}
return append(indices, firstIndex)
}

func indexName(date time.Time) string {
return dependencyIndexPrefix + date.UTC().Format("2006-01-02")
func indexWithDate(indexNamePrefix string, date time.Time) string {
return indexNamePrefix + date.UTC().Format("2006-01-02")
}
35 changes: 28 additions & 7 deletions plugin/storage/es/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,30 @@ func withDepStorage(fn func(r *depStorageTest)) {
client: client,
logger: logger,
logBuffer: logBuffer,
storage: NewDependencyStore(client, logger),
storage: NewDependencyStore(client, logger, ""),
}
fn(r)
}

var _ dependencystore.Reader = &DependencyStore{} // check API conformance
var _ dependencystore.Writer = &DependencyStore{} // check API conformance

func TestNewSpanReaderIndexPrefix(t *testing.T) {
testCases := []struct {
prefix string
expected string
}{
{prefix: "", expected: ""},
{prefix: "foo", expected: "foo:"},
{prefix: ":", expected: "::"},
}
for _, testCase := range testCases {
client := &mocks.Client{}
r := NewDependencyStore(client, zap.NewNop(), testCase.prefix)
assert.Equal(t, testCase.expected+dependencyIndex, r.dependencyIndexPrefix)
}
}

func TestWriteDependencies(t *testing.T) {
testCases := []struct {
createIndexError error
Expand All @@ -69,7 +85,7 @@ func TestWriteDependencies(t *testing.T) {
for _, testCase := range testCases {
withDepStorage(func(r *depStorageTest) {
fixedTime := time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC)
indexName := indexName(fixedTime)
indexName := indexWithDate("", fixedTime)

indexService := &mocks.IndicesCreateService{}
writeService := &mocks.IndexService{}
Expand Down Expand Up @@ -174,26 +190,31 @@ func TestGetIndices(t *testing.T) {
testCases := []struct {
expected []string
lookback time.Duration
prefix string
}{
{
expected: []string{indexName(fixedTime), indexName(fixedTime.Add(-24 * time.Hour))},
expected: []string{indexWithDate("", fixedTime), indexWithDate("", fixedTime.Add(-24*time.Hour))},
lookback: 23 * time.Hour,
prefix: "",
},
{
expected: []string{indexName(fixedTime), indexName(fixedTime.Add(-24 * time.Hour))},
expected: []string{indexWithDate("", fixedTime), indexWithDate("", fixedTime.Add(-24*time.Hour))},
lookback: 13 * time.Hour,
prefix: "",
},
{
expected: []string{indexName(fixedTime)},
expected: []string{indexWithDate("foo:", fixedTime)},
lookback: 1 * time.Hour,
prefix: "foo:",
},
{
expected: []string{indexName(fixedTime)},
expected: []string{indexWithDate("foo-", fixedTime)},
lookback: 0,
prefix: "foo-",
},
}
for _, testCase := range testCases {
assert.EqualValues(t, testCase.expected, getIndices(fixedTime, testCase.lookback))
assert.EqualValues(t, testCase.expected, getIndices(testCase.prefix, fixedTime, testCase.lookback))
}
}

Expand Down
11 changes: 9 additions & 2 deletions plugin/storage/es/esCleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,23 @@

def main():
if len(sys.argv) == 1:
print('USAGE: [TIMEOUT=(default 120)] %s NUM_OF_DAYS HOSTNAME[:PORT] ...' % sys.argv[0])
print('USAGE: [TIMEOUT=(default 120)] [INDEX_PREFIX=(default "")] %s NUM_OF_DAYS HOSTNAME[:PORT] ...' % sys.argv[0])
print('Specify a NUM_OF_DAYS that will delete indices that are older than the given NUM_OF_DAYS.')
print('HOSTNAME ... specifies which ElasticSearch hosts to search and delete indices from.')
print('INDEX_PREFIX ... specifies index prefix.')
sys.exit(1)

client = elasticsearch.Elasticsearch(sys.argv[2:])

ilo = curator.IndexList(client)
empty_list(ilo, 'ElasticSearch has no indices')
ilo.filter_by_regex(kind='prefix', value='jaeger-')

prefix = os.getenv("INDEX_PREFIX", '')
if prefix != '':
prefix += ':'
prefix += 'jaeger'

ilo.filter_by_regex(kind='prefix', value=prefix)
ilo.filter_by_age(source='name', direction='older', timestring='%Y-%m-%d', unit='days', unit_count=int(sys.argv[1]))
empty_list(ilo, 'No indices to delete')

Expand Down
6 changes: 3 additions & 3 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,16 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
cfg := f.primaryConfig
return esSpanStore.NewSpanReader(f.primaryClient, f.logger, cfg.GetMaxSpanAge(), f.metricsFactory), nil
return esSpanStore.NewSpanReader(f.primaryClient, f.logger, cfg.GetMaxSpanAge(), f.metricsFactory, cfg.GetIndexPrefix()), nil
}

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
cfg := f.primaryConfig
return esSpanStore.NewSpanWriter(f.primaryClient, f.logger, f.metricsFactory, cfg.GetNumShards(), cfg.GetNumReplicas()), nil
return esSpanStore.NewSpanWriter(f.primaryClient, f.logger, f.metricsFactory, cfg.GetNumShards(), cfg.GetNumReplicas(), cfg.GetIndexPrefix()), nil
}

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return esDepStore.NewDependencyStore(f.primaryClient, f.logger), nil
return esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix()), nil
}
6 changes: 6 additions & 0 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
suffixBulkWorkers = ".bulk.workers"
suffixBulkActions = ".bulk.actions"
suffixBulkFlushInterval = ".bulk.flush-interval"
suffixIndexPrefix = ".index-prefix"
)

// TODO this should be moved next to config.Configuration struct (maybe ./flags package)
Expand Down Expand Up @@ -141,6 +142,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixBulkFlushInterval,
nsConfig.BulkFlushInterval,
"A time.Duration after which bulk requests are committed, regardless of other tresholds. Set to zero to disable. By default, this is disabled.")
flagSet.String(
nsConfig.namespace+suffixIndexPrefix,
nsConfig.IndexPrefix,
"Optional prefix of Jaeger indices. For example \"production\" creates \"production:jaeger-*\".")
}

// InitFromViper initializes Options with properties from viper
Expand All @@ -163,6 +168,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.BulkWorkers = v.GetInt(cfg.namespace + suffixBulkWorkers)
cfg.BulkActions = v.GetInt(cfg.namespace + suffixBulkActions)
cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval)
cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix)
}

// GetPrimary returns primary configuration.
Expand Down
37 changes: 20 additions & 17 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
)

const (
spanIndexPrefix = "jaeger-span-"
serviceIndexPrefix = "jaeger-service-"
spanIndex = "jaeger-span-"
serviceIndex = "jaeger-service-"
traceIDAggregation = "traceIDs"

traceIDField = "traceID"
Expand Down Expand Up @@ -88,21 +88,28 @@ type SpanReader struct {
// this will be rounded down to UTC 00:00 of that day.
maxLookback time.Duration
serviceOperationStorage *ServiceOperationStorage
spanIndexPrefix string
serviceIndexPrefix string
}

// NewSpanReader returns a new SpanReader with a metrics.
func NewSpanReader(client es.Client, logger *zap.Logger, maxLookback time.Duration, metricsFactory metrics.Factory) spanstore.Reader {
return storageMetrics.NewReadMetricsDecorator(newSpanReader(client, logger, maxLookback), metricsFactory)
func NewSpanReader(client es.Client, logger *zap.Logger, maxLookback time.Duration, metricsFactory metrics.Factory, indexPrefix string) spanstore.Reader {
return storageMetrics.NewReadMetricsDecorator(newSpanReader(client, logger, maxLookback, indexPrefix), metricsFactory)
}

func newSpanReader(client es.Client, logger *zap.Logger, maxLookback time.Duration) *SpanReader {
func newSpanReader(client es.Client, logger *zap.Logger, maxLookback time.Duration, indexPrefix string) *SpanReader {
ctx := context.Background()
if indexPrefix != "" {
indexPrefix += ":"
}
return &SpanReader{
ctx: ctx,
client: client,
logger: logger,
maxLookback: maxLookback,
serviceOperationStorage: NewServiceOperationStorage(ctx, client, metrics.NullFactory, logger, 0), // the decorator takes care of metrics
spanIndexPrefix: indexPrefix + spanIndex,
serviceIndexPrefix: indexPrefix + serviceIndex,
}
}

Expand Down Expand Up @@ -147,33 +154,29 @@ func (s *SpanReader) unmarshalJSONSpan(esSpanRaw *elastic.SearchHit) (*jModel.Sp
}

// Returns the array of indices that we need to query, based on query params
func findIndices(prefix string, startTime time.Time, endTime time.Time) []string {
func (s *SpanReader) indicesForTimeRange(indexName string, startTime time.Time, endTime time.Time) []string {
var indices []string
firstIndex := indexWithDate(prefix, startTime)
currentIndex := indexWithDate(prefix, endTime)
firstIndex := indexWithDate(indexName, startTime)
currentIndex := indexWithDate(indexName, endTime)
for currentIndex != firstIndex {
indices = append(indices, currentIndex)
endTime = endTime.Add(-24 * time.Hour)
currentIndex = indexWithDate(prefix, endTime)
currentIndex = indexWithDate(indexName, endTime)
}
return append(indices, firstIndex)
}

func indexWithDate(prefix string, date time.Time) string {
return prefix + date.UTC().Format("2006-01-02")
}

// GetServices returns all services traced by Jaeger, ordered by frequency
func (s *SpanReader) GetServices() ([]string, error) {
currentTime := time.Now()
jaegerIndices := findIndices(serviceIndexPrefix, currentTime.Add(-s.maxLookback), currentTime)
jaegerIndices := s.indicesForTimeRange(s.serviceIndexPrefix, currentTime.Add(-s.maxLookback), currentTime)
return s.serviceOperationStorage.getServices(jaegerIndices)
}

// GetOperations returns all operations for a specific service traced by Jaeger
func (s *SpanReader) GetOperations(service string) ([]string, error) {
currentTime := time.Now()
jaegerIndices := findIndices(serviceIndexPrefix, currentTime.Add(-s.maxLookback), currentTime)
jaegerIndices := s.indicesForTimeRange(s.serviceIndexPrefix, currentTime.Add(-s.maxLookback), currentTime)
return s.serviceOperationStorage.getOperations(jaegerIndices, service)
}

Expand Down Expand Up @@ -214,7 +217,7 @@ func (s *SpanReader) multiRead(traceIDs []string, startTime, endTime time.Time)
var traces []*model.Trace
// Add an hour in both directions so that traces that straddle two indexes are retrieved.
// i.e starts in one and ends in another.
indices := findIndices(spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour))
indices := s.indicesForTimeRange(s.spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour))

nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour))

Expand Down Expand Up @@ -347,7 +350,7 @@ func (s *SpanReader) findTraceIDs(traceQuery *spanstore.TraceQueryParameters) ([
aggregation := s.buildTraceIDAggregation(traceQuery.NumTraces)
boolQuery := s.buildFindTraceIDsQuery(traceQuery)

jaegerIndices := findIndices(spanIndexPrefix, traceQuery.StartTimeMin, traceQuery.StartTimeMax)
jaegerIndices := s.indicesForTimeRange(s.spanIndexPrefix, traceQuery.StartTimeMin, traceQuery.StartTimeMax)

searchService := s.client.Search(jaegerIndices...).
Type(spanType).
Expand Down

0 comments on commit 75af597

Please sign in to comment.