Skip to content

Commit

Permalink
Tenancy for memory storage (#3827)
Browse files Browse the repository at this point in the history
  • Loading branch information
esnible committed Jul 23, 2022
1 parent ddca3c8 commit 7b33160
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 50 deletions.
2 changes: 1 addition & 1 deletion plugin/storage/memory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (f *Factory) InitFromOptions(opts Options) {
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger
f.store = WithConfiguration(f.options.Configuration)
logger.Info("Memory storage initialized", zap.Any("configuration", f.store.config))
logger.Info("Memory storage initialized", zap.Any("configuration", f.store.defaultConfig))
f.publishOpts()

return nil
Expand Down
85 changes: 63 additions & 22 deletions plugin/storage/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,21 @@ import (
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/model/adjuster"
"github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

// Store is an in-memory store of traces
type Store struct {
sync.RWMutex
// Each tenant gets a copy of default config.
// In the future this can be extended to contain per-tenant configuration.
defaultConfig config.Configuration
perTenant map[string]*Tenant
}

// Tenant is an in-memory store of traces for a single tenant
type Tenant struct {
sync.RWMutex
ids []*model.TraceID
traces map[model.TraceID]*model.Trace
Expand All @@ -50,17 +60,42 @@ func NewStore() *Store {
// WithConfiguration creates a new in memory storage based on the given configuration
func WithConfiguration(configuration config.Configuration) *Store {
return &Store{
ids: make([]*model.TraceID, configuration.MaxTraces),
defaultConfig: configuration,
perTenant: make(map[string]*Tenant),
}
}

func newTenant(cfg config.Configuration) *Tenant {
return &Tenant{
ids: make([]*model.TraceID, cfg.MaxTraces),
traces: map[model.TraceID]*model.Trace{},
services: map[string]struct{}{},
operations: map[string]map[spanstore.Operation]struct{}{},
deduper: adjuster.SpanIDDeduper(),
config: configuration,
config: cfg,
}
}

// getTenant returns the per-tenant storage. Note that tenantID has already been checked for by the collector or query
func (st *Store) getTenant(tenantID string) *Tenant {
st.RLock()
tenant, ok := st.perTenant[tenantID]
st.RUnlock()
if !ok {
st.Lock()
defer st.Unlock()
tenant, ok = st.perTenant[tenantID]
if !ok {
tenant = newTenant(st.defaultConfig)
st.perTenant[tenantID] = tenant
}
}
return tenant
}

// GetDependencies returns dependencies between services
func (m *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
m := st.getTenant(tenancy.GetTenant(ctx))
// deduper used below can modify the spans, so we take an exclusive lock
m.Lock()
defer m.Unlock()
Expand All @@ -69,9 +104,9 @@ func (m *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback t
for _, orig := range m.traces {
// SpanIDDeduper never returns an err
trace, _ := m.deduper.Adjust(orig)
if m.traceIsBetweenStartAndEnd(startTs, endTs, trace) {
if traceIsBetweenStartAndEnd(startTs, endTs, trace) {
for _, s := range trace.Spans {
parentSpan := m.findSpan(trace, s.ParentSpanID())
parentSpan := findSpan(trace, s.ParentSpanID())
if parentSpan != nil {
if parentSpan.Process.ServiceName == s.Process.ServiceName {
continue
Expand All @@ -97,7 +132,7 @@ func (m *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback t
return retMe, nil
}

func (m *Store) findSpan(trace *model.Trace, spanID model.SpanID) *model.Span {
func findSpan(trace *model.Trace, spanID model.SpanID) *model.Span {
for _, s := range trace.Spans {
if s.SpanID == spanID {
return s
Expand All @@ -106,7 +141,7 @@ func (m *Store) findSpan(trace *model.Trace, spanID model.SpanID) *model.Span {
return nil
}

func (m *Store) traceIsBetweenStartAndEnd(startTs, endTs time.Time, trace *model.Trace) bool {
func traceIsBetweenStartAndEnd(startTs, endTs time.Time, trace *model.Trace) bool {
for _, s := range trace.Spans {
if s.StartTime.After(startTs) && endTs.After(s.StartTime) {
return true
Expand All @@ -116,7 +151,8 @@ func (m *Store) traceIsBetweenStartAndEnd(startTs, endTs time.Time, trace *model
}

// WriteSpan writes the given span
func (m *Store) WriteSpan(ctx context.Context, span *model.Span) error {
func (st *Store) WriteSpan(ctx context.Context, span *model.Span) error {
m := st.getTenant(tenancy.GetTenant(ctx))
m.Lock()
defer m.Unlock()
if _, ok := m.operations[span.Process.ServiceName]; !ok {
Expand Down Expand Up @@ -159,18 +195,19 @@ func (m *Store) WriteSpan(ctx context.Context, span *model.Span) error {
}

// GetTrace gets a trace
func (m *Store) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
func (st *Store) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
m := st.getTenant(tenancy.GetTenant(ctx))
m.RLock()
defer m.RUnlock()
trace, ok := m.traces[traceID]
if !ok {
return nil, spanstore.ErrTraceNotFound
}
return m.copyTrace(trace)
return copyTrace(trace)
}

// Spans may still be added to traces after they are returned to user code, so make copies.
func (m *Store) copyTrace(trace *model.Trace) (*model.Trace, error) {
func copyTrace(trace *model.Trace) (*model.Trace, error) {
bytes, err := proto.Marshal(trace)
if err != nil {
return nil, err
Expand All @@ -182,7 +219,8 @@ func (m *Store) copyTrace(trace *model.Trace) (*model.Trace, error) {
}

// GetServices returns a list of all known services
func (m *Store) GetServices(ctx context.Context) ([]string, error) {
func (st *Store) GetServices(ctx context.Context) ([]string, error) {
m := st.getTenant(tenancy.GetTenant(ctx))
m.RLock()
defer m.RUnlock()
var retMe []string
Expand All @@ -193,10 +231,11 @@ func (m *Store) GetServices(ctx context.Context) ([]string, error) {
}

// GetOperations returns the operations of a given service
func (m *Store) GetOperations(
func (st *Store) GetOperations(
ctx context.Context,
query spanstore.OperationQueryParameters,
) ([]spanstore.Operation, error) {
m := st.getTenant(tenancy.GetTenant(ctx))
m.RLock()
defer m.RUnlock()
var retMe []spanstore.Operation
Expand All @@ -211,13 +250,14 @@ func (m *Store) GetOperations(
}

// FindTraces returns all traces in the query parameters are satisfied by a trace's span
func (m *Store) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
func (st *Store) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
m := st.getTenant(tenancy.GetTenant(ctx))
m.RLock()
defer m.RUnlock()
var retMe []*model.Trace
for _, trace := range m.traces {
if m.validTrace(trace, query) {
copied, err := m.copyTrace(trace)
if validTrace(trace, query) {
copied, err := copyTrace(trace)
if err != nil {
return nil, err
}
Expand All @@ -243,9 +283,9 @@ func (m *Store) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryPar
return nil, errors.New("not implemented")
}

func (m *Store) validTrace(trace *model.Trace, query *spanstore.TraceQueryParameters) bool {
func validTrace(trace *model.Trace, query *spanstore.TraceQueryParameters) bool {
for _, span := range trace.Spans {
if m.validSpan(span, query) {
if validSpan(span, query) {
return true
}
}
Expand All @@ -261,7 +301,7 @@ func findKeyValueMatch(kvs model.KeyValues, key, value string) (model.KeyValue,
return model.KeyValue{}, false
}

func (m *Store) validSpan(span *model.Span, query *spanstore.TraceQueryParameters) bool {
func validSpan(span *model.Span, query *spanstore.TraceQueryParameters) bool {
if query.ServiceName != span.Process.ServiceName {
return false
}
Expand All @@ -280,7 +320,7 @@ func (m *Store) validSpan(span *model.Span, query *spanstore.TraceQueryParameter
if !query.StartTimeMax.IsZero() && span.StartTime.After(query.StartTimeMax) {
return false
}
spanKVs := m.flattenTags(span)
spanKVs := flattenTags(span)
for queryK, queryV := range query.Tags {
// (NB): we cannot use the KeyValues.FindKey function because there can be multiple tags with the same key
if _, ok := findKeyValueMatch(spanKVs, queryK, queryV); !ok {
Expand All @@ -290,8 +330,9 @@ func (m *Store) validSpan(span *model.Span, query *spanstore.TraceQueryParameter
return true
}

func (m *Store) flattenTags(span *model.Span) model.KeyValues {
retMe := span.Tags
func flattenTags(span *model.Span) model.KeyValues {
retMe := []model.KeyValue{}
retMe = append(retMe, span.Tags...)
retMe = append(retMe, span.Process.Tags...)
for _, l := range span.Logs {
retMe = append(retMe, l.Fields...)
Expand Down
114 changes: 87 additions & 27 deletions plugin/storage/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,19 @@ import (

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

var traceID = model.NewTraceID(1, 2)
var (
traceID = model.NewTraceID(1, 2)
testingSpan = makeTestingSpan(traceID, "")
)

var testingSpan = &model.Span{
TraceID: traceID,
SpanID: model.NewSpanID(1),
Process: &model.Process{
ServiceName: "serviceName",
Tags: []model.KeyValue(nil),
},
OperationName: "operationName",
Tags: model.KeyValues{
model.String("tagKey", "tagValue"),
model.String("span.kind", "client"),
},
Logs: []model.Log{
{
Timestamp: time.Now().UTC(),
Fields: []model.KeyValue{
model.String("logKey", "logValue"),
},
},
},
Duration: time.Second * 5,
StartTime: time.Unix(300, 0).UTC(),
}
var (
traceID2 = model.NewTraceID(2, 3)
testingSpan2 = makeTestingSpan(traceID2, "2")
)

var childSpan1 = &model.Span{
TraceID: traceID,
Expand Down Expand Up @@ -147,6 +132,7 @@ func withMemoryStore(f func(store *Store)) {
}

func TestStoreGetEmptyDependencies(t *testing.T) {
// assert.Equal(t, testingSpan, testingSpan1B) // @@@
withMemoryStore(func(store *Store) {
links, err := store.GetDependencies(context.Background(), time.Now(), time.Hour)
assert.NoError(t, err)
Expand Down Expand Up @@ -206,8 +192,8 @@ func TestStoreWithLimit(t *testing.T) {
assert.NoError(t, err)
}

assert.Equal(t, maxTraces, len(store.traces))
assert.Equal(t, maxTraces, len(store.ids))
assert.Equal(t, maxTraces, len(store.getTenant("").traces))
assert.Equal(t, maxTraces, len(store.getTenant("").ids))
}

func TestStoreGetTraceSuccess(t *testing.T) {
Expand Down Expand Up @@ -239,7 +225,7 @@ func TestStoreGetAndMutateTrace(t *testing.T) {

func TestStoreGetTraceError(t *testing.T) {
withPopulatedMemoryStore(func(store *Store) {
store.traces[testingSpan.TraceID] = &model.Trace{
store.getTenant("").traces[testingSpan.TraceID] = &model.Trace{
Spans: []*model.Span{nonSerializableSpan},
}
_, err := store.GetTrace(context.Background(), testingSpan.TraceID)
Expand Down Expand Up @@ -463,3 +449,77 @@ func TestStore_FindTraceIDs(t *testing.T) {
assert.EqualError(t, err, "not implemented")
})
}

func TestTenantStore(t *testing.T) {
withMemoryStore(func(store *Store) {
ctxAcme := tenancy.WithTenant(context.Background(), "acme")
ctxWonka := tenancy.WithTenant(context.Background(), "wonka")

assert.NoError(t, store.WriteSpan(ctxAcme, testingSpan))
assert.NoError(t, store.WriteSpan(ctxWonka, testingSpan2))

// Can we retrieve the spans with correct tenancy
trace1, err := store.GetTrace(ctxAcme, testingSpan.TraceID)
assert.NoError(t, err)
assert.Len(t, trace1.Spans, 1)
assert.Equal(t, testingSpan, trace1.Spans[0])

trace2, err := store.GetTrace(ctxWonka, testingSpan2.TraceID)
assert.NoError(t, err)
assert.Len(t, trace2.Spans, 1)
assert.Equal(t, testingSpan2, trace2.Spans[0])

// Can we query the spans with correct tenancy
traces1, err := store.FindTraces(ctxAcme, &spanstore.TraceQueryParameters{
ServiceName: "serviceName",
})
assert.NoError(t, err)
assert.Len(t, traces1, 1)
assert.Len(t, traces1[0].Spans, 1)
assert.Equal(t, testingSpan, traces1[0].Spans[0])

traces2, err := store.FindTraces(ctxWonka, &spanstore.TraceQueryParameters{
ServiceName: "serviceName2",
})
assert.NoError(t, err)
assert.Len(t, traces2, 1)
assert.Len(t, traces2[0].Spans, 1)
assert.Equal(t, testingSpan2, traces2[0].Spans[0])

// Do the spans fail with incorrect tenancy?
_, err = store.GetTrace(ctxAcme, testingSpan2.TraceID)
assert.Error(t, err)

_, err = store.GetTrace(ctxWonka, testingSpan.TraceID)
assert.Error(t, err)

_, err = store.GetTrace(context.Background(), testingSpan.TraceID)
assert.Error(t, err)
})
}

func makeTestingSpan(traceID model.TraceID, suffix string) *model.Span {
return &model.Span{
TraceID: traceID,
SpanID: model.NewSpanID(1),
Process: &model.Process{
ServiceName: "serviceName" + suffix,
Tags: []model.KeyValue(nil),
},
OperationName: "operationName" + suffix,
Tags: model.KeyValues{
model.String("tagKey", "tagValue"+suffix),
model.String("span.kind", "client"+suffix),
},
Logs: []model.Log{
{
Timestamp: time.Now().UTC(),
Fields: []model.KeyValue{
model.String("logKey", "logValue"+suffix),
},
},
},
Duration: time.Second * 5,
StartTime: time.Unix(300, 0).UTC(),
}
}

0 comments on commit 7b33160

Please sign in to comment.