Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reporting of tracing spans from PromQL engine #2707

Merged
merged 4 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Grafana Mimir

* [BUGFIX] Fix reporting of tracing spans from PromQL engine. #2707

### Mixin

### Jsonnet
Expand Down
3 changes: 3 additions & 0 deletions pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import (
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/runtimeconfig"
"github.com/grafana/dskit/services"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/promql"
prom_storage "github.com/prometheus/prometheus/storage"
"github.com/weaveworks/common/server"
"github.com/weaveworks/common/signals"
"go.opentelemetry.io/otel"
"google.golang.org/grpc/health/grpc_health_v1"
"gopkg.in/yaml.v3"

Expand Down Expand Up @@ -521,6 +523,7 @@ func New(cfg Config) (*Mimir, error) {
}

mimir.setupThanosTracing()
otel.SetTracerProvider(NewOpenTelemetryProviderBridge(opentracing.GlobalTracer()))

if err := mimir.setupModuleManager(); err != nil {
return nil, err
Expand Down
198 changes: 198 additions & 0 deletions pkg/mimir/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@ package mimir

import (
"context"
"encoding/binary"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/uber/jaeger-client-go"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
)

Expand All @@ -36,3 +42,195 @@ type wrappedServerStream struct {
// Context returns the context held in the wrapper's ctx field.
// NOTE(review): presumably this replaces the context of a wrapped
// grpc.ServerStream — the struct definition is not visible here, confirm.
func (ss wrappedServerStream) Context() context.Context {
	return ss.ctx
}

// OpenTelemetryProviderBridge wraps an opentracing.Tracer. Its Tracer method
// returns tracer bridges that create their spans through the wrapped tracer.
type OpenTelemetryProviderBridge struct {
	// tracer is the OpenTracing tracer handed to every tracer bridge created
	// by this provider.
	tracer opentracing.Tracer
}

// NewOpenTelemetryProviderBridge returns a provider bridge whose tracers
// create spans through the given OpenTracing tracer.
func NewOpenTelemetryProviderBridge(tracer opentracing.Tracer) *OpenTelemetryProviderBridge {
	bridge := new(OpenTelemetryProviderBridge)
	bridge.tracer = tracer
	return bridge
}

// Tracer creates an implementation of the Tracer interface.
//
// In the OpenTelemetry API the instrumentationName is the name of the library
// providing instrumentation, and an implementation-defined default is used
// when it is empty. This bridge ignores both instrumentationName and opts:
// every call returns a new tracer bridge backed by the provider's OpenTracing
// tracer, with this provider attached.
//
// This method must be concurrency safe.
func (p *OpenTelemetryProviderBridge) Tracer(instrumentationName string, opts ...trace.TracerOption) trace.Tracer {
	bridged := NewOpenTelemetricTracerBridge(p.tracer, p)
	return bridged
}

type OpenTelemetricTracerBridge struct {
pracucci marked this conversation as resolved.
Show resolved Hide resolved
tracer opentracing.Tracer
provider trace.TracerProvider
}

// NewOpenTelemetricTracerBridge returns a tracer bridge that starts spans
// with the given OpenTracing tracer and remembers the provider it belongs to.
func NewOpenTelemetricTracerBridge(tracer opentracing.Tracer, provider trace.TracerProvider) *OpenTelemetricTracerBridge {
	bridge := new(OpenTelemetricTracerBridge)
	bridge.tracer = tracer
	bridge.provider = provider
	return bridge
}

// Start creates a span and a context.Context containing the newly-created span.
//
// If the context.Context provided in `ctx` contains a Span then the newly-created
// Span will be a child of that span, otherwise it will be a root span. This behavior
// can be overridden by providing `WithNewRoot()` as a SpanOption, causing the
// newly-created Span to be a root span even if `ctx` contains a Span.
//
// When creating a Span it is recommended to provide all known span attributes using
// the `WithAttributes()` SpanOption as samplers will only have access to the
// attributes provided when a Span is created.
//
// Any Span that is created MUST also be ended. This is the responsibility of the user.
// Implementations of this API may leak memory or other resources if Spans are not ended.
func (t *OpenTelemetricTracerBridge) Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
var mappedOptions []opentracing.StartSpanOption

// Map supported options.
if len(opts) > 0 {
mappedOptions = make([]opentracing.StartSpanOption, 0, len(opts))
cfg := trace.NewSpanStartConfig(opts...)
dimitarvdimitrov marked this conversation as resolved.
Show resolved Hide resolved

if !cfg.Timestamp().IsZero() {
mappedOptions = append(mappedOptions, opentracing.StartTime(cfg.Timestamp()))
}
if len(cfg.Attributes()) > 0 {
tags := make(map[string]interface{}, len(cfg.Attributes()))

for _, attr := range cfg.Attributes() {
if !attr.Valid() {
continue
}

tags[string(attr.Key)] = attr.Value.AsString()
}

mappedOptions = append(mappedOptions, opentracing.Tags(tags))
}
}

span, ctx := opentracing.StartSpanFromContextWithTracer(ctx, t.tracer, spanName, mappedOptions...)
return ctx, NewOpenTelemetrySpanBridge(span, t.provider)
}

// OpenTelemetrySpanBridge wraps an opentracing.Span together with the
// trace.TracerProvider that is returned by its TracerProvider method.
type OpenTelemetrySpanBridge struct {
	// span is the underlying OpenTracing span all calls are forwarded to.
	span opentracing.Span
	// provider is the tracer provider reported by TracerProvider.
	provider trace.TracerProvider
}

// NewOpenTelemetrySpanBridge wraps the given OpenTracing span and associates
// it with the provider that TracerProvider will later return.
func NewOpenTelemetrySpanBridge(span opentracing.Span, provider trace.TracerProvider) *OpenTelemetrySpanBridge {
	bridge := new(OpenTelemetrySpanBridge)
	bridge.span = span
	bridge.provider = provider
	return bridge
}

// End completes the Span by finishing the underlying OpenTracing span.
// Updates to the Span are not allowed after this method has been called.
//
// Without options the span is finished with Finish. With options, the
// timestamp from the resulting end config is passed as the finish time;
// no other end option is mapped.
func (s *OpenTelemetrySpanBridge) End(options ...trace.SpanEndOption) {
	if len(options) > 0 {
		endCfg := trace.NewSpanEndConfig(options...)
		finishOpts := opentracing.FinishOptions{FinishTime: endCfg.Timestamp()}
		s.span.FinishWithOptions(finishOpts)
		return
	}

	s.span.Finish()
}

// AddEvent records an event with the provided name as a log field on the
// underlying span. The options are not supported and are ignored.
func (s *OpenTelemetrySpanBridge) AddEvent(name string, options ...trace.EventOption) {
	event := log.Event(name)
	s.span.LogFields(event)
}

// IsRecording returns the recording state of the Span. This bridge does not
// inspect the underlying span and always reports true.
func (s *OpenTelemetrySpanBridge) IsRecording() bool {
	return true
}

// RecordError will record err as an exception span event for this span. An
// additional call to SetStatus is required if the Status of the Span should
// be set to Error, as this method does not change the Span status. If this
// span is not being recorded or err is nil then this method does nothing.
func (s *OpenTelemetrySpanBridge) RecordError(err error, options ...trace.EventOption) {
// Options are not supported.
s.span.LogFields(log.Error(err))
pracucci marked this conversation as resolved.
Show resolved Hide resolved
}

// SpanContext converts the underlying span's context into an OpenTelemetry
// trace.SpanContext. Only Jaeger span contexts are supported: for any other
// implementation an empty trace.SpanContext is returned.
func (s *OpenTelemetrySpanBridge) SpanContext() trace.SpanContext {
	jaegerCtx, isJaeger := s.span.Context().(jaeger.SpanContext)
	if !isJaeger {
		// We only support Jaeger span context.
		return trace.SpanContext{}
	}

	cfg := trace.SpanContextConfig{
		TraceID:    jaegerToOpenTelemetryTraceID(jaegerCtx.TraceID()),
		SpanID:     jaegerToOpenTelemetrySpanID(jaegerCtx.SpanID()),
		TraceFlags: trace.TraceFlags(0).WithSampled(jaegerCtx.IsSampled()),

		// Unsupported because we can't read it from the Jaeger span context.
		Remote: false,
	}

	return trace.NewSpanContext(cfg)
}

// SetStatus sets the status of the Span in the form of a code and a
// description, overriding previous values set. The description is only
// included in a status when the code is for an error.
func (s *OpenTelemetrySpanBridge) SetStatus(code codes.Code, description string) {
// Not supported.
dimitarvdimitrov marked this conversation as resolved.
Show resolved Hide resolved
}

// SetName sets the Span name by updating the operation name of the
// underlying OpenTracing span.
func (s *OpenTelemetrySpanBridge) SetName(name string) {
	s.span.SetOperationName(name)
}

// SetAttributes sets kv as tags on the underlying span, one tag per
// attribute, keyed by the attribute key. Invalid attributes are skipped.
// Values keep their native type (string, bool, int64, float64 or a slice of
// those) instead of being forced to a string.
func (s *OpenTelemetrySpanBridge) SetAttributes(kv ...attribute.KeyValue) {
	for _, attr := range kv {
		if !attr.Valid() {
			continue
		}

		// AsInterface preserves the attribute's real value. AsString only
		// works for string attributes and returns "" for any other type.
		s.span.SetTag(string(attr.Key), attr.Value.AsInterface())
	}
}

// TracerProvider returns the TracerProvider this span bridge was constructed
// with, so callers can create additional Spans through the same provider.
func (s *OpenTelemetrySpanBridge) TracerProvider() trace.TracerProvider {
	return s.provider
}

// jaegerToOpenTelemetryTraceID converts a Jaeger trace ID into an
// OpenTelemetry trace ID: the High word fills the first 8 bytes and the Low
// word the last 8 bytes, both in big-endian order.
func jaegerToOpenTelemetryTraceID(input jaeger.TraceID) trace.TraceID {
	var out trace.TraceID

	hi, lo := out[:8], out[8:]
	binary.BigEndian.PutUint64(hi, input.High)
	binary.BigEndian.PutUint64(lo, input.Low)

	return out
}

// jaegerToOpenTelemetrySpanID converts a Jaeger span ID into an
// OpenTelemetry span ID by writing it as 8 big-endian bytes.
func jaegerToOpenTelemetrySpanID(input jaeger.SpanID) trace.SpanID {
	var out trace.SpanID
	raw := uint64(input)
	binary.BigEndian.PutUint64(out[:], raw)
	return out
}
48 changes: 48 additions & 0 deletions pkg/mimir/tracing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// SPDX-License-Identifier: AGPL-3.0-only

package mimir

import (
crand "crypto/rand"
"encoding/binary"
"math/rand"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-client-go"
"go.opentelemetry.io/otel/trace"
)

// TestJaegerToOpenTelemetryTraceID checks that a random OpenTelemetry trace
// ID survives a round trip through its string form, the Jaeger parser and
// jaegerToOpenTelemetryTraceID.
func TestJaegerToOpenTelemetryTraceID(t *testing.T) {
	want, _ := generateOpenTelemetryIDs()

	parsed, err := jaeger.TraceIDFromString(want.String())
	require.NoError(t, err)

	got := jaegerToOpenTelemetryTraceID(parsed)
	assert.Equal(t, want, got)
}

// TestJaegerToOpenTelemetrySpanID checks that a random OpenTelemetry span ID
// survives a round trip through its string form, the Jaeger parser and
// jaegerToOpenTelemetrySpanID.
func TestJaegerToOpenTelemetrySpanID(t *testing.T) {
	_, want := generateOpenTelemetryIDs()

	parsed, err := jaeger.SpanIDFromString(want.String())
	require.NoError(t, err)

	got := jaegerToOpenTelemetrySpanID(parsed)
	assert.Equal(t, want, got)
}

// generateOpenTelemetryIDs generates a random trace ID and span ID. The
// implementation has been copied from OpenTelemetry.
func generateOpenTelemetryIDs() (trace.TraceID, trace.SpanID) {
	// Seed a math/rand source from crypto/rand. The read error is
	// deliberately discarded, as in the code this was copied from.
	var seed int64
	_ = binary.Read(crand.Reader, binary.LittleEndian, &seed)
	rng := rand.New(rand.NewSource(seed))

	var (
		tid trace.TraceID
		sid trace.SpanID
	)
	rng.Read(tid[:])
	rng.Read(sid[:])

	return tid, sid
}