From 98921972dacf78b62fd66f7e6d3f22814cab1e5e Mon Sep 17 00:00:00 2001 From: Sean Caffery Date: Thu, 14 Jan 2021 09:09:37 +1100 Subject: [PATCH] contrib/go-redis/redis.v8: support wrapping a go-redis v8 client (#808) Introduce a new function, WrapClient, in the go-redis.v8 integration. This is similar to the older go-redis integration in allowing a user to supply an already configured Redis client and have the tracing added to that rather than having the library build and return a client. Fixes #803 --- contrib/go-redis/redis.v8/redis.go | 79 +++++++++++----- contrib/go-redis/redis.v8/redis_test.go | 120 ++++++++++++++++++++++++ 2 files changed, 177 insertions(+), 22 deletions(-) diff --git a/contrib/go-redis/redis.v8/redis.go b/contrib/go-redis/redis.v8/redis.go index 038d905069..497e78bd61 100644 --- a/contrib/go-redis/redis.v8/redis.go +++ b/contrib/go-redis/redis.v8/redis.go @@ -29,34 +29,73 @@ type datadogHook struct { // params holds the tracer and a set of parameters which are recorded with every trace. type params struct { - host string - port string - db string - config *clientConfig + config *clientConfig + additionalTags []ddtrace.StartSpanOption } // NewClient returns a new Client that is traced with the default tracer under // the service name "redis". func NewClient(opt *redis.Options, opts ...ClientOption) redis.UniversalClient { + client := redis.NewClient(opt) + WrapClient(client, opts...) + return client +} + +// WrapClient adds a hook to the given client that traces with the default tracer under +// the service name "redis". +func WrapClient(client redis.UniversalClient, opts ...ClientOption) { cfg := new(clientConfig) defaults(cfg) for _, fn := range opts { fn(cfg) } - host, port, err := net.SplitHostPort(opt.Addr) - if err != nil { - host = opt.Addr - port = "6379" + + hookParams := ¶ms{ + additionalTags: additionalTagOptions(client), + config: cfg, } - params := ¶ms{ - host: host, - port: port, - db: strconv.Itoa(opt.DB), - config: cfg, + + client.AddHook(&datadogHook{params: hookParams}) +} + +type clientOptions interface { + Options() *redis.Options +} + +type clusterOptions interface { + Options() *redis.ClusterOptions +} + +func additionalTagOptions(client redis.UniversalClient) []ddtrace.StartSpanOption { + additionalTags := []ddtrace.StartSpanOption{} + if clientOptions, ok := client.(clientOptions); ok { + opt := clientOptions.Options() + if opt.Addr == "FailoverClient" { + additionalTags = []ddtrace.StartSpanOption{ + tracer.Tag("out.db", strconv.Itoa(opt.DB)), + } + } else { + host, port, err := net.SplitHostPort(opt.Addr) + if err != nil { + host = opt.Addr + port = "6379" + } + additionalTags = []ddtrace.StartSpanOption{ + tracer.Tag(ext.TargetHost, host), + tracer.Tag(ext.TargetPort, port), + tracer.Tag("out.db", strconv.Itoa(opt.DB)), + } + } + } else if clientOptions, ok := client.(clusterOptions); ok { + addrs := []string{} + for _, addr := range clientOptions.Options().Addrs { + addrs = append(addrs, addr) + } + additionalTags = []ddtrace.StartSpanOption{ + tracer.Tag("addrs", strings.Join(addrs, ", ")), + } } - client := redis.NewClient(opt) - client.AddHook(&datadogHook{params: params}) - return client + return additionalTags } func (ddh *datadogHook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) { @@ -68,12 +107,10 @@ func (ddh *datadogHook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (con tracer.SpanType(ext.SpanTypeRedis), tracer.ServiceName(p.config.serviceName), tracer.ResourceName(parts[0]), - tracer.Tag(ext.TargetHost, p.host), - tracer.Tag(ext.TargetPort, p.port), - tracer.Tag("out.db", p.db), tracer.Tag("redis.raw_command", raw), tracer.Tag("redis.args_length", strconv.Itoa(length)), } + opts = append(opts, ddh.additionalTags...) if !math.IsNaN(p.config.analyticsRate) { opts = append(opts, tracer.Tag(ext.EventSampleRate, p.config.analyticsRate)) } @@ -102,14 +139,12 @@ func (ddh *datadogHook) BeforeProcessPipeline(ctx context.Context, cmds []redis. tracer.SpanType(ext.SpanTypeRedis), tracer.ServiceName(p.config.serviceName), tracer.ResourceName(parts[0]), - tracer.Tag(ext.TargetHost, p.host), - tracer.Tag(ext.TargetPort, p.port), - tracer.Tag("out.db", p.db), tracer.Tag("redis.raw_command", raw), tracer.Tag("redis.args_length", strconv.Itoa(length)), tracer.Tag(ext.ResourceName, raw), tracer.Tag("redis.pipeline_length", strconv.Itoa(len(cmds))), } + opts = append(opts, ddh.additionalTags...) if !math.IsNaN(p.config.analyticsRate) { opts = append(opts, tracer.Tag(ext.EventSampleRate, p.config.analyticsRate)) } diff --git a/contrib/go-redis/redis.v8/redis_test.go b/contrib/go-redis/redis.v8/redis_test.go index 9886fa3468..bd68d4bdf0 100644 --- a/contrib/go-redis/redis.v8/redis_test.go +++ b/contrib/go-redis/redis.v8/redis_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" @@ -84,6 +85,125 @@ func TestClient(t *testing.T) { assert.Equal("3", span.Tag("redis.args_length")) } +func TestWrapClient(t *testing.T) { + simpleClientOpts := &redis.UniversalOptions{Addrs: []string{"127.0.0.1:6379"}} + simpleClient := redis.NewUniversalClient(simpleClientOpts) + + failoverClientOpts := &redis.UniversalOptions{ + MasterName: "leader.redis.host", + Addrs: []string{ + "127.0.0.1:6379", + "127.0.0.2:6379", + }} + failoverClient := redis.NewUniversalClient(failoverClientOpts) + + clusterClientOpts := &redis.UniversalOptions{ + Addrs: []string{ + "127.0.0.1:6379", + "127.0.0.2:6379", + }, + DialTimeout: 1} + clusterClient := redis.NewUniversalClient(clusterClientOpts) + + testCases := []struct { + name string + client redis.UniversalClient + }{ + { + name: "simple-client", + client: simpleClient, + }, + { + name: "failover-client", + client: failoverClient, + }, + { + name: "cluster-client", + client: clusterClient, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + assert := assert.New(t) + mt := mocktracer.Start() + defer mt.Stop() + + WrapClient(tc.client, WithServiceName("my-redis")) + tc.client.Set(ctx, "test_key", "test_value", 0) + + spans := mt.FinishedSpans() + assert.Len(spans, 1) + + span := spans[0] + assert.Equal("redis.command", span.OperationName()) + assert.Equal(ext.SpanTypeRedis, span.Tag(ext.SpanType)) + assert.Equal("my-redis", span.Tag(ext.ServiceName)) + assert.Equal("set test_key test_value: ", span.Tag("redis.raw_command")) + assert.Equal("3", span.Tag("redis.args_length")) + }) + } +} + +func TestAdditionalTagsFromClient(t *testing.T) { + t.Run("simple-client", func(t *testing.T) { + simpleClientOpts := &redis.UniversalOptions{Addrs: []string{"127.0.0.1:6379"}} + simpleClient := redis.NewUniversalClient(simpleClientOpts) + config := &ddtrace.StartSpanConfig{} + expectedTags := map[string]interface{}{ + "out.db": "0", + "out.host": "127.0.0.1", + "out.port": "6379", + } + + additionalTagOptions := additionalTagOptions(simpleClient) + for _, t := range additionalTagOptions { + t(config) + } + assert.Equal(t, expectedTags, config.Tags) + }) + + t.Run("failover-client", func(t *testing.T) { + failoverClientOpts := &redis.UniversalOptions{ + MasterName: "leader.redis.host", + Addrs: []string{ + "127.0.0.1:6379", + "127.0.0.2:6379", + }} + failoverClient := redis.NewUniversalClient(failoverClientOpts) + config := &ddtrace.StartSpanConfig{} + expectedTags := map[string]interface{}{ + "out.db": "0", + } + + additionalTagOptions := additionalTagOptions(failoverClient) + for _, t := range additionalTagOptions { + t(config) + } + assert.Equal(t, expectedTags, config.Tags) + }) + + t.Run("cluster-client", func(t *testing.T) { + clusterClientOpts := &redis.UniversalOptions{ + Addrs: []string{ + "127.0.0.1:6379", + "127.0.0.2:6379", + }, + DialTimeout: 1} + clusterClient := redis.NewUniversalClient(clusterClientOpts) + config := &ddtrace.StartSpanConfig{} + expectedTags := map[string]interface{}{ + "addrs": "127.0.0.1:6379, 127.0.0.2:6379", + } + + additionalTagOptions := additionalTagOptions(clusterClient) + for _, t := range additionalTagOptions { + t(config) + } + assert.Equal(t, expectedTags, config.Tags) + }) +} + func TestPipeline(t *testing.T) { ctx := context.Background() opts := &redis.Options{Addr: "127.0.0.1:6379"}