Skip to content

Commit

Permalink
otelgrpc stats.Handler support metric
Browse files Browse the repository at this point in the history
Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>
  • Loading branch information
fatsheep9146 committed Oct 7, 2023
1 parent 8a2f8cf commit 57d33a1
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Add the new `go.opentelemetry.io/contrib/instrgen` package to provide auto-generated source code instrumentation. (#3068, #3108)
- Add `"go.opentelemetry.io/contrib/samplers/jaegerremote".WithSamplingStrategyFetcher` which sets custom fetcher implementation. (#4045)
- Add metric support for `grpc.StatsHandler` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#4356)

### Changed

Expand Down
150 changes: 137 additions & 13 deletions instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import (
"google.golang.org/grpc/status"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
)
Expand All @@ -33,98 +36,204 @@ type gRPCContextKey struct{}
type gRPCContext struct {
messagesReceived int64
messagesSent int64
requestSize int64
responseSize int64
metricAttrs []attribute.KeyValue
}

// NewServerHandler creates a stats.Handler for gRPC server.
func NewServerHandler(opts ...Option) stats.Handler {
h := &serverHandler{
config: newConfig(opts),
r: &handler{},
}

h.tracer = h.config.TracerProvider.Tracer(
h.r.tracer = h.config.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(SemVersion()),
)
h.r.meter = h.config.MeterProvider.Meter(
instrumentationName,
metric.WithInstrumentationVersion(Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)
var err error
h.r.rpcDuration, err = h.r.meter.Int64Histogram("rpc.server.duration",
metric.WithDescription("Measures the duration of inbound RPC."),
metric.WithUnit("ms"))
if err != nil {
otel.Handle(err)
}

Check warning on line 66 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L65-L66

Added lines #L65 - L66 were not covered by tests

h.r.rpcRequestSize, err = h.r.meter.Int64Histogram("rpc.server.request.size",
metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}

Check warning on line 73 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L72-L73

Added lines #L72 - L73 were not covered by tests

h.r.rpcResponseSize, err = h.r.meter.Int64Histogram("rpc.server.response.size",
metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}

Check warning on line 80 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L79-L80

Added lines #L79 - L80 were not covered by tests

h.r.rpcRequestsPerRPC, err = h.r.meter.Int64Histogram("rpc.server.requests_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
}

Check warning on line 87 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L86-L87

Added lines #L86 - L87 were not covered by tests

h.r.rpcResponsesPerRPC, err = h.r.meter.Int64Histogram("rpc.server.responses_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
}

Check warning on line 94 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L93-L94

Added lines #L93 - L94 were not covered by tests

return h
}

type handler struct {
tracer trace.Tracer
meter metric.Meter
rpcDuration metric.Int64Histogram
rpcRequestSize metric.Int64Histogram
rpcResponseSize metric.Int64Histogram
rpcRequestsPerRPC metric.Int64Histogram
rpcResponsesPerRPC metric.Int64Histogram
}

type serverHandler struct {
*config
tracer trace.Tracer
r *handler
}

// TagRPC can attach some information to the given context.
func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
// fmt.Printf("server TagRPC, ctx.Err %v\n", ctx.Err())
ctx = extract(ctx, h.config.Propagators)

name, attrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, RPCSystemGRPC)
ctx, _ = h.tracer.Start(
ctx, _ = h.r.tracer.Start(
trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attrs...),
)

gctx := gRPCContext{}
gctx := gRPCContext{
metricAttrs: attrs,
}
return context.WithValue(ctx, gRPCContextKey{}, &gctx)
}

// HandleRPC processes the RPC stats.
func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
handleRPC(ctx, rs)
// fmt.Printf("server HandleRPC, ctx.Err %v\n", ctx.Err())
h.r.handleRPC(ctx, rs)
}

// TagConn can attach some information to the given context.
func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
// fmt.Printf("server TagConn, ctx.Err %v\n", ctx.Err())
span := trace.SpanFromContext(ctx)
attrs := peerAttr(peerFromCtx(ctx))
span.SetAttributes(attrs...)

return ctx
}

// HandleConn processes the Conn stats.
func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
// fmt.Printf("server HandleConn, ctx.Err %v\n", ctx.Err())
}

// NewClientHandler creates a stats.Handler for gRPC client.
func NewClientHandler(opts ...Option) stats.Handler {
h := &clientHandler{
config: newConfig(opts),
r: &handler{},
}

h.tracer = h.config.TracerProvider.Tracer(
h.r.tracer = h.config.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(SemVersion()),
)

h.r.meter = h.config.MeterProvider.Meter(
instrumentationName,
metric.WithInstrumentationVersion(Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)
var err error
h.r.rpcDuration, err = h.r.meter.Int64Histogram("rpc.client.duration",
metric.WithDescription("Measures the duration of inbound RPC."),
metric.WithUnit("ms"))
if err != nil {
otel.Handle(err)
}

Check warning on line 178 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L177-L178

Added lines #L177 - L178 were not covered by tests

h.r.rpcRequestSize, err = h.r.meter.Int64Histogram("rpc.client.request.size",
metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}

Check warning on line 185 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L184-L185

Added lines #L184 - L185 were not covered by tests

h.r.rpcResponseSize, err = h.r.meter.Int64Histogram("rpc.client.response.size",
metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}

Check warning on line 192 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L191-L192

Added lines #L191 - L192 were not covered by tests

h.r.rpcRequestsPerRPC, err = h.r.meter.Int64Histogram("rpc.client.requests_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
}

Check warning on line 199 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L198-L199

Added lines #L198 - L199 were not covered by tests

h.r.rpcResponsesPerRPC, err = h.r.meter.Int64Histogram("rpc.client.responses_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
}

Check warning on line 206 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L205-L206

Added lines #L205 - L206 were not covered by tests

return h
}

type clientHandler struct {
*config
tracer trace.Tracer
r *handler
}

// TagRPC can attach some information to the given context.
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
name, attrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, RPCSystemGRPC)
ctx, _ = h.tracer.Start(
ctx, _ = h.r.tracer.Start(
ctx,
name,
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(attrs...),
)

gctx := gRPCContext{}
gctx := gRPCContext{
metricAttrs: attrs,
}

return inject(context.WithValue(ctx, gRPCContextKey{}, &gctx), h.config.Propagators)
}

// HandleRPC processes the RPC stats.
func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
handleRPC(ctx, rs)
h.r.handleRPC(ctx, rs)
}

// TagConn can attach some information to the given context.
Expand All @@ -140,7 +249,7 @@ func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) {
// no-op
}

func handleRPC(ctx context.Context, rs stats.RPCStats) {
func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) {
span := trace.SpanFromContext(ctx)
gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
var messageId int64
Expand All @@ -150,6 +259,7 @@ func handleRPC(ctx context.Context, rs stats.RPCStats) {
case *stats.InPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
atomic.StoreInt64(&gctx.requestSize, int64(rs.Length))
}
span.AddEvent("message",
trace.WithAttributes(
Expand All @@ -162,6 +272,7 @@ func handleRPC(ctx context.Context, rs stats.RPCStats) {
case *stats.OutPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
atomic.StoreInt64(&gctx.responseSize, int64(rs.Length))
}

span.AddEvent("message",
Expand All @@ -173,14 +284,27 @@ func handleRPC(ctx context.Context, rs stats.RPCStats) {
),
)
case *stats.End:
var rpcStatusAttr attribute.KeyValue
metricAttrs := make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1)
metricAttrs = append(metricAttrs, gctx.metricAttrs...)

if rs.Error != nil {
s, _ := status.FromError(rs.Error)
span.SetStatus(codes.Error, s.Message())
span.SetAttributes(statusCodeAttr(s.Code()))
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(s.Code()))

Check warning on line 294 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L294

Added line #L294 was not covered by tests
} else {
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK))
}
span.SetAttributes(rpcStatusAttr)
span.End()

metricAttrs = append(metricAttrs, rpcStatusAttr)
r.rpcDuration.Record(context.TODO(), int64(rs.EndTime.Sub(rs.BeginTime)), metric.WithAttributes(metricAttrs...))
r.rpcRequestSize.Record(context.TODO(), gctx.requestSize, metric.WithAttributes(metricAttrs...))
r.rpcResponseSize.Record(context.TODO(), gctx.responseSize, metric.WithAttributes(metricAttrs...))
r.rpcRequestsPerRPC.Record(context.TODO(), gctx.messagesReceived, metric.WithAttributes(metricAttrs...))
r.rpcResponsesPerRPC.Record(context.TODO(), gctx.messagesSent, metric.WithAttributes(metricAttrs...))

default:
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -24,6 +25,8 @@ import (

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
Expand All @@ -32,25 +35,31 @@ import (
func TestStatsHandler(t *testing.T) {
clientSR := tracetest.NewSpanRecorder()
clientTP := trace.NewTracerProvider(trace.WithSpanProcessor(clientSR))
clientMetricReader := metric.NewManualReader()
clientMP := metric.NewMeterProvider(metric.WithReader(clientMetricReader))

serverSR := tracetest.NewSpanRecorder()
serverTP := trace.NewTracerProvider(trace.WithSpanProcessor(serverSR))
serverMetricReader := metric.NewManualReader()
serverMP := metric.NewMeterProvider(metric.WithReader(serverMetricReader))

assert.NoError(t, doCalls(
[]grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(clientTP))),
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(clientTP), otelgrpc.WithMeterProvider(clientMP))),
},
[]grpc.ServerOption{
grpc.StatsHandler(otelgrpc.NewServerHandler(otelgrpc.WithTracerProvider(serverTP))),
grpc.StatsHandler(otelgrpc.NewServerHandler(otelgrpc.WithTracerProvider(serverTP), otelgrpc.WithMeterProvider(serverMP))),
},
))

t.Run("ClientSpans", func(t *testing.T) {
t.Run("Client", func(t *testing.T) {
checkClientSpans(t, clientSR.Ended())
checkClientRecords(t, clientMetricReader)
})

t.Run("ServerSpans", func(t *testing.T) {
t.Run("Server", func(t *testing.T) {
checkServerSpans(t, serverSR.Ended())
checkServerRecords(t, serverMetricReader)
})
}

Expand Down Expand Up @@ -579,3 +588,49 @@ func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
}, pingPong.Attributes())
}

func checkServerRecords(t *testing.T, reader metric.Reader) {
rm := metricdata.ResourceMetrics{}
err := reader.Collect(context.Background(), &rm)
assert.NoError(t, err)
require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, 5)
for _, m := range rm.ScopeMetrics[0].Metrics {
require.IsType(t, m.Data, metricdata.Histogram[int64]{})
data := m.Data.(metricdata.Histogram[int64])
for _, dpt := range data.DataPoints {
attr := dpt.Attributes.ToSlice()
method := getRPCMethod(attr)
assert.NotEmpty(t, method)
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethod(method),
semconv.RPCService("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
}, attr)
}
}
}

func checkClientRecords(t *testing.T, reader metric.Reader) {
rm := metricdata.ResourceMetrics{}
err := reader.Collect(context.Background(), &rm)
assert.NoError(t, err)
require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, 5)
for _, m := range rm.ScopeMetrics[0].Metrics {
require.IsType(t, m.Data, metricdata.Histogram[int64]{})
data := m.Data.(metricdata.Histogram[int64])
for _, dpt := range data.DataPoints {
attr := dpt.Attributes.ToSlice()
method := getRPCMethod(attr)
assert.NotEmpty(t, method)
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethod(method),
semconv.RPCService("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
}, attr)
}
}
}

0 comments on commit 57d33a1

Please sign in to comment.