Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for JSONRPC opencensus tracing #1022

Merged
merged 1 commit into from Nov 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
180 changes: 180 additions & 0 deletions tracing/opencensus/jsonrpc.go
@@ -0,0 +1,180 @@
package opencensus

import (
"context"
"net/http"

"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/plugin/ochttp/propagation/b3"
"go.opencensus.io/trace"

kithttp "github.com/go-kit/kit/transport/http"
jsonrpc "github.com/go-kit/kit/transport/http/jsonrpc"
)

// JSONRPCClientTrace enables OpenCensus tracing of a Go kit JSONRPC transport client.
func JSONRPCClientTrace(options ...TracerOption) jsonrpc.ClientOption {
cfg := TracerOptions{}

for _, option := range options {
option(&cfg)
}

if !cfg.Public && cfg.HTTPPropagate == nil {
cfg.HTTPPropagate = &b3.HTTPFormat{}
}

clientBefore := jsonrpc.ClientBefore(
func(ctx context.Context, req *http.Request) context.Context {
var name string

if cfg.Name != "" {
name = cfg.Name
} else {
// OpenCensus states Path being default naming for a client span
name = ctx.Value(jsonrpc.ContextKeyRequestMethod).(string)
}

ctx, span := trace.StartSpan(
ctx,
name,
trace.WithSampler(cfg.Sampler),
trace.WithSpanKind(trace.SpanKindClient),
)

span.AddAttributes(
trace.StringAttribute(ochttp.HostAttribute, req.URL.Host),
trace.StringAttribute(ochttp.MethodAttribute, req.Method),
trace.StringAttribute(ochttp.PathAttribute, req.URL.Path),
trace.StringAttribute(ochttp.UserAgentAttribute, req.UserAgent()),
)

if !cfg.Public {
cfg.HTTPPropagate.SpanContextToRequest(span.SpanContext(), req)
}

return ctx
},
)

clientAfter := jsonrpc.ClientAfter(
func(ctx context.Context, res *http.Response) context.Context {
if span := trace.FromContext(ctx); span != nil {
span.SetStatus(ochttp.TraceStatus(res.StatusCode, http.StatusText(res.StatusCode)))
span.AddAttributes(
trace.Int64Attribute(ochttp.StatusCodeAttribute, int64(res.StatusCode)),
)
}
return ctx
},
)

clientFinalizer := jsonrpc.ClientFinalizer(
func(ctx context.Context, err error) {
if span := trace.FromContext(ctx); span != nil {
if err != nil {
span.SetStatus(trace.Status{
Code: trace.StatusCodeUnknown,
Message: err.Error(),
})
}
span.End()
}
},
)

return func(c *jsonrpc.Client) {
clientBefore(c)
clientAfter(c)
clientFinalizer(c)
}
}

// JSONRPCServerTrace enables OpenCensus tracing of a Go kit JSONRPC transport server.
func JSONRPCServerTrace(options ...TracerOption) jsonrpc.ServerOption {
cfg := TracerOptions{}

for _, option := range options {
option(&cfg)
}

if !cfg.Public && cfg.HTTPPropagate == nil {
cfg.HTTPPropagate = &b3.HTTPFormat{}
}

serverBeforeCodec := jsonrpc.ServerBeforeCodec(
func(ctx context.Context, httpReq *http.Request, req jsonrpc.Request) context.Context {
var (
spanContext trace.SpanContext
span *trace.Span
name string
ok bool
)

if cfg.Name != "" {
name = cfg.Name
} else {
name = ctx.Value(jsonrpc.ContextKeyRequestMethod).(string)
if name == "" {
// we can't find the rpc method. probably the
// unaryInterceptor was not wired up.
name = "unknown jsonrpc method"
}
}

spanContext, ok = cfg.HTTPPropagate.SpanContextFromRequest(httpReq)
if ok && !cfg.Public {
ctx, span = trace.StartSpanWithRemoteParent(
ctx,
name,
spanContext,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithSampler(cfg.Sampler),
)
} else {
ctx, span = trace.StartSpan(
ctx,
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithSampler(cfg.Sampler),
)
if ok {
span.AddLink(trace.Link{
TraceID: spanContext.TraceID,
SpanID: spanContext.SpanID,
Type: trace.LinkTypeChild,
Attributes: nil,
})
}
}

span.AddAttributes(
trace.StringAttribute(ochttp.MethodAttribute, httpReq.Method),
trace.StringAttribute(ochttp.PathAttribute, httpReq.URL.Path),
)

return ctx
},
)

serverFinalizer := jsonrpc.ServerFinalizer(
func(ctx context.Context, code int, r *http.Request) {
if span := trace.FromContext(ctx); span != nil {
span.SetStatus(ochttp.TraceStatus(code, http.StatusText(code)))

if rs, ok := ctx.Value(kithttp.ContextKeyResponseSize).(int64); ok {
span.AddAttributes(
trace.Int64Attribute("http.response_size", rs),
)
}

span.End()
}
},
)

return func(s *jsonrpc.Server) {
serverBeforeCodec(s)
serverFinalizer(s)
}
}
194 changes: 194 additions & 0 deletions tracing/opencensus/jsonrpc_test.go
@@ -0,0 +1,194 @@
package opencensus_test

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"

"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/plugin/ochttp/propagation/b3"
"go.opencensus.io/plugin/ochttp/propagation/tracecontext"
"go.opencensus.io/trace"
"go.opencensus.io/trace/propagation"

"github.com/go-kit/kit/endpoint"
ockit "github.com/go-kit/kit/tracing/opencensus"
jsonrpc "github.com/go-kit/kit/transport/http/jsonrpc"
)

func TestJSONRPCClientTrace(t *testing.T) {
var (
err error
rec = &recordingExporter{}
rURL, _ = url.Parse("https://httpbin.org/anything")
endpointName = "DummyEndpoint"
)

trace.RegisterExporter(rec)

traces := []struct {
name string
err error
}{
{"", nil},
{"CustomName", nil},
{"", errors.New("dummy-error")},
}

for _, tr := range traces {
clientTracer := ockit.JSONRPCClientTrace(
ockit.WithName(tr.name),
ockit.WithSampler(trace.AlwaysSample()),
)
ep := jsonrpc.NewClient(
rURL,
endpointName,
jsonrpc.ClientRequestEncoder(func(ctx context.Context, i interface{}) (json.RawMessage, error) {
return json.RawMessage(`{}`), nil
}),
jsonrpc.ClientResponseDecoder(func(ctx context.Context, r jsonrpc.Response) (response interface{}, err error) {
return nil, tr.err
}),
clientTracer,
).Endpoint()

ctx, parentSpan := trace.StartSpan(context.Background(), "test")

_, err = ep(ctx, nil)
if want, have := tr.err, err; want != have {
t.Fatalf("unexpected error, want %s, have %s", tr.err.Error(), err.Error())
}

spans := rec.Flush()
if want, have := 1, len(spans); want != have {
t.Fatalf("incorrect number of spans, want %d, have %d", want, have)
}

span := spans[0]
if want, have := parentSpan.SpanContext().SpanID, span.ParentSpanID; want != have {
t.Errorf("incorrect parent ID, want %s, have %s", want, have)
}

if want, have := tr.name, span.Name; want != have && want != "" {
t.Errorf("incorrect span name, want %s, have %s", want, have)
}

if want, have := endpointName, span.Name; want != have && tr.name == "" {
t.Errorf("incorrect span name, want %s, have %s", want, have)
}

code := trace.StatusCodeOK
if tr.err != nil {
code = trace.StatusCodeUnknown

if want, have := err.Error(), span.Status.Message; want != have {
t.Errorf("incorrect span status msg, want %s, have %s", want, have)
}
}

if want, have := int32(code), span.Status.Code; want != have {
t.Errorf("incorrect span status code, want %d, have %d", want, have)
}
}
}

func TestJSONRPCServerTrace(t *testing.T) {
var (
endpointName = "DummyEndpoint"
rec = &recordingExporter{}
)

trace.RegisterExporter(rec)

traces := []struct {
useParent bool
name string
err error
propagation propagation.HTTPFormat
}{
{false, "", nil, nil},
{true, "", nil, nil},
{true, "CustomName", nil, &b3.HTTPFormat{}},
{true, "", errors.New("dummy-error"), &tracecontext.HTTPFormat{}},
}

for _, tr := range traces {
var client http.Client

handler := jsonrpc.NewServer(
jsonrpc.EndpointCodecMap{
endpointName: jsonrpc.EndpointCodec{
Endpoint: endpoint.Nop,
Decode: func(context.Context, json.RawMessage) (interface{}, error) { return nil, nil },
Encode: func(context.Context, interface{}) (json.RawMessage, error) { return nil, tr.err },
},
},
ockit.JSONRPCServerTrace(
ockit.WithName(tr.name),
ockit.WithSampler(trace.AlwaysSample()),
ockit.WithHTTPPropagation(tr.propagation),
),
)

server := httptest.NewServer(handler)
defer server.Close()

jsonStr := []byte(fmt.Sprintf(`{"method":"%s"}`, endpointName))
req, err := http.NewRequest("POST", server.URL, bytes.NewBuffer(jsonStr))
if err != nil {
t.Fatalf("unable to create JSONRPC request: %s", err.Error())
}

if tr.useParent {
client = http.Client{
Transport: &ochttp.Transport{
StartOptions: trace.StartOptions{
Sampler: trace.AlwaysSample(),
},
Propagation: tr.propagation,
},
}
}

resp, err := client.Do(req.WithContext(context.Background()))
if err != nil {
t.Fatalf("unable to send JSONRPC request: %s", err.Error())
}
resp.Body.Close()

spans := rec.Flush()

expectedSpans := 1
if tr.useParent {
expectedSpans++
}

if want, have := expectedSpans, len(spans); want != have {
t.Fatalf("incorrect number of spans, want %d, have %d", want, have)
}

if tr.useParent {
if want, have := spans[1].TraceID, spans[0].TraceID; want != have {
t.Errorf("incorrect trace ID, want %s, have %s", want, have)
}

if want, have := spans[1].SpanID, spans[0].ParentSpanID; want != have {
t.Errorf("incorrect span ID, want %s, have %s", want, have)
}
}

if want, have := tr.name, spans[0].Name; want != have && want != "" {
t.Errorf("incorrect span name, want %s, have %s", want, have)
}

if want, have := endpointName, spans[0].Name; want != have && tr.name == "" {
t.Errorf("incorrect span name, want %s, have %s", want, have)
}
}
}
9 changes: 8 additions & 1 deletion transport/http/jsonrpc/client.go
Expand Up @@ -159,6 +159,8 @@ func (c Client) Endpoint() endpoint.Endpoint {
}()
}

ctx = context.WithValue(ctx, ContextKeyRequestMethod, c.method)

var params json.RawMessage
if params, err = c.enc(ctx, request); err != nil {
return nil, err
Expand Down Expand Up @@ -207,7 +209,12 @@ func (c Client) Endpoint() endpoint.Endpoint {
return nil, err
}

return c.dec(ctx, rpcRes)
response, err := c.dec(ctx, rpcRes)
if err != nil {
return nil, err
}

return response, nil
}
}

Expand Down