From 8dd04cc9f5b7b7558f61bc909f4c220fec9612d7 Mon Sep 17 00:00:00 2001 From: Ryan Lang Date: Fri, 9 Oct 2020 11:55:42 -0700 Subject: [PATCH] add support for JSONRPC opencensus tracing --- tracing/opencensus/jsonrpc.go | 180 ++++++++++++++++ tracing/opencensus/jsonrpc_test.go | 194 ++++++++++++++++++ transport/http/jsonrpc/client.go | 9 +- .../http/jsonrpc/request_response_types.go | 17 +- transport/http/jsonrpc/server.go | 14 ++ transport/http/jsonrpc/server_test.go | 36 ++++ 6 files changed, 448 insertions(+), 2 deletions(-) create mode 100644 tracing/opencensus/jsonrpc.go create mode 100644 tracing/opencensus/jsonrpc_test.go diff --git a/tracing/opencensus/jsonrpc.go b/tracing/opencensus/jsonrpc.go new file mode 100644 index 000000000..ea2b0180c --- /dev/null +++ b/tracing/opencensus/jsonrpc.go @@ -0,0 +1,180 @@ +package opencensus + +import ( + "context" + "net/http" + + "go.opencensus.io/plugin/ochttp" + "go.opencensus.io/plugin/ochttp/propagation/b3" + "go.opencensus.io/trace" + + kithttp "github.com/go-kit/kit/transport/http" + jsonrpc "github.com/go-kit/kit/transport/http/jsonrpc" +) + +// JSONRPCClientTrace enables OpenCensus tracing of a Go kit JSONRPC transport client. +func JSONRPCClientTrace(options ...TracerOption) jsonrpc.ClientOption { + cfg := TracerOptions{} + + for _, option := range options { + option(&cfg) + } + + if !cfg.Public && cfg.HTTPPropagate == nil { + cfg.HTTPPropagate = &b3.HTTPFormat{} + } + + clientBefore := jsonrpc.ClientBefore( + func(ctx context.Context, req *http.Request) context.Context { + var name string + + if cfg.Name != "" { + name = cfg.Name + } else { + // OpenCensus states Path being default naming for a client span + name = ctx.Value(jsonrpc.ContextKeyRequestMethod).(string) + } + + ctx, span := trace.StartSpan( + ctx, + name, + trace.WithSampler(cfg.Sampler), + trace.WithSpanKind(trace.SpanKindClient), + ) + + span.AddAttributes( + trace.StringAttribute(ochttp.HostAttribute, req.URL.Host), + trace.StringAttribute(ochttp.MethodAttribute, req.Method), + trace.StringAttribute(ochttp.PathAttribute, req.URL.Path), + trace.StringAttribute(ochttp.UserAgentAttribute, req.UserAgent()), + ) + + if !cfg.Public { + cfg.HTTPPropagate.SpanContextToRequest(span.SpanContext(), req) + } + + return ctx + }, + ) + + clientAfter := jsonrpc.ClientAfter( + func(ctx context.Context, res *http.Response) context.Context { + if span := trace.FromContext(ctx); span != nil { + span.SetStatus(ochttp.TraceStatus(res.StatusCode, http.StatusText(res.StatusCode))) + span.AddAttributes( + trace.Int64Attribute(ochttp.StatusCodeAttribute, int64(res.StatusCode)), + ) + } + return ctx + }, + ) + + clientFinalizer := jsonrpc.ClientFinalizer( + func(ctx context.Context, err error) { + if span := trace.FromContext(ctx); span != nil { + if err != nil { + span.SetStatus(trace.Status{ + Code: trace.StatusCodeUnknown, + Message: err.Error(), + }) + } + span.End() + } + }, + ) + + return func(c *jsonrpc.Client) { + clientBefore(c) + clientAfter(c) + clientFinalizer(c) + } +} + +// JSONRPCServerTrace enables OpenCensus tracing of a Go kit JSONRPC transport server. +func JSONRPCServerTrace(options ...TracerOption) jsonrpc.ServerOption { + cfg := TracerOptions{} + + for _, option := range options { + option(&cfg) + } + + if !cfg.Public && cfg.HTTPPropagate == nil { + cfg.HTTPPropagate = &b3.HTTPFormat{} + } + + serverBeforeCodec := jsonrpc.ServerBeforeCodec( + func(ctx context.Context, httpReq *http.Request, req jsonrpc.Request) context.Context { + var ( + spanContext trace.SpanContext + span *trace.Span + name string + ok bool + ) + + if cfg.Name != "" { + name = cfg.Name + } else { + name = ctx.Value(jsonrpc.ContextKeyRequestMethod).(string) + if name == "" { + // we can't find the rpc method. probably the + // unaryInterceptor was not wired up. + name = "unknown jsonrpc method" + } + } + + spanContext, ok = cfg.HTTPPropagate.SpanContextFromRequest(httpReq) + if ok && !cfg.Public { + ctx, span = trace.StartSpanWithRemoteParent( + ctx, + name, + spanContext, + trace.WithSpanKind(trace.SpanKindServer), + trace.WithSampler(cfg.Sampler), + ) + } else { + ctx, span = trace.StartSpan( + ctx, + name, + trace.WithSpanKind(trace.SpanKindServer), + trace.WithSampler(cfg.Sampler), + ) + if ok { + span.AddLink(trace.Link{ + TraceID: spanContext.TraceID, + SpanID: spanContext.SpanID, + Type: trace.LinkTypeChild, + Attributes: nil, + }) + } + } + + span.AddAttributes( + trace.StringAttribute(ochttp.MethodAttribute, httpReq.Method), + trace.StringAttribute(ochttp.PathAttribute, httpReq.URL.Path), + ) + + return ctx + }, + ) + + serverFinalizer := jsonrpc.ServerFinalizer( + func(ctx context.Context, code int, r *http.Request) { + if span := trace.FromContext(ctx); span != nil { + span.SetStatus(ochttp.TraceStatus(code, http.StatusText(code))) + + if rs, ok := ctx.Value(kithttp.ContextKeyResponseSize).(int64); ok { + span.AddAttributes( + trace.Int64Attribute("http.response_size", rs), + ) + } + + span.End() + } + }, + ) + + return func(s *jsonrpc.Server) { + serverBeforeCodec(s) + serverFinalizer(s) + } +} diff --git a/tracing/opencensus/jsonrpc_test.go b/tracing/opencensus/jsonrpc_test.go new file mode 100644 index 000000000..c9c47ff7e --- /dev/null +++ b/tracing/opencensus/jsonrpc_test.go @@ -0,0 +1,194 @@ +package opencensus_test + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "go.opencensus.io/plugin/ochttp" + "go.opencensus.io/plugin/ochttp/propagation/b3" + "go.opencensus.io/plugin/ochttp/propagation/tracecontext" + "go.opencensus.io/trace" + "go.opencensus.io/trace/propagation" + + "github.com/go-kit/kit/endpoint" + ockit "github.com/go-kit/kit/tracing/opencensus" + jsonrpc "github.com/go-kit/kit/transport/http/jsonrpc" +) + +func TestJSONRPCClientTrace(t *testing.T) { + var ( + err error + rec = &recordingExporter{} + rURL, _ = url.Parse("https://httpbin.org/anything") + endpointName = "DummyEndpoint" + ) + + trace.RegisterExporter(rec) + + traces := []struct { + name string + err error + }{ + {"", nil}, + {"CustomName", nil}, + {"", errors.New("dummy-error")}, + } + + for _, tr := range traces { + clientTracer := ockit.JSONRPCClientTrace( + ockit.WithName(tr.name), + ockit.WithSampler(trace.AlwaysSample()), + ) + ep := jsonrpc.NewClient( + rURL, + endpointName, + jsonrpc.ClientRequestEncoder(func(ctx context.Context, i interface{}) (json.RawMessage, error) { + return json.RawMessage(`{}`), nil + }), + jsonrpc.ClientResponseDecoder(func(ctx context.Context, r jsonrpc.Response) (response interface{}, err error) { + return nil, tr.err + }), + clientTracer, + ).Endpoint() + + ctx, parentSpan := trace.StartSpan(context.Background(), "test") + + _, err = ep(ctx, nil) + if want, have := tr.err, err; want != have { + t.Fatalf("unexpected error, want %s, have %s", tr.err.Error(), err.Error()) + } + + spans := rec.Flush() + if want, have := 1, len(spans); want != have { + t.Fatalf("incorrect number of spans, want %d, have %d", want, have) + } + + span := spans[0] + if want, have := parentSpan.SpanContext().SpanID, span.ParentSpanID; want != have { + t.Errorf("incorrect parent ID, want %s, have %s", want, have) + } + + if want, have := tr.name, span.Name; want != have && want != "" { + t.Errorf("incorrect span name, want %s, have %s", want, have) + } + + if want, have := endpointName, span.Name; want != have && tr.name == "" { + t.Errorf("incorrect span name, want %s, have %s", want, have) + } + + code := trace.StatusCodeOK + if tr.err != nil { + code = trace.StatusCodeUnknown + + if want, have := err.Error(), span.Status.Message; want != have { + t.Errorf("incorrect span status msg, want %s, have %s", want, have) + } + } + + if want, have := int32(code), span.Status.Code; want != have { + t.Errorf("incorrect span status code, want %d, have %d", want, have) + } + } +} + +func TestJSONRPCServerTrace(t *testing.T) { + var ( + endpointName = "DummyEndpoint" + rec = &recordingExporter{} + ) + + trace.RegisterExporter(rec) + + traces := []struct { + useParent bool + name string + err error + propagation propagation.HTTPFormat + }{ + {false, "", nil, nil}, + {true, "", nil, nil}, + {true, "CustomName", nil, &b3.HTTPFormat{}}, + {true, "", errors.New("dummy-error"), &tracecontext.HTTPFormat{}}, + } + + for _, tr := range traces { + var client http.Client + + handler := jsonrpc.NewServer( + jsonrpc.EndpointCodecMap{ + endpointName: jsonrpc.EndpointCodec{ + Endpoint: endpoint.Nop, + Decode: func(context.Context, json.RawMessage) (interface{}, error) { return nil, nil }, + Encode: func(context.Context, interface{}) (json.RawMessage, error) { return nil, tr.err }, + }, + }, + ockit.JSONRPCServerTrace( + ockit.WithName(tr.name), + ockit.WithSampler(trace.AlwaysSample()), + ockit.WithHTTPPropagation(tr.propagation), + ), + ) + + server := httptest.NewServer(handler) + defer server.Close() + + jsonStr := []byte(fmt.Sprintf(`{"method":"%s"}`, endpointName)) + req, err := http.NewRequest("POST", server.URL, bytes.NewBuffer(jsonStr)) + if err != nil { + t.Fatalf("unable to create JSONRPC request: %s", err.Error()) + } + + if tr.useParent { + client = http.Client{ + Transport: &ochttp.Transport{ + StartOptions: trace.StartOptions{ + Sampler: trace.AlwaysSample(), + }, + Propagation: tr.propagation, + }, + } + } + + resp, err := client.Do(req.WithContext(context.Background())) + if err != nil { + t.Fatalf("unable to send JSONRPC request: %s", err.Error()) + } + resp.Body.Close() + + spans := rec.Flush() + + expectedSpans := 1 + if tr.useParent { + expectedSpans++ + } + + if want, have := expectedSpans, len(spans); want != have { + t.Fatalf("incorrect number of spans, want %d, have %d", want, have) + } + + if tr.useParent { + if want, have := spans[1].TraceID, spans[0].TraceID; want != have { + t.Errorf("incorrect trace ID, want %s, have %s", want, have) + } + + if want, have := spans[1].SpanID, spans[0].ParentSpanID; want != have { + t.Errorf("incorrect span ID, want %s, have %s", want, have) + } + } + + if want, have := tr.name, spans[0].Name; want != have && want != "" { + t.Errorf("incorrect span name, want %s, have %s", want, have) + } + + if want, have := endpointName, spans[0].Name; want != have && tr.name == "" { + t.Errorf("incorrect span name, want %s, have %s", want, have) + } + } +} diff --git a/transport/http/jsonrpc/client.go b/transport/http/jsonrpc/client.go index 86e4123c7..14105aa8d 100644 --- a/transport/http/jsonrpc/client.go +++ b/transport/http/jsonrpc/client.go @@ -159,6 +159,8 @@ func (c Client) Endpoint() endpoint.Endpoint { }() } + ctx = context.WithValue(ctx, ContextKeyRequestMethod, c.method) + var params json.RawMessage if params, err = c.enc(ctx, request); err != nil { return nil, err @@ -207,7 +209,12 @@ func (c Client) Endpoint() endpoint.Endpoint { return nil, err } - return c.dec(ctx, rpcRes) + response, err := c.dec(ctx, rpcRes) + if err != nil { + return nil, err + } + + return response, nil } } diff --git a/transport/http/jsonrpc/request_response_types.go b/transport/http/jsonrpc/request_response_types.go index 1c6630bc8..ffae404e5 100644 --- a/transport/http/jsonrpc/request_response_types.go +++ b/transport/http/jsonrpc/request_response_types.go @@ -1,6 +1,10 @@ package jsonrpc -import "encoding/json" +import ( + "context" + "encoding/json" + "net/http" +) // Request defines a JSON RPC request from the spec // http://www.jsonrpc.org/specification#request_object @@ -26,6 +30,11 @@ type RequestID struct { stringError error } +// RequestFunc may take information from decoded json body and place in +// request context. In Servers, RequestFuncs are executed after json is parsed +// but prior to invoking the codec +type RequestFunc func(context.Context, *http.Request, Request) context.Context + // UnmarshalJSON satisfies json.Unmarshaler func (id *RequestID) UnmarshalJSON(b []byte) error { id.intError = json.Unmarshal(b, &id.intValue) @@ -79,3 +88,9 @@ const ( // ContentType defines the content type to be served. ContentType string = "application/json; charset=utf-8" ) + +type contextKey int + +const ( + ContextKeyRequestMethod contextKey = iota +) diff --git a/transport/http/jsonrpc/server.go b/transport/http/jsonrpc/server.go index 75666e4bf..325027547 100644 --- a/transport/http/jsonrpc/server.go +++ b/transport/http/jsonrpc/server.go @@ -19,6 +19,7 @@ var requestIDKey requestIDKeyType type Server struct { ecm EndpointCodecMap before []httptransport.RequestFunc + beforeCodec []RequestFunc after []httptransport.ServerResponseFunc errorEncoder httptransport.ErrorEncoder finalizer httptransport.ServerFinalizerFunc @@ -50,6 +51,14 @@ func ServerBefore(before ...httptransport.RequestFunc) ServerOption { return func(s *Server) { s.before = append(s.before, before...) } } +// ServerBeforeCodec functions are executed after the JSON request body has been +// decoded, but before the method's decoder is called. This provides an opportunity +// for middleware to inspect the contents of the rpc request before being passed +// to the codec. +func ServerBeforeCodec(beforeCodec ...RequestFunc) ServerOption { + return func(s *Server) { s.beforeCodec = append(s.beforeCodec, beforeCodec...) } +} + // ServerAfter functions are executed on the HTTP response writer after the // endpoint is invoked, but before anything is written to the client. func ServerAfter(after ...httptransport.ServerResponseFunc) ServerOption { @@ -110,6 +119,11 @@ func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { } ctx = context.WithValue(ctx, requestIDKey, req.ID) + ctx = context.WithValue(ctx, ContextKeyRequestMethod, req.Method) + + for _, f := range s.beforeCodec { + ctx = f(ctx, r, req) + } // Get the endpoint and codecs from the map using the method // defined in the JSON object diff --git a/transport/http/jsonrpc/server_test.go b/transport/http/jsonrpc/server_test.go index c279d3c92..c05435ddc 100644 --- a/transport/http/jsonrpc/server_test.go +++ b/transport/http/jsonrpc/server_test.go @@ -235,6 +235,42 @@ func TestServerHappyPath(t *testing.T) { } } +func TestMultipleServerBeforeCodec(t *testing.T) { + var done = make(chan struct{}) + ecm := jsonrpc.EndpointCodecMap{ + "add": jsonrpc.EndpointCodec{ + Endpoint: endpoint.Nop, + Decode: nopDecoder, + Encode: nopEncoder, + }, + } + handler := jsonrpc.NewServer( + ecm, + jsonrpc.ServerBeforeCodec(func(ctx context.Context, r *http.Request, req jsonrpc.Request) context.Context { + ctx = context.WithValue(ctx, "one", 1) + + return ctx + }), + jsonrpc.ServerBeforeCodec(func(ctx context.Context, r *http.Request, req jsonrpc.Request) context.Context { + if _, ok := ctx.Value("one").(int); !ok { + t.Error("Value was not set properly when multiple ServerBeforeCodecs are used") + } + + close(done) + return ctx + }), + ) + server := httptest.NewServer(handler) + defer server.Close() + http.Post(server.URL, "application/json", addBody()) // nolint + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for finalizer") + } +} + func TestMultipleServerBefore(t *testing.T) { var done = make(chan struct{}) ecm := jsonrpc.EndpointCodecMap{