Skip to content

Commit

Permalink
otelgrpc: add custom attributes to the stats handler
Browse files Browse the repository at this point in the history
  • Loading branch information
inigohu committed May 9, 2024
1 parent 0483033 commit eacf0ac
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 79 deletions.
76 changes: 76 additions & 0 deletions instrumentation/google.golang.org/grpc/otelgrpc/grpccontext.go
@@ -0,0 +1,76 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"

import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// metricsInfo contains metrics information for an RPC.
type metricsInfo struct {
msgReceived int64
msgSent int64
}

// traceInfo contains tracing information for an RPC.
type traceInfo struct {
name string
kind trace.SpanKind
}

// gRPCContext contains all the information needed to record metrics and traces.
type gRPCContext struct {
metricsInfo *metricsInfo
traceInfo *traceInfo
attrs []attribute.KeyValue
record bool
}

// AddAttrs adds attributes to the given context.
func AddAttrs(ctx context.Context, attrs ...attribute.KeyValue) context.Context {
gctx, _ := gRPCContextFromContext(ctx)
gctx.addAttrs(attrs...)
return contextWithGRPCContext(ctx, gctx)
}

// add attributes to a gRPCContext.
func (g *gRPCContext) addAttrs(attrs ...attribute.KeyValue) {
g.attrs = append(g.attrs, attrs...)
}

type gRPCContextKey struct{}

// contextWithGRPCContext returns a new context with the provided gRPCContext attached.
func contextWithGRPCContext(ctx context.Context, gctx *gRPCContext) context.Context {
return context.WithValue(ctx, gRPCContextKey{}, gctx)
}

// gRPCContextFromContext retrieves a GRPCContext instance from the provided context if
// one is available. If no GRPCContext was found in the provided context a new, empty
// GRPCContext is returned and the second return value is false. In this case it is
// safe to use the GRPCContext but any attributes added to it will not be used.
func gRPCContextFromContext(ctx context.Context) (*gRPCContext, bool) { // nolint: revive
l, ok := ctx.Value(gRPCContextKey{}).(*gRPCContext)
if !ok {
l = &gRPCContext{
metricsInfo: &metricsInfo{},
traceInfo: &traceInfo{},
}
}
return l, ok
}
133 changes: 54 additions & 79 deletions instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
Expand Up @@ -14,22 +14,12 @@ import (
"google.golang.org/grpc/status"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
)

type gRPCContextKey struct{}

type gRPCContext struct {
messagesReceived int64
messagesSent int64
metricAttrs []attribute.KeyValue
record bool
}

type serverHandler struct {
*config
}
Expand All @@ -54,31 +44,17 @@ func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {

// TagRPC can attach some information to the given context.
func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
ctx = extract(ctx, h.config.Propagators)
ctx = extract(ctx, h.Propagators)

name, attrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, RPCSystemGRPC)
ctx, _ = h.tracer.Start(
trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attrs...),
)
gctx, _ := gRPCContextFromContext(ctx)
gctx.traceInfo.kind = trace.SpanKindServer

gctx := gRPCContext{
metricAttrs: attrs,
record: true,
}
if h.config.Filter != nil {
gctx.record = h.config.Filter(info)
}
return context.WithValue(ctx, gRPCContextKey{}, &gctx)
return h.tagRPC(ctx, info)
}

// HandleRPC processes the RPC stats.
func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
isServer := true
h.handleRPC(ctx, rs, isServer)
h.handleRPC(ctx, rs)
}

type clientHandler struct {
Expand All @@ -96,30 +72,17 @@ func NewClientHandler(opts ...Option) stats.Handler {

// TagRPC can attach some information to the given context.
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
name, attrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, RPCSystemGRPC)
ctx, _ = h.tracer.Start(
ctx,
name,
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(attrs...),
)
gctx, _ := gRPCContextFromContext(ctx)
gctx.traceInfo.kind = trace.SpanKindClient

gctx := gRPCContext{
metricAttrs: attrs,
record: true,
}
if h.config.Filter != nil {
gctx.record = h.config.Filter(info)
}
ctx = h.tagRPC(contextWithGRPCContext(ctx, gctx), info)

return inject(context.WithValue(ctx, gRPCContextKey{}, &gctx), h.config.Propagators)
return inject(ctx, h.Propagators)
}

// HandleRPC processes the RPC stats.
func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
isServer := false
h.handleRPC(ctx, rs, isServer)
h.handleRPC(ctx, rs)
}

// TagConn can attach some information to the given context.
Expand All @@ -132,28 +95,48 @@ func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) {
// no-op
}

func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool) { // nolint: revive // isServer is not a control flag.
func (c *config) tagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
name, attrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, RPCSystemGRPC)

gctx, _ := gRPCContextFromContext(ctx)
gctx.traceInfo.name = name
gctx.addAttrs(attrs...)
gctx.record = true

if c.Filter != nil {
gctx.record = c.Filter(info)
}

if gctx.traceInfo.kind == trace.SpanKindServer {
ctx = trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx))
}

ctx, _ = c.tracer.Start(
ctx,
gctx.traceInfo.name,
trace.WithSpanKind(gctx.traceInfo.kind),
trace.WithAttributes(gctx.attrs...),
)

return contextWithGRPCContext(ctx, gctx)
}

func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats) {
gctx, _ := gRPCContextFromContext(ctx)
span := trace.SpanFromContext(ctx)
var metricAttrs []attribute.KeyValue

var messageId int64

gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
if gctx != nil {
if !gctx.record {
return
}
metricAttrs = make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1)
metricAttrs = append(metricAttrs, gctx.metricAttrs...)
if gctx != nil && !gctx.record {
return
}

switch rs := rs.(type) {
case *stats.Begin:
case *stats.InPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
}

messageId = atomic.AddInt64(&gctx.metricsInfo.msgReceived, 1)
c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributes(gctx.attrs...))
if c.ReceivedEvent {
span.AddEvent("message",
trace.WithAttributes(
Expand All @@ -165,11 +148,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
)
}
case *stats.OutPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
c.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
}

messageId = atomic.AddInt64(&gctx.metricsInfo.msgSent, 1)
c.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributes(gctx.attrs...))
if c.SentEvent {
span.AddEvent("message",
trace.WithAttributes(
Expand All @@ -186,33 +166,28 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
span.SetAttributes(peerAttr(p.Addr.String())...)
}
case *stats.End:
var rpcStatusAttr attribute.KeyValue

if rs.Error != nil {
s, _ := status.FromError(rs.Error)
if isServer {
if gctx.traceInfo.kind == trace.SpanKindServer {
statusCode, msg := serverStatus(s)
span.SetStatus(statusCode, msg)
} else {
span.SetStatus(codes.Error, s.Message())
}
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(s.Code()))
gctx.addAttrs(semconv.RPCGRPCStatusCodeKey.Int(int(s.Code())))
} else {
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK))
gctx.addAttrs(semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK)))
}
span.SetAttributes(rpcStatusAttr)
span.End()

metricAttrs = append(metricAttrs, rpcStatusAttr)
span.SetAttributes(gctx.attrs...)
span.End()

// Use floating point division here for higher precision (instead of Millisecond method).
elapsedTime := float64(rs.EndTime.Sub(rs.BeginTime)) / float64(time.Millisecond)

c.rpcDuration.Record(ctx, elapsedTime, metric.WithAttributes(metricAttrs...))
if gctx != nil {
c.rpcRequestsPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesReceived), metric.WithAttributes(metricAttrs...))
c.rpcResponsesPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesSent), metric.WithAttributes(metricAttrs...))
}
c.rpcDuration.Record(ctx, elapsedTime, metric.WithAttributes(gctx.attrs...))
c.rpcRequestsPerRPC.Record(ctx, atomic.LoadInt64(&gctx.metricsInfo.msgReceived), metric.WithAttributes(gctx.attrs...))
c.rpcResponsesPerRPC.Record(ctx, atomic.LoadInt64(&gctx.metricsInfo.msgSent), metric.WithAttributes(gctx.attrs...))
default:
return
}
Expand Down

0 comments on commit eacf0ac

Please sign in to comment.