Skip to content

Commit

Permalink
contrib/go-redis/redis.v8: support wrapping a go-redis v8 client (#808)
Browse files Browse the repository at this point in the history
Introduce a new function, WrapClient, in the go-redis.v8 integration. This
is similar to the older go-redis integration in allowing a user to supply
an already configured Redis client and have the tracing added to that
rather than having the library build and return a client.

Fixes #803
  • Loading branch information
seancaffery committed Jan 13, 2021
1 parent 3ed2a27 commit 9892197
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 22 deletions.
79 changes: 57 additions & 22 deletions contrib/go-redis/redis.v8/redis.go
Expand Up @@ -29,34 +29,73 @@ type datadogHook struct {

// params holds the tracer and a set of parameters which are recorded with every trace.
type params struct {
host string
port string
db string
config *clientConfig
config *clientConfig
additionalTags []ddtrace.StartSpanOption
}

// NewClient returns a new Client that is traced with the default tracer under
// the service name "redis".
func NewClient(opt *redis.Options, opts ...ClientOption) redis.UniversalClient {
client := redis.NewClient(opt)
WrapClient(client, opts...)
return client
}

// WrapClient adds a hook to the given client that traces with the default tracer under
// the service name "redis".
func WrapClient(client redis.UniversalClient, opts ...ClientOption) {
cfg := new(clientConfig)
defaults(cfg)
for _, fn := range opts {
fn(cfg)
}
host, port, err := net.SplitHostPort(opt.Addr)
if err != nil {
host = opt.Addr
port = "6379"

hookParams := &params{
additionalTags: additionalTagOptions(client),
config: cfg,
}
params := &params{
host: host,
port: port,
db: strconv.Itoa(opt.DB),
config: cfg,

client.AddHook(&datadogHook{params: hookParams})
}

type clientOptions interface {
Options() *redis.Options
}

type clusterOptions interface {
Options() *redis.ClusterOptions
}

func additionalTagOptions(client redis.UniversalClient) []ddtrace.StartSpanOption {
additionalTags := []ddtrace.StartSpanOption{}
if clientOptions, ok := client.(clientOptions); ok {
opt := clientOptions.Options()
if opt.Addr == "FailoverClient" {
additionalTags = []ddtrace.StartSpanOption{
tracer.Tag("out.db", strconv.Itoa(opt.DB)),
}
} else {
host, port, err := net.SplitHostPort(opt.Addr)
if err != nil {
host = opt.Addr
port = "6379"
}
additionalTags = []ddtrace.StartSpanOption{
tracer.Tag(ext.TargetHost, host),
tracer.Tag(ext.TargetPort, port),
tracer.Tag("out.db", strconv.Itoa(opt.DB)),
}
}
} else if clientOptions, ok := client.(clusterOptions); ok {
addrs := []string{}
for _, addr := range clientOptions.Options().Addrs {
addrs = append(addrs, addr)
}
additionalTags = []ddtrace.StartSpanOption{
tracer.Tag("addrs", strings.Join(addrs, ", ")),
}
}
client := redis.NewClient(opt)
client.AddHook(&datadogHook{params: params})
return client
return additionalTags
}

func (ddh *datadogHook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
Expand All @@ -68,12 +107,10 @@ func (ddh *datadogHook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (con
tracer.SpanType(ext.SpanTypeRedis),
tracer.ServiceName(p.config.serviceName),
tracer.ResourceName(parts[0]),
tracer.Tag(ext.TargetHost, p.host),
tracer.Tag(ext.TargetPort, p.port),
tracer.Tag("out.db", p.db),
tracer.Tag("redis.raw_command", raw),
tracer.Tag("redis.args_length", strconv.Itoa(length)),
}
opts = append(opts, ddh.additionalTags...)
if !math.IsNaN(p.config.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, p.config.analyticsRate))
}
Expand Down Expand Up @@ -102,14 +139,12 @@ func (ddh *datadogHook) BeforeProcessPipeline(ctx context.Context, cmds []redis.
tracer.SpanType(ext.SpanTypeRedis),
tracer.ServiceName(p.config.serviceName),
tracer.ResourceName(parts[0]),
tracer.Tag(ext.TargetHost, p.host),
tracer.Tag(ext.TargetPort, p.port),
tracer.Tag("out.db", p.db),
tracer.Tag("redis.raw_command", raw),
tracer.Tag("redis.args_length", strconv.Itoa(length)),
tracer.Tag(ext.ResourceName, raw),
tracer.Tag("redis.pipeline_length", strconv.Itoa(len(cmds))),
}
opts = append(opts, ddh.additionalTags...)
if !math.IsNaN(p.config.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, p.config.analyticsRate))
}
Expand Down
120 changes: 120 additions & 0 deletions contrib/go-redis/redis.v8/redis_test.go
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
Expand Down Expand Up @@ -84,6 +85,125 @@ func TestClient(t *testing.T) {
assert.Equal("3", span.Tag("redis.args_length"))
}

func TestWrapClient(t *testing.T) {
simpleClientOpts := &redis.UniversalOptions{Addrs: []string{"127.0.0.1:6379"}}
simpleClient := redis.NewUniversalClient(simpleClientOpts)

failoverClientOpts := &redis.UniversalOptions{
MasterName: "leader.redis.host",
Addrs: []string{
"127.0.0.1:6379",
"127.0.0.2:6379",
}}
failoverClient := redis.NewUniversalClient(failoverClientOpts)

clusterClientOpts := &redis.UniversalOptions{
Addrs: []string{
"127.0.0.1:6379",
"127.0.0.2:6379",
},
DialTimeout: 1}
clusterClient := redis.NewUniversalClient(clusterClientOpts)

testCases := []struct {
name string
client redis.UniversalClient
}{
{
name: "simple-client",
client: simpleClient,
},
{
name: "failover-client",
client: failoverClient,
},
{
name: "cluster-client",
client: clusterClient,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
assert := assert.New(t)
mt := mocktracer.Start()
defer mt.Stop()

WrapClient(tc.client, WithServiceName("my-redis"))
tc.client.Set(ctx, "test_key", "test_value", 0)

spans := mt.FinishedSpans()
assert.Len(spans, 1)

span := spans[0]
assert.Equal("redis.command", span.OperationName())
assert.Equal(ext.SpanTypeRedis, span.Tag(ext.SpanType))
assert.Equal("my-redis", span.Tag(ext.ServiceName))
assert.Equal("set test_key test_value: ", span.Tag("redis.raw_command"))
assert.Equal("3", span.Tag("redis.args_length"))
})
}
}

func TestAdditionalTagsFromClient(t *testing.T) {
t.Run("simple-client", func(t *testing.T) {
simpleClientOpts := &redis.UniversalOptions{Addrs: []string{"127.0.0.1:6379"}}
simpleClient := redis.NewUniversalClient(simpleClientOpts)
config := &ddtrace.StartSpanConfig{}
expectedTags := map[string]interface{}{
"out.db": "0",
"out.host": "127.0.0.1",
"out.port": "6379",
}

additionalTagOptions := additionalTagOptions(simpleClient)
for _, t := range additionalTagOptions {
t(config)
}
assert.Equal(t, expectedTags, config.Tags)
})

t.Run("failover-client", func(t *testing.T) {
failoverClientOpts := &redis.UniversalOptions{
MasterName: "leader.redis.host",
Addrs: []string{
"127.0.0.1:6379",
"127.0.0.2:6379",
}}
failoverClient := redis.NewUniversalClient(failoverClientOpts)
config := &ddtrace.StartSpanConfig{}
expectedTags := map[string]interface{}{
"out.db": "0",
}

additionalTagOptions := additionalTagOptions(failoverClient)
for _, t := range additionalTagOptions {
t(config)
}
assert.Equal(t, expectedTags, config.Tags)
})

t.Run("cluster-client", func(t *testing.T) {
clusterClientOpts := &redis.UniversalOptions{
Addrs: []string{
"127.0.0.1:6379",
"127.0.0.2:6379",
},
DialTimeout: 1}
clusterClient := redis.NewUniversalClient(clusterClientOpts)
config := &ddtrace.StartSpanConfig{}
expectedTags := map[string]interface{}{
"addrs": "127.0.0.1:6379, 127.0.0.2:6379",
}

additionalTagOptions := additionalTagOptions(clusterClient)
for _, t := range additionalTagOptions {
t(config)
}
assert.Equal(t, expectedTags, config.Tags)
})
}

func TestPipeline(t *testing.T) {
ctx := context.Background()
opts := &redis.Options{Addr: "127.0.0.1:6379"}
Expand Down

0 comments on commit 9892197

Please sign in to comment.