Skip to content

Commit

Permalink
add metric rpc.server.duration for unary request and stream request
Browse files Browse the repository at this point in the history
Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>
  • Loading branch information
fatsheep9146 committed Sep 19, 2022
1 parent c4876c3 commit 9e3f735
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 3 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Expand Up @@ -14,6 +14,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Rename the `Typ` field of `"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc".InterceptorInfo` to `Type`. (#2688)
- Use Go 1.19 as the default version for CI testing/linting. (#2675)

### Added

- [otelgrpc] add metric `rpc.server.duration` to otelgrpc instrumentation library. (#2700)

## [1.9.0/0.34.0/0.4.0] - 2022-08-02

Expand Down
Expand Up @@ -22,6 +22,9 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
Expand All @@ -43,6 +46,10 @@ type config struct {
Filter Filter
Propagators propagation.TextMapPropagator
TracerProvider trace.TracerProvider
MeterProvider metric.MeterProvider

meter metric.Meter
rpcServerDuration syncfloat64.Histogram
}

// Option applies an option value for a config.
Expand All @@ -55,10 +62,22 @@ func newConfig(opts []Option) *config {
c := &config{
Propagators: otel.GetTextMapPropagator(),
TracerProvider: otel.GetTracerProvider(),
MeterProvider: global.MeterProvider(),
}
for _, o := range opts {
o.apply(c)
}

meter := c.MeterProvider.Meter(
instrumentationName,
metric.WithInstrumentationVersion(SemVersion()),
)
c.meter = meter
var err error
if c.rpcServerDuration, err = meter.SyncFloat64().Histogram("rpc.server.duration"); err != nil {
otel.Handle(err)
}

return c
}

Expand Down Expand Up @@ -105,6 +124,20 @@ func WithTracerProvider(tp trace.TracerProvider) Option {
return tracerProviderOption{tp: tp}
}

type meterProviderOption struct{ mp metric.MeterProvider }

func (o meterProviderOption) apply(c *config) {
if o.mp != nil {
c.MeterProvider = o.mp
}
}

// WithMeterProvider returns an Option to use the MeterProvider when
// creating a Meter.
func WithMeterProvider(mp metric.MeterProvider) Option {
return meterProviderOption{mp: mp}
}

type metadataSupplier struct {
metadata *metadata.MD
}
Expand Down
Expand Up @@ -18,6 +18,7 @@ require (
require (
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
go.opentelemetry.io/otel/metric v0.31.0 // indirect
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 // indirect
golang.org/x/text v0.3.3 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
Expand Down
Expand Up @@ -37,6 +37,8 @@ go.opentelemetry.io/otel v1.10.0 h1:Y7DTJMR6zs1xkS/upamJYk0SxxN4C9AqRd77jmZnyY4=
go.opentelemetry.io/otel v1.10.0/go.mod h1:NbvWjCthWHKBEUMpf0/v8ZRZlni86PpGFEMA9pnQSnQ=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.10.0 h1:c9UtMu/qnbLlVwTwt+ABrURrioEruapIslTDYZHJe2w=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.10.0/go.mod h1:h3Lrh9t3Dnqp3NPwAZx7i37UFX7xrfnO1D+fuClREOA=
go.opentelemetry.io/otel/metric v0.31.0 h1:6SiklT+gfWAwWUR0meEMxQBtihpiEs4c+vL9spDTqUs=
go.opentelemetry.io/otel/metric v0.31.0/go.mod h1:ohmwj9KTSIeBnDBm/ZwH2PSZxZzoOaG2xZeekTRzL5A=
go.opentelemetry.io/otel/sdk v1.10.0 h1:jZ6K7sVn04kk/3DNUdJ4mqRlGDiXAVuIG+MMENpTNdY=
go.opentelemetry.io/otel/sdk v1.10.0/go.mod h1:vO06iKzD5baltJz1zarxMCNHFpUlUiOy4s65ECtn6kE=
go.opentelemetry.io/otel/trace v1.10.0 h1:npQMbR8o7mum8uF95yFbOEJffhs1sbCOfDh8zAJiH5E=
Expand Down
1 change: 1 addition & 0 deletions instrumentation/google.golang.org/grpc/otelgrpc/go.mod
Expand Up @@ -5,6 +5,7 @@ go 1.17
require (
github.com/golang/protobuf v1.5.2
go.opentelemetry.io/otel v1.10.0
go.opentelemetry.io/otel/metric v0.31.0
go.opentelemetry.io/otel/trace v1.10.0
google.golang.org/grpc v1.49.0
)
Expand Down
2 changes: 2 additions & 0 deletions instrumentation/google.golang.org/grpc/otelgrpc/go.sum
Expand Up @@ -36,6 +36,8 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
go.opentelemetry.io/otel v1.10.0 h1:Y7DTJMR6zs1xkS/upamJYk0SxxN4C9AqRd77jmZnyY4=
go.opentelemetry.io/otel v1.10.0/go.mod h1:NbvWjCthWHKBEUMpf0/v8ZRZlni86PpGFEMA9pnQSnQ=
go.opentelemetry.io/otel/metric v0.31.0 h1:6SiklT+gfWAwWUR0meEMxQBtihpiEs4c+vL9spDTqUs=
go.opentelemetry.io/otel/metric v0.31.0/go.mod h1:ohmwj9KTSIeBnDBm/ZwH2PSZxZzoOaG2xZeekTRzL5A=
go.opentelemetry.io/otel/trace v1.10.0 h1:npQMbR8o7mum8uF95yFbOEJffhs1sbCOfDh8zAJiH5E=
go.opentelemetry.io/otel/trace v1.10.0/go.mod h1:Sij3YYczqAdz+EhmGhE6TpTxUO5/F/AzrK+kxfGqySM=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
21 changes: 20 additions & 1 deletion instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"io"
"net"
"time"

"github.com/golang/protobuf/proto" // nolint:staticcheck

Expand Down Expand Up @@ -349,13 +350,22 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {

messageReceived.Event(ctx, 1, req)

var statusCode grpc_codes.Code
defer func(t time.Time) {
elapsedTime := float64(time.Since(t)) / float64(time.Millisecond)
attr = append(attr, semconv.RPCGRPCStatusCodeKey.Int64(int64(statusCode)))
cfg.rpcServerDuration.Record(ctx, elapsedTime, attr...)
}(time.Now())

resp, err := handler(ctx, req)
if err != nil {
s, _ := status.FromError(err)
statusCode = s.Code()
span.SetStatus(codes.Error, s.Message())
span.SetAttributes(statusCodeAttr(s.Code()))
messageSent.Event(ctx, 1, s.Proto())
} else {
statusCode = grpc_codes.OK
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
messageSent.Event(ctx, 1, resp)
}
Expand Down Expand Up @@ -416,6 +426,7 @@ func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
handler grpc.StreamHandler,
) error {
ctx := ss.Context()

i := &InterceptorInfo{
StreamServerInfo: info,
Type: StreamServer,
Expand Down Expand Up @@ -444,13 +455,21 @@ func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
)
defer span.End()

err := handler(srv, wrapServerStream(ctx, ss))
var statusCode grpc_codes.Code
defer func(t time.Time) {
elapsedTime := float64(time.Since(t)) / float64(time.Millisecond)
attr = append(attr, semconv.RPCGRPCStatusCodeKey.Int64(int64(statusCode)))
cfg.rpcServerDuration.Record(ctx, elapsedTime, attr...)
}(time.Now())

err := handler(srv, wrapServerStream(ctx, ss))
if err != nil {
s, _ := status.FromError(err)
statusCode = s.Code()
span.SetStatus(codes.Error, s.Message())
span.SetAttributes(statusCodeAttr(s.Code()))
} else {
statusCode = grpc_codes.OK
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
}

Expand Down
2 changes: 2 additions & 0 deletions instrumentation/google.golang.org/grpc/otelgrpc/test/go.mod
Expand Up @@ -8,6 +8,7 @@ require (
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0
go.opentelemetry.io/otel v1.10.0
go.opentelemetry.io/otel/sdk v1.10.0
go.opentelemetry.io/otel/sdk/metric v0.31.0
go.uber.org/goleak v1.2.0
google.golang.org/grpc v1.49.0
)
Expand All @@ -19,6 +20,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel/metric v0.31.0 // indirect
go.opentelemetry.io/otel/trace v1.10.0 // indirect
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect
Expand Down
5 changes: 5 additions & 0 deletions instrumentation/google.golang.org/grpc/otelgrpc/test/go.sum
Expand Up @@ -2,6 +2,7 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
cloud.google.com/go v0.34.0 h1:eOI3/cP2VTU6uZLDYAoic+eyzzB9YyGmJ7eIjl8rOPg=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down Expand Up @@ -47,8 +48,12 @@ github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PK
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
go.opentelemetry.io/otel v1.10.0 h1:Y7DTJMR6zs1xkS/upamJYk0SxxN4C9AqRd77jmZnyY4=
go.opentelemetry.io/otel v1.10.0/go.mod h1:NbvWjCthWHKBEUMpf0/v8ZRZlni86PpGFEMA9pnQSnQ=
go.opentelemetry.io/otel/metric v0.31.0 h1:6SiklT+gfWAwWUR0meEMxQBtihpiEs4c+vL9spDTqUs=
go.opentelemetry.io/otel/metric v0.31.0/go.mod h1:ohmwj9KTSIeBnDBm/ZwH2PSZxZzoOaG2xZeekTRzL5A=
go.opentelemetry.io/otel/sdk v1.10.0 h1:jZ6K7sVn04kk/3DNUdJ4mqRlGDiXAVuIG+MMENpTNdY=
go.opentelemetry.io/otel/sdk v1.10.0/go.mod h1:vO06iKzD5baltJz1zarxMCNHFpUlUiOy4s65ECtn6kE=
go.opentelemetry.io/otel/sdk/metric v0.31.0 h1:2sZx4R43ZMhJdteKAlKoHvRgrMp53V1aRxvEf5lCq8Q=
go.opentelemetry.io/otel/sdk/metric v0.31.0/go.mod h1:fl0SmNnX9mN9xgU6OLYLMBMrNAsaZQi7qBwprwO3abk=
go.opentelemetry.io/otel/trace v1.10.0 h1:npQMbR8o7mum8uF95yFbOEJffhs1sbCOfDh8zAJiH5E=
go.opentelemetry.io/otel/trace v1.10.0/go.mod h1:Sij3YYczqAdz+EhmGhE6TpTxUO5/F/AzrK+kxfGqySM=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
Expand Down
52 changes: 50 additions & 2 deletions instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go
Expand Up @@ -30,6 +30,7 @@ import (

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metrictest"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
Expand Down Expand Up @@ -84,18 +85,20 @@ func TestInterceptors(t *testing.T) {

serverUnarySR := tracetest.NewSpanRecorder()
serverUnaryTP := trace.NewTracerProvider(trace.WithSpanProcessor(serverUnarySR))
serverUnaryMP, serverUnaryMetricExporter := metrictest.NewTestMeterProvider()

serverStreamSR := tracetest.NewSpanRecorder()
serverStreamTP := trace.NewTracerProvider(trace.WithSpanProcessor(serverStreamSR))
serverStreamMP, serverStreamMetricExporter := metrictest.NewTestMeterProvider()

assert.NoError(t, doCalls(
[]grpc.DialOption{
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(clientUnaryTP))),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(clientStreamTP))),
},
[]grpc.ServerOption{
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(serverUnaryTP))),
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor(otelgrpc.WithTracerProvider(serverStreamTP))),
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(serverUnaryTP), otelgrpc.WithMeterProvider(serverUnaryMP))),
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor(otelgrpc.WithTracerProvider(serverStreamTP), otelgrpc.WithMeterProvider(serverStreamMP))),
},
))

Expand All @@ -109,10 +112,12 @@ func TestInterceptors(t *testing.T) {

t.Run("UnaryServerSpans", func(t *testing.T) {
checkUnaryServerSpans(t, serverUnarySR.Ended())
checkUnaryServerRecords(t, serverUnaryMetricExporter)
})

t.Run("StreamServerSpans", func(t *testing.T) {
checkStreamServerSpans(t, serverStreamSR.Ended())
checkStreamServerRecords(t, serverStreamMetricExporter)
})
}

Expand Down Expand Up @@ -622,3 +627,46 @@ func assertEvents(t *testing.T, expected, actual []trace.Event) bool {

return !failed
}

func checkUnaryServerRecords(t *testing.T, exporter *metrictest.Exporter) {
assert.NoError(t, exporter.Collect(context.Background()))
records := exporter.GetRecords()
assert.Equal(t, 2, len(records))

for _, record := range records {
method := getRPCMethod(record.Attributes)
assert.NotEmpty(t, method)
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethodKey.String(method),
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
}, record.Attributes)
}
}

func checkStreamServerRecords(t *testing.T, exporter *metrictest.Exporter) {
assert.NoError(t, exporter.Collect(context.Background()))
records := exporter.GetRecords()
assert.Equal(t, 3, len(records))

for _, record := range records {
method := getRPCMethod(record.Attributes)
assert.NotEmpty(t, method)
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethodKey.String(method),
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
}, record.Attributes)
}
}

func getRPCMethod(attrs []attribute.KeyValue) string {
for _, kvs := range attrs {
if kvs.Key == semconv.RPCMethodKey {
return kvs.Value.AsString()
}
}
return ""
}

0 comments on commit 9e3f735

Please sign in to comment.