Skip to content

Commit

Permalink
Fix reporting of tracing spans from PromQL engine (#2707)
Browse files Browse the repository at this point in the history
* Fix reporting of tracing spans from PromQL engine

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Fixed flags setting

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Addressed review comments

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Improve SetStatus()

Signed-off-by: Marco Pracucci <marco@pracucci.com>

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci committed Aug 12, 2022
1 parent 64c5aef commit 5bc7aab
Show file tree
Hide file tree
Showing 5 changed files with 439 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Grafana Mimir

* [BUGFIX] Fix reporting of tracing spans from PromQL engine. #2707

### Mixin

### Jsonnet
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ require (
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.54.0
go.opentelemetry.io/collector/pdata v0.54.0
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/trace v1.7.0
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b
gopkg.in/alecthomas/kingpin.v2 v2.2.6
sigs.k8s.io/kustomize/kyaml v0.13.7
Expand Down Expand Up @@ -209,11 +211,9 @@ require (
go.opentelemetry.io/collector/semconv v0.54.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.32.0 // indirect
go.opentelemetry.io/contrib/propagators/ot v1.4.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/bridge/opentracing v1.5.0 // indirect
go.opentelemetry.io/otel/metric v0.30.0 // indirect
go.opentelemetry.io/otel/sdk v1.7.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
Expand Down
3 changes: 3 additions & 0 deletions pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import (
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/runtimeconfig"
"github.com/grafana/dskit/services"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/promql"
prom_storage "github.com/prometheus/prometheus/storage"
"github.com/weaveworks/common/server"
"github.com/weaveworks/common/signals"
"go.opentelemetry.io/otel"
"google.golang.org/grpc/health/grpc_health_v1"
"gopkg.in/yaml.v3"

Expand Down Expand Up @@ -521,6 +523,7 @@ func New(cfg Config) (*Mimir, error) {
}

mimir.setupThanosTracing()
otel.SetTracerProvider(NewOpenTelemetryProviderBridge(opentracing.GlobalTracer()))

if err := mimir.setupModuleManager(); err != nil {
return nil, err
Expand Down
225 changes: 225 additions & 0 deletions pkg/mimir/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@ package mimir

import (
"context"
"encoding/binary"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/uber/jaeger-client-go"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
)

Expand All @@ -36,3 +42,222 @@ type wrappedServerStream struct {
// Context returns the context held in the wrapper's ctx field.
func (ss wrappedServerStream) Context() context.Context {
return ss.ctx
}

// OpenTelemetryProviderBridge is an OpenTelemetry tracer provider backed by
// an OpenTracing tracer.
type OpenTelemetryProviderBridge struct {
// tracer is the OpenTracing tracer passed on to the tracers created by this provider.
tracer opentracing.Tracer
}

// NewOpenTelemetryProviderBridge returns a provider bridge wrapping the given
// OpenTracing tracer.
func NewOpenTelemetryProviderBridge(tracer opentracing.Tracer) *OpenTelemetryProviderBridge {
	bridge := new(OpenTelemetryProviderBridge)
	bridge.tracer = tracer
	return bridge
}

// Tracer returns a trace.Tracer whose spans are created through the
// OpenTracing tracer wrapped by this provider.
//
// The OpenTelemetry API describes instrumentationName as the name of the
// library providing instrumentation, with an implementation-defined default
// used when it is empty. This bridge ignores both instrumentationName and
// opts: every call returns a new tracer bridge bound to the same OpenTracing
// tracer and to this provider.
//
// This method must be concurrency safe; it only reads the provider's tracer.
func (p *OpenTelemetryProviderBridge) Tracer(instrumentationName string, opts ...trace.TracerOption) trace.Tracer {
	return &OpenTelemetryTracerBridge{
		tracer:   p.tracer,
		provider: p,
	}
}

// OpenTelemetryTracerBridge is an OpenTelemetry tracer that starts its spans
// through an OpenTracing tracer.
type OpenTelemetryTracerBridge struct {
// tracer is the OpenTracing tracer used to start spans.
tracer opentracing.Tracer
// provider is handed to every span bridge created by Start.
provider trace.TracerProvider
}

// NewOpenTelemetryTracerBridge returns a tracer bridge that starts spans with
// the given OpenTracing tracer and remembers provider so it can be passed on
// to the spans it creates.
func NewOpenTelemetryTracerBridge(tracer opentracing.Tracer, provider trace.TracerProvider) *OpenTelemetryTracerBridge {
	bridge := OpenTelemetryTracerBridge{tracer: tracer, provider: provider}
	return &bridge
}

// Start creates a span named spanName through the wrapped OpenTracing tracer
// and returns it, wrapped in an OpenTelemetrySpanBridge, together with a
// context.Context containing the newly-created span.
//
// If ctx contains an OpenTracing span then the new span is created as a child
// of that span, otherwise it is a root span. Passing trace.WithNewRoot()
// overrides this: the span found in ctx is ignored and a root span is started.
//
// Supported start options are the timestamp, the attributes (invalid
// attributes are skipped, the others become OpenTracing tags) and the
// new-root flag. Any other option is ignored.
//
// The returned context carries the span both as an OpenTracing span and as an
// OpenTelemetry span, so that both opentracing.SpanFromContext and
// trace.SpanFromContext find it.
//
// Any span that is created MUST also be ended. This is the responsibility of
// the caller.
func (t *OpenTelemetryTracerBridge) Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
	var (
		mappedOptions []opentracing.StartSpanOption
		newRoot       bool
	)

	// Map supported options.
	if len(opts) > 0 {
		cfg := trace.NewSpanStartConfig(opts...)
		newRoot = cfg.NewRoot()

		if ts := cfg.Timestamp(); !ts.IsZero() {
			mappedOptions = append(mappedOptions, opentracing.StartTime(ts))
		}
		if attrs := cfg.Attributes(); len(attrs) > 0 {
			tags := make(map[string]interface{}, len(attrs))

			for _, attr := range attrs {
				if !attr.Valid() {
					continue
				}

				tags[string(attr.Key)] = attr.Value.AsInterface()
			}

			mappedOptions = append(mappedOptions, opentracing.Tags(tags))
		}
	}

	var span opentracing.Span
	if newRoot {
		// Start the span without looking at ctx so it gets no parent, but
		// still store it in the returned context.
		span = t.tracer.StartSpan(spanName, mappedOptions...)
		ctx = opentracing.ContextWithSpan(ctx, span)
	} else {
		span, ctx = opentracing.StartSpanFromContextWithTracer(ctx, t.tracer, spanName, mappedOptions...)
	}

	// Also expose the bridge through the OpenTelemetry context key, otherwise
	// trace.SpanFromContext on the returned context yields a no-op span.
	bridge := NewOpenTelemetrySpanBridge(span, t.provider)
	return trace.ContextWithSpan(ctx, bridge), bridge
}

// OpenTelemetrySpanBridge is an OpenTelemetry span that forwards its
// operations to a wrapped OpenTracing span.
type OpenTelemetrySpanBridge struct {
// span is the wrapped OpenTracing span.
span opentracing.Span
// provider is the value returned by TracerProvider.
provider trace.TracerProvider
}

// NewOpenTelemetrySpanBridge wraps the given OpenTracing span into a span
// bridge that reports provider as its tracer provider.
func NewOpenTelemetrySpanBridge(span opentracing.Span, provider trace.TracerProvider) *OpenTelemetrySpanBridge {
	bridge := OpenTelemetrySpanBridge{span: span, provider: provider}
	return &bridge
}

// End completes the Span by finishing the wrapped OpenTracing span. Updates
// to the Span are not allowed after this method has been called.
//
// Without options the span is finished with a plain Finish call. With options
// the timestamp from the resulting end config is forwarded as the OpenTracing
// finish time; other end options are not mapped.
func (s *OpenTelemetrySpanBridge) End(options ...trace.SpanEndOption) {
	if len(options) > 0 {
		endCfg := trace.NewSpanEndConfig(options...)
		finishOpts := opentracing.FinishOptions{FinishTime: endCfg.Timestamp()}
		s.span.FinishWithOptions(finishOpts)
		return
	}

	s.span.Finish()
}

// AddEvent records an event with the provided name on the wrapped span as an
// OpenTracing log entry. The attributes carried by options are logged along
// with the event.
func (s *OpenTelemetrySpanBridge) AddEvent(name string, options ...trace.EventOption) {
	eventCfg := trace.NewEventConfig(options...)
	eventField := log.Event(name)
	s.logFieldWithAttributes(eventField, eventCfg.Attributes())
}

// addEvent logs an event field named name on the wrapped span, followed by
// the given attributes.
func (s *OpenTelemetrySpanBridge) addEvent(name string, attributes []attribute.KeyValue) {
	eventField := log.Event(name)
	s.logFieldWithAttributes(eventField, attributes)
}

// IsRecording returns the recording state of the Span. This bridge
// unconditionally reports true; it does not consult the wrapped span.
func (s *OpenTelemetrySpanBridge) IsRecording() bool {
return true
}

// RecordError records err as an error log entry on the wrapped span, together
// with the attributes carried by options. An additional call to SetStatus is
// required if the status of the Span should be set to Error, as this method
// does not change the Span status.
//
// If err is nil this method does nothing.
func (s *OpenTelemetrySpanBridge) RecordError(err error, options ...trace.EventOption) {
	// Honour the documented contract: without this guard a nil error would
	// still be logged as an error field on the span.
	if err == nil {
		return
	}

	cfg := trace.NewEventConfig(options...)
	s.recordError(err, cfg.Attributes())
}

// recordError logs err as an error field on the wrapped span, followed by the
// given attributes.
func (s *OpenTelemetrySpanBridge) recordError(err error, attributes []attribute.KeyValue) {
	errField := log.Error(err)
	s.logFieldWithAttributes(errField, attributes)
}

// SpanContext returns the OpenTelemetry SpanContext equivalent to the context
// of the wrapped span.
//
// Only Jaeger span contexts are supported: for any other implementation an
// empty trace.SpanContext is returned. The trace ID, span ID and sampled flag
// are converted; Remote is always false because it can't be read from the
// Jaeger span context.
func (s *OpenTelemetrySpanBridge) SpanContext() trace.SpanContext {
	jaegerCtx, isJaeger := s.span.Context().(jaeger.SpanContext)
	if !isJaeger {
		return trace.SpanContext{}
	}

	cfg := trace.SpanContextConfig{
		TraceID:    jaegerToOpenTelemetryTraceID(jaegerCtx.TraceID()),
		SpanID:     jaegerToOpenTelemetrySpanID(jaegerCtx.SpanID()),
		TraceFlags: trace.TraceFlags(0).WithSampled(jaegerCtx.IsSampled()),
		Remote:     false,
	}
	return trace.NewSpanContext(cfg)
}

// SetStatus reports the status of the Span, in the form of a code and a
// description, as a log entry on the wrapped span. The description is only
// included when the code is codes.Error; for any other code it is dropped.
//
// Because the status is reported as a log entry, a later call adds a new
// entry rather than replacing the previous one.
func (s *OpenTelemetrySpanBridge) SetStatus(code codes.Code, description string) {
	// We use a log instead of setting tags to have it more prominent in the tracing UI.
	if code != codes.Error {
		// A description is only meaningful for error statuses.
		s.span.LogFields(log.Uint32("code", uint32(code)))
		return
	}

	s.span.LogFields(log.Uint32("code", uint32(code)), log.String("description", description))
}

// SetName sets the Span name by setting the operation name of the wrapped
// OpenTracing span.
func (s *OpenTelemetrySpanBridge) SetName(name string) {
s.span.SetOperationName(name)
}

// SetAttributes stores every valid attribute in kv as a tag on the wrapped
// span, using the attribute key as the tag name. Invalid attributes are
// skipped.
func (s *OpenTelemetrySpanBridge) SetAttributes(kv ...attribute.KeyValue) {
	for i := range kv {
		if attr := kv[i]; attr.Valid() {
			s.span.SetTag(string(attr.Key), attr.Value.AsInterface())
		}
	}
}

// TracerProvider returns the TracerProvider this span bridge was created
// with, which can be used to generate additional Spans.
func (s *OpenTelemetrySpanBridge) TracerProvider() trace.TracerProvider {
return s.provider
}

// logFieldWithAttributes writes a single log entry on the wrapped span made
// of field followed by one object field per valid attribute. Invalid
// attributes are skipped; field is always logged.
func (s *OpenTelemetrySpanBridge) logFieldWithAttributes(field log.Field, attributes []attribute.KeyValue) {
	if len(attributes) > 0 {
		entry := append(make([]log.Field, 0, len(attributes)+1), field)

		for _, kv := range attributes {
			if !kv.Valid() {
				continue
			}
			entry = append(entry, log.Object(string(kv.Key), kv.Value.AsInterface()))
		}

		s.span.LogFields(entry...)
		return
	}

	// No attributes: log the field on its own without allocating a slice.
	s.span.LogFields(field)
}

// jaegerToOpenTelemetryTraceID converts a Jaeger trace ID into an
// OpenTelemetry one: the High word fills the first 8 bytes and the Low word
// the last 8 bytes, both in big-endian order.
func jaegerToOpenTelemetryTraceID(input jaeger.TraceID) trace.TraceID {
	var out trace.TraceID
	high, low := out[:8], out[8:]
	binary.BigEndian.PutUint64(high, input.High)
	binary.BigEndian.PutUint64(low, input.Low)
	return out
}

// jaegerToOpenTelemetrySpanID converts a Jaeger span ID into an OpenTelemetry
// one by writing its 64-bit value in big-endian order.
func jaegerToOpenTelemetrySpanID(input jaeger.SpanID) trace.SpanID {
	var out trace.SpanID
	binary.BigEndian.PutUint64(out[:], uint64(input))
	return out
}

0 comments on commit 5bc7aab

Please sign in to comment.