Skip to content

Commit

Permalink
add support for JSONRPC opencensus tracing (#1022)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryan-lang committed Nov 3, 2020
1 parent 2125fae commit 1e9fdf6
Show file tree
Hide file tree
Showing 6 changed files with 448 additions and 2 deletions.
180 changes: 180 additions & 0 deletions tracing/opencensus/jsonrpc.go
@@ -0,0 +1,180 @@
package opencensus

import (
"context"
"net/http"

"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/plugin/ochttp/propagation/b3"
"go.opencensus.io/trace"

kithttp "github.com/go-kit/kit/transport/http"
jsonrpc "github.com/go-kit/kit/transport/http/jsonrpc"
)

// JSONRPCClientTrace enables OpenCensus tracing of a Go kit JSONRPC transport client.
func JSONRPCClientTrace(options ...TracerOption) jsonrpc.ClientOption {
cfg := TracerOptions{}

for _, option := range options {
option(&cfg)
}

if !cfg.Public && cfg.HTTPPropagate == nil {
cfg.HTTPPropagate = &b3.HTTPFormat{}
}

clientBefore := jsonrpc.ClientBefore(
func(ctx context.Context, req *http.Request) context.Context {
var name string

if cfg.Name != "" {
name = cfg.Name
} else {
// OpenCensus states Path being default naming for a client span
name = ctx.Value(jsonrpc.ContextKeyRequestMethod).(string)
}

ctx, span := trace.StartSpan(
ctx,
name,
trace.WithSampler(cfg.Sampler),
trace.WithSpanKind(trace.SpanKindClient),
)

span.AddAttributes(
trace.StringAttribute(ochttp.HostAttribute, req.URL.Host),
trace.StringAttribute(ochttp.MethodAttribute, req.Method),
trace.StringAttribute(ochttp.PathAttribute, req.URL.Path),
trace.StringAttribute(ochttp.UserAgentAttribute, req.UserAgent()),
)

if !cfg.Public {
cfg.HTTPPropagate.SpanContextToRequest(span.SpanContext(), req)
}

return ctx
},
)

clientAfter := jsonrpc.ClientAfter(
func(ctx context.Context, res *http.Response) context.Context {
if span := trace.FromContext(ctx); span != nil {
span.SetStatus(ochttp.TraceStatus(res.StatusCode, http.StatusText(res.StatusCode)))
span.AddAttributes(
trace.Int64Attribute(ochttp.StatusCodeAttribute, int64(res.StatusCode)),
)
}
return ctx
},
)

clientFinalizer := jsonrpc.ClientFinalizer(
func(ctx context.Context, err error) {
if span := trace.FromContext(ctx); span != nil {
if err != nil {
span.SetStatus(trace.Status{
Code: trace.StatusCodeUnknown,
Message: err.Error(),
})
}
span.End()
}
},
)

return func(c *jsonrpc.Client) {
clientBefore(c)
clientAfter(c)
clientFinalizer(c)
}
}

// JSONRPCServerTrace enables OpenCensus tracing of a Go kit JSONRPC transport server.
func JSONRPCServerTrace(options ...TracerOption) jsonrpc.ServerOption {
cfg := TracerOptions{}

for _, option := range options {
option(&cfg)
}

if !cfg.Public && cfg.HTTPPropagate == nil {
cfg.HTTPPropagate = &b3.HTTPFormat{}
}

serverBeforeCodec := jsonrpc.ServerBeforeCodec(
func(ctx context.Context, httpReq *http.Request, req jsonrpc.Request) context.Context {
var (
spanContext trace.SpanContext
span *trace.Span
name string
ok bool
)

if cfg.Name != "" {
name = cfg.Name
} else {
name = ctx.Value(jsonrpc.ContextKeyRequestMethod).(string)
if name == "" {
// we can't find the rpc method. probably the
// unaryInterceptor was not wired up.
name = "unknown jsonrpc method"
}
}

spanContext, ok = cfg.HTTPPropagate.SpanContextFromRequest(httpReq)
if ok && !cfg.Public {
ctx, span = trace.StartSpanWithRemoteParent(
ctx,
name,
spanContext,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithSampler(cfg.Sampler),
)
} else {
ctx, span = trace.StartSpan(
ctx,
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithSampler(cfg.Sampler),
)
if ok {
span.AddLink(trace.Link{
TraceID: spanContext.TraceID,
SpanID: spanContext.SpanID,
Type: trace.LinkTypeChild,
Attributes: nil,
})
}
}

span.AddAttributes(
trace.StringAttribute(ochttp.MethodAttribute, httpReq.Method),
trace.StringAttribute(ochttp.PathAttribute, httpReq.URL.Path),
)

return ctx
},
)

serverFinalizer := jsonrpc.ServerFinalizer(
func(ctx context.Context, code int, r *http.Request) {
if span := trace.FromContext(ctx); span != nil {
span.SetStatus(ochttp.TraceStatus(code, http.StatusText(code)))

if rs, ok := ctx.Value(kithttp.ContextKeyResponseSize).(int64); ok {
span.AddAttributes(
trace.Int64Attribute("http.response_size", rs),
)
}

span.End()
}
},
)

return func(s *jsonrpc.Server) {
serverBeforeCodec(s)
serverFinalizer(s)
}
}
194 changes: 194 additions & 0 deletions tracing/opencensus/jsonrpc_test.go
@@ -0,0 +1,194 @@
package opencensus_test

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"

"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/plugin/ochttp/propagation/b3"
"go.opencensus.io/plugin/ochttp/propagation/tracecontext"
"go.opencensus.io/trace"
"go.opencensus.io/trace/propagation"

"github.com/go-kit/kit/endpoint"
ockit "github.com/go-kit/kit/tracing/opencensus"
jsonrpc "github.com/go-kit/kit/transport/http/jsonrpc"
)

func TestJSONRPCClientTrace(t *testing.T) {
var (
err error
rec = &recordingExporter{}
rURL, _ = url.Parse("https://httpbin.org/anything")
endpointName = "DummyEndpoint"
)

trace.RegisterExporter(rec)

traces := []struct {
name string
err error
}{
{"", nil},
{"CustomName", nil},
{"", errors.New("dummy-error")},
}

for _, tr := range traces {
clientTracer := ockit.JSONRPCClientTrace(
ockit.WithName(tr.name),
ockit.WithSampler(trace.AlwaysSample()),
)
ep := jsonrpc.NewClient(
rURL,
endpointName,
jsonrpc.ClientRequestEncoder(func(ctx context.Context, i interface{}) (json.RawMessage, error) {
return json.RawMessage(`{}`), nil
}),
jsonrpc.ClientResponseDecoder(func(ctx context.Context, r jsonrpc.Response) (response interface{}, err error) {
return nil, tr.err
}),
clientTracer,
).Endpoint()

ctx, parentSpan := trace.StartSpan(context.Background(), "test")

_, err = ep(ctx, nil)
if want, have := tr.err, err; want != have {
t.Fatalf("unexpected error, want %s, have %s", tr.err.Error(), err.Error())
}

spans := rec.Flush()
if want, have := 1, len(spans); want != have {
t.Fatalf("incorrect number of spans, want %d, have %d", want, have)
}

span := spans[0]
if want, have := parentSpan.SpanContext().SpanID, span.ParentSpanID; want != have {
t.Errorf("incorrect parent ID, want %s, have %s", want, have)
}

if want, have := tr.name, span.Name; want != have && want != "" {
t.Errorf("incorrect span name, want %s, have %s", want, have)
}

if want, have := endpointName, span.Name; want != have && tr.name == "" {
t.Errorf("incorrect span name, want %s, have %s", want, have)
}

code := trace.StatusCodeOK
if tr.err != nil {
code = trace.StatusCodeUnknown

if want, have := err.Error(), span.Status.Message; want != have {
t.Errorf("incorrect span status msg, want %s, have %s", want, have)
}
}

if want, have := int32(code), span.Status.Code; want != have {
t.Errorf("incorrect span status code, want %d, have %d", want, have)
}
}
}

func TestJSONRPCServerTrace(t *testing.T) {
var (
endpointName = "DummyEndpoint"
rec = &recordingExporter{}
)

trace.RegisterExporter(rec)

traces := []struct {
useParent bool
name string
err error
propagation propagation.HTTPFormat
}{
{false, "", nil, nil},
{true, "", nil, nil},
{true, "CustomName", nil, &b3.HTTPFormat{}},
{true, "", errors.New("dummy-error"), &tracecontext.HTTPFormat{}},
}

for _, tr := range traces {
var client http.Client

handler := jsonrpc.NewServer(
jsonrpc.EndpointCodecMap{
endpointName: jsonrpc.EndpointCodec{
Endpoint: endpoint.Nop,
Decode: func(context.Context, json.RawMessage) (interface{}, error) { return nil, nil },
Encode: func(context.Context, interface{}) (json.RawMessage, error) { return nil, tr.err },
},
},
ockit.JSONRPCServerTrace(
ockit.WithName(tr.name),
ockit.WithSampler(trace.AlwaysSample()),
ockit.WithHTTPPropagation(tr.propagation),
),
)

server := httptest.NewServer(handler)
defer server.Close()

jsonStr := []byte(fmt.Sprintf(`{"method":"%s"}`, endpointName))
req, err := http.NewRequest("POST", server.URL, bytes.NewBuffer(jsonStr))
if err != nil {
t.Fatalf("unable to create JSONRPC request: %s", err.Error())
}

if tr.useParent {
client = http.Client{
Transport: &ochttp.Transport{
StartOptions: trace.StartOptions{
Sampler: trace.AlwaysSample(),
},
Propagation: tr.propagation,
},
}
}

resp, err := client.Do(req.WithContext(context.Background()))
if err != nil {
t.Fatalf("unable to send JSONRPC request: %s", err.Error())
}
resp.Body.Close()

spans := rec.Flush()

expectedSpans := 1
if tr.useParent {
expectedSpans++
}

if want, have := expectedSpans, len(spans); want != have {
t.Fatalf("incorrect number of spans, want %d, have %d", want, have)
}

if tr.useParent {
if want, have := spans[1].TraceID, spans[0].TraceID; want != have {
t.Errorf("incorrect trace ID, want %s, have %s", want, have)
}

if want, have := spans[1].SpanID, spans[0].ParentSpanID; want != have {
t.Errorf("incorrect span ID, want %s, have %s", want, have)
}
}

if want, have := tr.name, spans[0].Name; want != have && want != "" {
t.Errorf("incorrect span name, want %s, have %s", want, have)
}

if want, have := endpointName, spans[0].Name; want != have && tr.name == "" {
t.Errorf("incorrect span name, want %s, have %s", want, have)
}
}
}
9 changes: 8 additions & 1 deletion transport/http/jsonrpc/client.go
Expand Up @@ -159,6 +159,8 @@ func (c Client) Endpoint() endpoint.Endpoint {
}()
}

ctx = context.WithValue(ctx, ContextKeyRequestMethod, c.method)

var params json.RawMessage
if params, err = c.enc(ctx, request); err != nil {
return nil, err
Expand Down Expand Up @@ -207,7 +209,12 @@ func (c Client) Endpoint() endpoint.Endpoint {
return nil, err
}

return c.dec(ctx, rpcRes)
response, err := c.dec(ctx, rpcRes)
if err != nil {
return nil, err
}

return response, nil
}
}

Expand Down

0 comments on commit 1e9fdf6

Please sign in to comment.