Skip to content

Commit

Permalink
Merge pull request #254 from josephwoodward/add-opentelemetry-support
Browse files Browse the repository at this point in the history
Add support for tracing via OpenTelemetry
  • Loading branch information
casualjim committed Nov 28, 2022
2 parents 4425b20 + f454d66 commit 93d335a
Show file tree
Hide file tree
Showing 6 changed files with 389 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Expand Up @@ -17,7 +17,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.17.1
go-version: 1.18.8

- name: Setup gotestsum
uses: autero1/action-gotestsum@v1.0.0
Expand Down
212 changes: 212 additions & 0 deletions client/opentelemetry.go
@@ -0,0 +1,212 @@
package client

import (
"fmt"
"net/http"
"strings"

"github.com/go-openapi/runtime"
"github.com/go-openapi/strfmt"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"go.opentelemetry.io/otel/trace"
)

const (
instrumentationVersion = "1.0.0"
tracerName = "go-openapi"
)

type config struct {
Tracer trace.Tracer
Propagator propagation.TextMapPropagator
SpanStartOptions []trace.SpanStartOption
SpanNameFormatter func(*runtime.ClientOperation) string
TracerProvider trace.TracerProvider
}

type OpenTelemetryOpt interface {
apply(*config)
}

type optionFunc func(*config)

func (o optionFunc) apply(c *config) {
o(c)
}

// WithTracerProvider specifies a tracer provider to use for creating a tracer.
// If none is specified, the global provider is used.
func WithTracerProvider(provider trace.TracerProvider) OpenTelemetryOpt {
return optionFunc(func(c *config) {
if provider != nil {
c.TracerProvider = provider
}
})
}

// WithPropagators configures specific propagators. If this
// option isn't specified, then the global TextMapPropagator is used.
func WithPropagators(ps propagation.TextMapPropagator) OpenTelemetryOpt {
return optionFunc(func(c *config) {
if ps != nil {
c.Propagator = ps
}
})
}

// WithSpanOptions configures an additional set of
// trace.SpanOptions, which are applied to each new span.
func WithSpanOptions(opts ...trace.SpanStartOption) OpenTelemetryOpt {
return optionFunc(func(c *config) {
c.SpanStartOptions = append(c.SpanStartOptions, opts...)
})
}

// WithSpanNameFormatter takes a function that will be called on every
// request and the returned string will become the Span Name.
func WithSpanNameFormatter(f func(op *runtime.ClientOperation) string) OpenTelemetryOpt {
return optionFunc(func(c *config) {
c.SpanNameFormatter = f
})
}

func defaultTransportFormatter(op *runtime.ClientOperation) string {
if op.ID != "" {
return op.ID
}

return fmt.Sprintf("%s_%s", strings.ToLower(op.Method), op.PathPattern)
}

type openTelemetryTransport struct {
transport runtime.ClientTransport
host string
tracer trace.Tracer
config *config
}

func newOpenTelemetryTransport(transport runtime.ClientTransport, host string, opts []OpenTelemetryOpt) *openTelemetryTransport {
tr := &openTelemetryTransport{
transport: transport,
host: host,
}

defaultOpts := []OpenTelemetryOpt{
WithSpanOptions(trace.WithSpanKind(trace.SpanKindClient)),
WithSpanNameFormatter(defaultTransportFormatter),
WithPropagators(otel.GetTextMapPropagator()),
WithTracerProvider(otel.GetTracerProvider()),
}

c := newConfig(append(defaultOpts, opts...)...)
tr.config = c

return tr
}

func (t *openTelemetryTransport) Submit(op *runtime.ClientOperation) (interface{}, error) {
if op.Context == nil {
return t.transport.Submit(op)
}

params := op.Params
reader := op.Reader

var span trace.Span
defer func() {
if span != nil {
span.End()
}
}()

op.Params = runtime.ClientRequestWriterFunc(func(req runtime.ClientRequest, reg strfmt.Registry) error {
span = t.newOpenTelemetrySpan(op, req.GetHeaderParams())
return params.WriteToRequest(req, reg)
})

op.Reader = runtime.ClientResponseReaderFunc(func(response runtime.ClientResponse, consumer runtime.Consumer) (interface{}, error) {
if span != nil {
statusCode := response.Code()
span.SetAttributes(attribute.Int(string(semconv.HTTPStatusCodeKey), statusCode))
span.SetStatus(semconv.SpanStatusFromHTTPStatusCodeAndSpanKind(statusCode, trace.SpanKindClient))
}

return reader.ReadResponse(response, consumer)
})

submit, err := t.transport.Submit(op)
if err != nil && span != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}

return submit, err
}

func (t *openTelemetryTransport) newOpenTelemetrySpan(op *runtime.ClientOperation, header http.Header) trace.Span {
ctx := op.Context

tracer := t.tracer
if tracer == nil {
if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
tracer = newTracer(span.TracerProvider())
} else {
tracer = newTracer(otel.GetTracerProvider())
}
}

ctx, span := tracer.Start(ctx, t.config.SpanNameFormatter(op), t.config.SpanStartOptions...)

var scheme string
if len(op.Schemes) > 0 {
scheme = op.Schemes[0]
}

span.SetAttributes(
attribute.String("net.peer.name", t.host),
attribute.String(string(semconv.HTTPRouteKey), op.PathPattern),
attribute.String(string(semconv.HTTPMethodKey), op.Method),
attribute.String("span.kind", trace.SpanKindClient.String()),
attribute.String("http.scheme", scheme),
)

carrier := propagation.HeaderCarrier(header)
t.config.Propagator.Inject(ctx, carrier)

return span
}

func newTracer(tp trace.TracerProvider) trace.Tracer {
return tp.Tracer(tracerName, trace.WithInstrumentationVersion(version()))
}

func newConfig(opts ...OpenTelemetryOpt) *config {
c := &config{
Propagator: otel.GetTextMapPropagator(),
}

for _, opt := range opts {
opt.apply(c)
}

// Tracer is only initialized if manually specified. Otherwise, can be passed with the tracing context.
if c.TracerProvider != nil {
c.Tracer = newTracer(c.TracerProvider)
}

return c
}

// Version is the current release version of the go-runtime instrumentation.
func version() string {
return instrumentationVersion
}

// SemVersion is the semantic version to be supplied to tracer/meter creation.
func semVersion() string {
return "semver:" + version()
}
123 changes: 123 additions & 0 deletions client/opentelemetry_test.go
@@ -0,0 +1,123 @@
package client

import (
"context"
"testing"

"github.com/go-openapi/runtime"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
)

func Test_OpenTelemetryRuntime_submit(t *testing.T) {
t.Parallel()

exporter := tracetest.NewInMemoryExporter()

tp := tracesdk.NewTracerProvider(
tracesdk.WithSampler(tracesdk.AlwaysSample()),
tracesdk.WithSyncer(exporter),
)

otel.SetTracerProvider(tp)

tracer := tp.Tracer("go-runtime")
ctx, _ := tracer.Start(context.Background(), "op")

assertOpenTelemetrySubmit(t, testOperation(ctx), exporter, 1)
}

func Test_OpenTelemetryRuntime_submit_nilAuthInfo(t *testing.T) {
t.Parallel()

exporter := tracetest.NewInMemoryExporter()

tp := tracesdk.NewTracerProvider(
tracesdk.WithSampler(tracesdk.AlwaysSample()),
tracesdk.WithSyncer(exporter),
)

otel.SetTracerProvider(tp)

tracer := tp.Tracer("go-runtime")
ctx, _ := tracer.Start(context.Background(), "op")

operation := testOperation(ctx)
operation.AuthInfo = nil
assertOpenTelemetrySubmit(t, operation, exporter, 1)
}

func Test_OpenTelemetryRuntime_submit_nilContext(t *testing.T) {
exporter := tracetest.NewInMemoryExporter()

tp := tracesdk.NewTracerProvider(
tracesdk.WithSampler(tracesdk.AlwaysSample()),
tracesdk.WithSyncer(exporter),
)

otel.SetTracerProvider(tp)

tracer := tp.Tracer("go-runtime")
ctx, _ := tracer.Start(context.Background(), "op")
operation := testOperation(ctx)
operation.Context = nil

assertOpenTelemetrySubmit(t, operation, exporter, 0) // just don't panic
}

func Test_injectOpenTelemetrySpanContext(t *testing.T) {
t.Parallel()

exporter := tracetest.NewInMemoryExporter()

tp := tracesdk.NewTracerProvider(
tracesdk.WithSampler(tracesdk.AlwaysSample()),
tracesdk.WithSyncer(exporter),
)

otel.SetTracerProvider(tp)

tracer := tp.Tracer("go-runtime")
ctx, _ := tracer.Start(context.Background(), "op")
operation := testOperation(ctx)

header := map[string][]string{}
tr := newOpenTelemetryTransport(&mockRuntime{runtime.TestClientRequest{Headers: header}}, "", nil)
tr.config.Propagator = propagation.TraceContext{}
tr.Submit(operation)

assert.Len(t, header, 1)
}

func assertOpenTelemetrySubmit(t *testing.T, operation *runtime.ClientOperation, exporter *tracetest.InMemoryExporter, expectedSpanCount int) {
header := map[string][]string{}
tr := newOpenTelemetryTransport(&mockRuntime{runtime.TestClientRequest{Headers: header}}, "remote_host", nil)

_, err := tr.Submit(operation)
require.NoError(t, err)

spans := exporter.GetSpans()
assert.Len(t, spans, expectedSpanCount)

if expectedSpanCount == 1 {
span := spans[0]
assert.Equal(t, "getCluster", span.Name)
assert.Equal(t, "go-openapi", span.InstrumentationLibrary.Name)
assert.Equal(t, span.Status.Code, codes.Error)
assert.Equal(t, []attribute.KeyValue{
attribute.String("net.peer.name", "remote_host"),
attribute.String("http.route", "/kubernetes-clusters/{cluster_id}"),
attribute.String("http.method", "GET"),
attribute.String("span.kind", trace.SpanKindClient.String()),
attribute.String("http.scheme", "https"),
attribute.Int("http.status_code", 490),
}, span.Attributes)
}
}
10 changes: 9 additions & 1 deletion client/runtime.go
Expand Up @@ -31,13 +31,13 @@ import (
"sync"
"time"

"github.com/go-openapi/strfmt"
"github.com/opentracing/opentracing-go"

"github.com/go-openapi/runtime"
"github.com/go-openapi/runtime/logger"
"github.com/go-openapi/runtime/middleware"
"github.com/go-openapi/runtime/yamlpc"
"github.com/go-openapi/strfmt"
)

// TLSClientOptions to configure client authentication with mutual TLS
Expand Down Expand Up @@ -303,6 +303,14 @@ func (r *Runtime) WithOpenTracing(opts ...opentracing.StartSpanOption) runtime.C
return newOpenTracingTransport(r, r.Host, opts)
}

// WithOpenTelemetry adds opentelemetry support to the provided runtime.
// A new client span is created for each request.
// If the context of the client operation does not contain an active span, no span is created.
// The provided opts are applied to each spans - for example to add global tags.
func (r *Runtime) WithOpenTelemetry(opts ...OpenTelemetryOpt) runtime.ClientTransport {
return newOpenTelemetryTransport(r, r.Host, opts)
}

func (r *Runtime) pickScheme(schemes []string) string {
if v := r.selectScheme(r.schemes); v != "" {
return v
Expand Down

0 comments on commit 93d335a

Please sign in to comment.