/
redisotel.go
128 lines (104 loc) · 2.89 KB
/
redisotel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package redisotel
import (
"context"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
"go.opentelemetry.io/otel/trace"
"github.com/go-redis/redis/extra/rediscmd/v8"
"github.com/go-redis/redis/v8"
)
const (
// defaultTracerName is the instrumentation name passed to the tracer
// provider when NewTracingHook creates its tracer.
// TODO: consider using the full module path "github.com/go-redis/redis/extra/redisotel".
defaultTracerName = "github.com/go-redis/redis"
)
// TracingHook starts and ends OpenTelemetry spans around Redis commands and
// pipelines through its Before*/After* methods.
type TracingHook struct {
// tracer creates the spans; it is set by NewTracingHook.
tracer trace.Tracer
}
// NewTracingHook builds a TracingHook. The configuration starts from the
// global tracer provider (otel.GetTracerProvider) and each supplied option is
// applied to it in order; the hook's tracer is then obtained from the
// resulting provider under defaultTracerName.
func NewTracingHook(opts ...Option) *TracingHook {
	conf := config{tp: otel.GetTracerProvider()}
	for i := range opts {
		opts[i].apply(&conf)
	}

	// TODO: consider adding a version, e.g.
	// trace.WithInstrumentationVersion("semver:8.11.4").
	return &TracingHook{
		tracer: conf.tp.Tracer(defaultTracerName),
	}
}
// BeforeProcess starts a client-kind span named after cmd.FullName() and
// returns a context carrying it. The span is tagged with the Redis db.system
// attribute and the command string as db.statement.
//
// When the span already in ctx is not recording, no span is started and ctx
// is returned unchanged. The returned error is always nil.
func (th *TracingHook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
	if parent := trace.SpanFromContext(ctx); !parent.IsRecording() {
		return ctx, nil
	}

	attrs := trace.WithAttributes(
		semconv.DBSystemRedis,
		semconv.DBStatementKey.String(rediscmd.CmdString(cmd)),
	)

	// The span value itself is dropped here: AfterProcess retrieves it from
	// the returned context.
	spanCtx, _ := th.tracer.Start(ctx, cmd.FullName(), trace.WithSpanKind(trace.SpanKindClient), attrs)
	return spanCtx, nil
}
// AfterProcess takes the span found in ctx, hands any error reported by cmd
// to recordError, and then ends the span. It always returns nil.
func (th *TracingHook) AfterProcess(ctx context.Context, cmd redis.Cmder) error {
	current := trace.SpanFromContext(ctx)

	cmdErr := cmd.Err()
	if cmdErr != nil {
		recordError(ctx, current, cmdErr)
	}

	current.End()
	return nil
}
// BeforeProcessPipeline starts a client-kind span named "pipeline <summary>",
// where the summary and the statement text both come from rediscmd.CmdsString,
// and returns a context carrying it. The span is tagged with the Redis
// db.system attribute, the statement text as db.statement, and the number of
// commands as db.redis.num_cmd.
//
// When the span already in ctx is not recording, no span is started and ctx
// is returned unchanged. The returned error is always nil.
func (th *TracingHook) BeforeProcessPipeline(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
	if parent := trace.SpanFromContext(ctx); !parent.IsRecording() {
		return ctx, nil
	}

	name, statement := rediscmd.CmdsString(cmds)
	attrs := trace.WithAttributes(
		semconv.DBSystemRedis,
		semconv.DBStatementKey.String(statement),
		attribute.Int("db.redis.num_cmd", len(cmds)),
	)

	// The span value itself is dropped here: AfterProcessPipeline retrieves
	// it from the returned context.
	spanCtx, _ := th.tracer.Start(ctx, "pipeline "+name, trace.WithSpanKind(trace.SpanKindClient), attrs)
	return spanCtx, nil
}
// AfterProcessPipeline takes the span found in ctx, records the first real
// failure among cmds on it, and ends the span. It always returns nil.
//
// Every command is inspected rather than only cmds[0], so a pipeline whose
// later command failed is still marked as an error, and an empty cmds slice
// no longer causes an index-out-of-range panic. redis.Nil results are skipped
// so that a missing key on an early command does not hide a genuine error on
// a later one.
func (th *TracingHook) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmder) error {
	span := trace.SpanFromContext(ctx)
	// End the span on every path out of this function.
	defer span.End()

	for _, cmd := range cmds {
		err := cmd.Err()
		if err == nil || err == redis.Nil {
			continue
		}
		recordError(ctx, span, err)
		break
	}
	return nil
}
// recordError records err on span and sets the span status to codes.Error
// with the error's message. redis.Nil is ignored and leaves the span
// untouched. ctx is currently unused.
func recordError(ctx context.Context, span trace.Span, err error) {
	if err == redis.Nil {
		return
	}
	span.RecordError(err)
	span.SetStatus(codes.Error, err.Error())
}
// config collects the settings that Option values adjust while
// NewTracingHook runs.
type config struct {
// tp is the provider the hook's tracer is obtained from.
tp trace.TracerProvider
}
// Option specifies instrumentation configuration options.
// Options are passed to NewTracingHook, which applies each one in order.
type Option interface {
// apply writes this option's setting into the given config.
apply(*config)
}
// optionFunc adapts a plain function to the Option interface.
type optionFunc func(*config)
// apply calls the wrapped function with c.
func (o optionFunc) apply(c *config) {
o(c)
}
// WithTracerProvider returns an Option that makes the hook obtain its tracer
// from provider. A nil provider is ignored, leaving whatever provider is
// already configured (the global one unless another option replaced it).
func WithTracerProvider(provider trace.TracerProvider) Option {
	set := func(cfg *config) {
		if provider == nil {
			return
		}
		cfg.tp = provider
	}
	return optionFunc(set)
}