From 870d284ede60acdf9c89d0ea60b845420c9ee330 Mon Sep 17 00:00:00 2001 From: Maxime mouial Date: Fri, 9 Oct 2020 12:09:53 +0200 Subject: [PATCH 1/2] Adding a "DevMode" to the client The new mode will send more telemetry metrics allowing users to more easily troubleshoot issues and usage of the client. --- statsd/aggregator.go | 23 +++++++++----- statsd/options.go | 23 ++++++++++++++ statsd/options_test.go | 3 ++ statsd/statsd.go | 53 +++++++++++++++++++++----------- statsd/statsd_test.go | 12 ++++++++ statsd/telemetry.go | 35 ++++++++++++++++------ statsd/telemetry_test.go | 65 ++++++++++++++++++++++++++++++++++++---- 7 files changed, 175 insertions(+), 39 deletions(-) diff --git a/statsd/aggregator.go b/statsd/aggregator.go index 010b92e4..9eee96ec 100644 --- a/statsd/aggregator.go +++ b/statsd/aggregator.go @@ -14,7 +14,10 @@ type ( ) type aggregator struct { - nbContext int32 + nbContext int32 + nbContextGauge int32 + nbContextCount int32 + nbContextSet int32 countsM sync.RWMutex gaugesM sync.RWMutex @@ -31,7 +34,10 @@ type aggregator struct { } type aggregatorMetrics struct { - nbContext int32 + nbContext int32 + nbContextGauge int32 + nbContextCount int32 + nbContextSet int32 } func newAggregator(c *Client) *aggregator { @@ -79,7 +85,10 @@ func (a *aggregator) flushTelemetryMetrics() *aggregatorMetrics { } return &aggregatorMetrics{ - nbContext: a.nbContext, + nbContext: a.nbContext, + nbContextGauge: a.nbContextGauge, + nbContextCount: a.nbContextCount, + nbContextSet: a.nbContextSet, } } @@ -94,8 +103,6 @@ func (a *aggregator) flushMetrics() []metric { a.sets = setsMap{} a.setsM.Unlock() - atomic.StoreInt32(&a.nbContext, int32(len(sets))) - for _, s := range sets { metrics = append(metrics, s.flushUnsafe()...) } @@ -105,7 +112,6 @@ func (a *aggregator) flushMetrics() []metric { a.gauges = gaugesMap{} a.gaugesM.Unlock() - atomic.AddInt32(&a.nbContext, int32(len(gauges))) for _, g := range gauges { metrics = append(metrics, g.flushUnsafe()) } @@ -115,11 +121,14 @@ func (a *aggregator) flushMetrics() []metric { a.counts = countsMap{} a.countsM.Unlock() - atomic.AddInt32(&a.nbContext, int32(len(counts))) for _, c := range counts { metrics = append(metrics, c.flushUnsafe()) } + atomic.StoreInt32(&a.nbContextCount, int32(len(counts))) + atomic.StoreInt32(&a.nbContextGauge, int32(len(gauges))) + atomic.StoreInt32(&a.nbContextSet, int32(len(sets))) + atomic.StoreInt32(&a.nbContext, int32(len(sets)+len(gauges)+len(counts))) return metrics } diff --git a/statsd/options.go b/statsd/options.go index 189f7f7c..eec996a3 100644 --- a/statsd/options.go +++ b/statsd/options.go @@ -35,6 +35,8 @@ var ( DefaultAggregationFlushInterval = 3 * time.Second // DefaultAggregation DefaultAggregation = false + // DevMode + DevMode = false ) // Options contains the configuration options for a client. @@ -97,6 +99,9 @@ type Options struct { Aggregation bool // TelemetryAddr specify a different endpoint for telemetry metrics. TelemetryAddr string + // DevMode enables the "dev" mode where the client sends much more + // telemetry metrics to help troubleshooting the client behavior. + DevMode bool } func resolveOptions(options []Option) (*Options, error) { @@ -269,3 +274,21 @@ func WithTelemetryAddr(addr string) Option { return nil } } + +// WithDevMode enables client "dev" mode, sending more Telemetry metrics to +// help troubleshoot client behavior. +func WithDevMode() Option { + return func(o *Options) error { + o.DevMode = true + return nil + } +} + +// WithoutDevMode disables client "dev" mode, sending more Telemetry metrics to +// help troubleshoot client behavior. +func WithoutDevMode() Option { + return func(o *Options) error { + o.DevMode = false + return nil + } +} diff --git a/statsd/options_test.go b/statsd/options_test.go index 0f97d951..c4a4c2a9 100644 --- a/statsd/options_test.go +++ b/statsd/options_test.go @@ -26,6 +26,7 @@ func TestDefaultOptions(t *testing.T) { assert.Equal(t, options.AggregationFlushInterval, DefaultAggregationFlushInterval) assert.Equal(t, options.Aggregation, DefaultAggregation) assert.Zero(t, options.TelemetryAddr) + assert.False(t, options.DevMode) } func TestOptions(t *testing.T) { @@ -58,6 +59,7 @@ func TestOptions(t *testing.T) { WithAggregationInterval(testAggregationWindow), WithClientSideAggregation(), WithTelemetryAddr(testTelemetryAddr), + WithDevMode(), }) assert.NoError(t, err) @@ -76,6 +78,7 @@ func TestOptions(t *testing.T) { assert.Equal(t, options.AggregationFlushInterval, testAggregationWindow) assert.Equal(t, options.Aggregation, true) assert.Equal(t, options.TelemetryAddr, testTelemetryAddr) + assert.True(t, options.DevMode) } func TestResetOptions(t *testing.T) { diff --git a/statsd/statsd.go b/statsd/statsd.go index 22e442a8..357ee8c1 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -206,10 +206,16 @@ type Client struct { // ClientMetrics contains metrics about the client type ClientMetrics struct { - TotalMetrics uint64 - TotalEvents uint64 - TotalServiceChecks uint64 - TotalDroppedOnReceive uint64 + TotalMetrics uint64 + TotalMetricsGauge uint64 + TotalMetricsCount uint64 + TotalMetricsHistogram uint64 + TotalMetricsDistribution uint64 + TotalMetricsSet uint64 + TotalMetricsTiming uint64 + TotalEvents uint64 + TotalServiceChecks uint64 + TotalDroppedOnReceive uint64 } // Verify that Client implements the ClientInterface. @@ -326,10 +332,10 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro if o.Telemetry { if o.TelemetryAddr == "" { - c.telemetry = NewTelemetryClient(&c, writerName) + c.telemetry = NewTelemetryClient(&c, writerName, o.DevMode) } else { var err error - c.telemetry, err = NewTelemetryClientWithCustomAddr(&c, writerName, o.TelemetryAddr, bufferPool) + c.telemetry, err = NewTelemetryClientWithCustomAddr(&c, writerName, o.DevMode, o.TelemetryAddr, bufferPool) if err != nil { return nil, err } @@ -396,12 +402,23 @@ func (c *Client) Flush() error { } func (c *Client) FlushTelemetryMetrics() ClientMetrics { - return ClientMetrics{ - TotalMetrics: atomic.SwapUint64(&c.metrics.TotalMetrics, 0), - TotalEvents: atomic.SwapUint64(&c.metrics.TotalEvents, 0), - TotalServiceChecks: atomic.SwapUint64(&c.metrics.TotalServiceChecks, 0), - TotalDroppedOnReceive: atomic.SwapUint64(&c.metrics.TotalDroppedOnReceive, 0), - } + cm := ClientMetrics{ + TotalMetricsGauge: atomic.SwapUint64(&c.metrics.TotalMetricsGauge, 0), + TotalMetricsCount: atomic.SwapUint64(&c.metrics.TotalMetricsCount, 0), + TotalMetricsSet: atomic.SwapUint64(&c.metrics.TotalMetricsSet, 0), + TotalMetricsHistogram: atomic.SwapUint64(&c.metrics.TotalMetricsHistogram, 0), + TotalMetricsDistribution: atomic.SwapUint64(&c.metrics.TotalMetricsDistribution, 0), + TotalMetricsTiming: atomic.SwapUint64(&c.metrics.TotalMetricsTiming, 0), + TotalEvents: atomic.SwapUint64(&c.metrics.TotalEvents, 0), + TotalServiceChecks: atomic.SwapUint64(&c.metrics.TotalServiceChecks, 0), + TotalDroppedOnReceive: atomic.SwapUint64(&c.metrics.TotalDroppedOnReceive, 0), + } + + cm.TotalMetrics = cm.TotalMetricsGauge + cm.TotalMetricsCount + + cm.TotalMetricsSet + cm.TotalMetricsHistogram + + cm.TotalMetricsDistribution + cm.TotalMetricsTiming + + return cm } func (c *Client) send(m metric) error { @@ -431,7 +448,7 @@ func (c *Client) Gauge(name string, value float64, tags []string, rate float64) if c == nil { return ErrNoClient } - atomic.AddUint64(&c.metrics.TotalMetrics, 1) + atomic.AddUint64(&c.metrics.TotalMetricsGauge, 1) if c.agg != nil { return c.agg.gauge(name, value, tags, rate) } @@ -443,7 +460,7 @@ func (c *Client) Count(name string, value int64, tags []string, rate float64) er if c == nil { return ErrNoClient } - atomic.AddUint64(&c.metrics.TotalMetrics, 1) + atomic.AddUint64(&c.metrics.TotalMetricsCount, 1) if c.agg != nil { return c.agg.count(name, value, tags, rate) } @@ -455,7 +472,7 @@ func (c *Client) Histogram(name string, value float64, tags []string, rate float if c == nil { return ErrNoClient } - atomic.AddUint64(&c.metrics.TotalMetrics, 1) + atomic.AddUint64(&c.metrics.TotalMetricsHistogram, 1) return c.send(metric{metricType: histogram, name: name, fvalue: value, tags: tags, rate: rate}) } @@ -464,7 +481,7 @@ func (c *Client) Distribution(name string, value float64, tags []string, rate fl if c == nil { return ErrNoClient } - atomic.AddUint64(&c.metrics.TotalMetrics, 1) + atomic.AddUint64(&c.metrics.TotalMetricsDistribution, 1) return c.send(metric{metricType: distribution, name: name, fvalue: value, tags: tags, rate: rate}) } @@ -483,7 +500,7 @@ func (c *Client) Set(name string, value string, tags []string, rate float64) err if c == nil { return ErrNoClient } - atomic.AddUint64(&c.metrics.TotalMetrics, 1) + atomic.AddUint64(&c.metrics.TotalMetricsSet, 1) if c.agg != nil { return c.agg.set(name, value, tags, rate) } @@ -501,7 +518,7 @@ func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, r if c == nil { return ErrNoClient } - atomic.AddUint64(&c.metrics.TotalMetrics, 1) + atomic.AddUint64(&c.metrics.TotalMetricsTiming, 1) return c.send(metric{metricType: timing, name: name, fvalue: value, tags: tags, rate: rate}) } diff --git a/statsd/statsd_test.go b/statsd/statsd_test.go index 7cd55e59..987e9159 100644 --- a/statsd/statsd_test.go +++ b/statsd/statsd_test.go @@ -76,6 +76,7 @@ func TestChannelMode(t *testing.T) { client, err := New(addr, WithChannelMode()) require.Nil(t, err, fmt.Sprintf("failed to create client: %s", err)) + assert.False(t, client.telemetry.devMode) testStatsdPipeline(t, client, addr) } @@ -85,6 +86,17 @@ func TestMutexMode(t *testing.T) { client, err := New(addr) require.Nil(t, err, fmt.Sprintf("failed to create client: %s", err)) + assert.False(t, client.telemetry.devMode) + + testStatsdPipeline(t, client, addr) +} + +func TestDevMode(t *testing.T) { + addr := "localhost:1201" + + client, err := New(addr, WithDevMode()) + require.Nil(t, err, fmt.Sprintf("failed to create client: %s", err)) + assert.True(t, client.telemetry.devMode) testStatsdPipeline(t, client, addr) } diff --git a/statsd/telemetry.go b/statsd/telemetry.go index ce57bf8a..0516d308 100644 --- a/statsd/telemetry.go +++ b/statsd/telemetry.go @@ -22,26 +22,28 @@ clientVersionTelemetryTag is a tag identifying this specific client version. var clientVersionTelemetryTag = "client_version:4.0.1" type telemetryClient struct { - c *Client - tags []string - sender *sender - worker *worker + c *Client + tags []string + sender *sender + worker *worker + devMode bool } -func NewTelemetryClient(c *Client, transport string) *telemetryClient { +func NewTelemetryClient(c *Client, transport string, devMode bool) *telemetryClient { return &telemetryClient{ - c: c, - tags: append(c.Tags, clientTelemetryTag, clientVersionTelemetryTag, "client_transport:"+transport), + c: c, + tags: append(c.Tags, clientTelemetryTag, clientVersionTelemetryTag, "client_transport:"+transport), + devMode: devMode, } } -func NewTelemetryClientWithCustomAddr(c *Client, transport string, telemetryAddr string, pool *bufferPool) (*telemetryClient, error) { +func NewTelemetryClientWithCustomAddr(c *Client, transport string, devMode bool, telemetryAddr string, pool *bufferPool) (*telemetryClient, error) { telemetryWriter, _, err := resolveAddr(telemetryAddr) if err != nil { return nil, fmt.Errorf("Could not resolve telemetry address: %v", err) } - t := NewTelemetryClient(c, transport) + t := NewTelemetryClient(c, transport, devMode) // Creating a custom sender/worker with 1 worker in mutex mode for the // telemetry that share the same bufferPool. @@ -97,6 +99,15 @@ func (t *telemetryClient) flush() []metric { clientMetrics := t.c.FlushTelemetryMetrics() telemetryCount("datadog.dogstatsd.client.metrics", int64(clientMetrics.TotalMetrics)) + if t.devMode { + telemetryCount("datadog.dogstatsd.client.metricsGauge", int64(clientMetrics.TotalMetricsGauge)) + telemetryCount("datadog.dogstatsd.client.metricsCount", int64(clientMetrics.TotalMetricsCount)) + telemetryCount("datadog.dogstatsd.client.metricsHistogram", int64(clientMetrics.TotalMetricsHistogram)) + telemetryCount("datadog.dogstatsd.client.metricsDistribution", int64(clientMetrics.TotalMetricsDistribution)) + telemetryCount("datadog.dogstatsd.client.metricsSet", int64(clientMetrics.TotalMetricsSet)) + telemetryCount("datadog.dogstatsd.client.metricsTiming", int64(clientMetrics.TotalMetricsTiming)) + } + telemetryCount("datadog.dogstatsd.client.events", int64(clientMetrics.TotalEvents)) telemetryCount("datadog.dogstatsd.client.service_checks", int64(clientMetrics.TotalServiceChecks)) telemetryCount("datadog.dogstatsd.client.metric_dropped_on_receive", int64(clientMetrics.TotalDroppedOnReceive)) @@ -113,6 +124,12 @@ func (t *telemetryClient) flush() []metric { if aggMetrics := t.c.agg.flushTelemetryMetrics(); aggMetrics != nil { telemetryCount("datadog.dogstatsd.client.aggregated_context", int64(aggMetrics.nbContext)) + if t.devMode { + telemetryCount("datadog.dogstatsd.client.aggregated_context_gauge", int64(aggMetrics.nbContextGauge)) + telemetryCount("datadog.dogstatsd.client.aggregated_context_set", int64(aggMetrics.nbContextSet)) + telemetryCount("datadog.dogstatsd.client.aggregated_context_count", int64(aggMetrics.nbContextCount)) + } } + return m } diff --git a/statsd/telemetry_test.go b/statsd/telemetry_test.go index 5e6f7ada..a23b3cf7 100644 --- a/statsd/telemetry_test.go +++ b/statsd/telemetry_test.go @@ -27,11 +27,26 @@ var basicExpectedMetrics = map[string]int64{ "datadog.dogstatsd.client.bytes_dropped_writer": 0, } +var devModeExpectedMetrics = map[string]int64{ + "datadog.dogstatsd.client.metricsGauge": 1, + "datadog.dogstatsd.client.metricsCount": 3, + "datadog.dogstatsd.client.metricsHistogram": 1, + "datadog.dogstatsd.client.metricsDistribution": 1, + "datadog.dogstatsd.client.metricsSet": 1, + "datadog.dogstatsd.client.metricsTiming": 2, +} + +var devModeAggregationExpectedMetrics = map[string]int64{ + "datadog.dogstatsd.client.aggregated_context_gauge": 1, + "datadog.dogstatsd.client.aggregated_context_set": 1, + "datadog.dogstatsd.client.aggregated_context_count": 3, +} + func TestNewTelemetry(t *testing.T) { client, err := New("localhost:8125", WithoutTelemetry(), WithNamespace("test_namespace")) require.Nil(t, err) - telemetry := NewTelemetryClient(client, "test_transport") + telemetry := NewTelemetryClient(client, "test_transport", false) assert.NotNil(t, telemetry) assert.Equal(t, telemetry.c, client) @@ -80,16 +95,33 @@ func TestTelemetry(t *testing.T) { client, err := New("localhost:8125", WithoutTelemetry()) require.Nil(t, err) - telemetry := NewTelemetryClient(client, "test_transport") + telemetry := NewTelemetryClient(client, "test_transport", false) testTelemetry(t, telemetry, basicExpectedMetrics, basicExpectedTags) } +func TestTelemetryDevMode(t *testing.T) { + // disabling autoflush of the telemetry + client, err := New("localhost:8125", WithoutTelemetry(), WithDevMode()) + require.Nil(t, err) + + expectedMetrics := map[string]int64{} + for k, v := range basicExpectedMetrics { + expectedMetrics[k] = v + } + for k, v := range devModeExpectedMetrics { + expectedMetrics[k] = v + } + + telemetry := NewTelemetryClient(client, "test_transport", true) + testTelemetry(t, telemetry, expectedMetrics, basicExpectedTags) +} + func TestTelemetryChannelMode(t *testing.T) { // disabling autoflush of the telemetry client, err := New("localhost:8125", WithoutTelemetry(), WithChannelMode()) require.Nil(t, err) - telemetry := NewTelemetryClient(client, "test_transport") + telemetry := NewTelemetryClient(client, "test_transport", false) testTelemetry(t, telemetry, basicExpectedMetrics, basicExpectedTags) } @@ -101,7 +133,7 @@ func TestTelemetryWithGlobalTags(t *testing.T) { client, err := New("localhost:8125", WithoutTelemetry(), WithTags([]string{"tag1", "tag2"})) require.Nil(t, err) - telemetry := NewTelemetryClient(client, "test_transport") + telemetry := NewTelemetryClient(client, "test_transport", false) expectedTelemetryTags := append([]string{"tag1", "tag2", "env:test"}, basicExpectedTags...) testTelemetry(t, telemetry, basicExpectedMetrics, expectedTelemetryTags) @@ -112,7 +144,24 @@ func TestTelemetryWithAggregation(t *testing.T) { client, err := New("localhost:8125", WithoutTelemetry(), WithClientSideAggregation()) require.Nil(t, err) - telemetry := NewTelemetryClient(client, "test_transport") + telemetry := NewTelemetryClient(client, "test_transport", false) + + expectedMetrics := map[string]int64{ + "datadog.dogstatsd.client.aggregated_context": 5, + } + for k, v := range basicExpectedMetrics { + expectedMetrics[k] = v + } + + testTelemetry(t, telemetry, expectedMetrics, basicExpectedTags) +} + +func TestTelemetryWithAggregationDevMode(t *testing.T) { + // disabling autoflush of the telemetry + client, err := New("localhost:8125", WithoutTelemetry(), WithClientSideAggregation(), WithDevMode()) + require.Nil(t, err) + + telemetry := NewTelemetryClient(client, "test_transport", true) expectedMetrics := map[string]int64{ "datadog.dogstatsd.client.aggregated_context": 5, @@ -120,6 +169,12 @@ func TestTelemetryWithAggregation(t *testing.T) { for k, v := range basicExpectedMetrics { expectedMetrics[k] = v } + for k, v := range devModeExpectedMetrics { + expectedMetrics[k] = v + } + for k, v := range devModeAggregationExpectedMetrics { + expectedMetrics[k] = v + } testTelemetry(t, telemetry, expectedMetrics, basicExpectedTags) } From d78d533386f48d599d8929b841e2831995411d3f Mon Sep 17 00:00:00 2001 From: Maxime mouial Date: Fri, 9 Oct 2020 12:36:37 +0200 Subject: [PATCH 2/2] Makin telemetry creator functions private Those function should not have been public in the first place and there signature should not be part of the public API we support. Users shouldn't have any reason to use them as they can control everything through the client options. --- statsd/aggregator.go | 20 ++++++++++---------- statsd/options.go | 5 +++-- statsd/statsd.go | 4 ++-- statsd/telemetry.go | 6 +++--- statsd/telemetry_test.go | 14 +++++++------- 5 files changed, 25 insertions(+), 24 deletions(-) diff --git a/statsd/aggregator.go b/statsd/aggregator.go index 9eee96ec..91c8e692 100644 --- a/statsd/aggregator.go +++ b/statsd/aggregator.go @@ -14,7 +14,6 @@ type ( ) type aggregator struct { - nbContext int32 nbContextGauge int32 nbContextCount int32 nbContextSet int32 @@ -84,12 +83,14 @@ func (a *aggregator) flushTelemetryMetrics() *aggregatorMetrics { return nil } - return &aggregatorMetrics{ - nbContext: a.nbContext, - nbContextGauge: a.nbContextGauge, - nbContextCount: a.nbContextCount, - nbContextSet: a.nbContextSet, + am := &aggregatorMetrics{ + nbContextGauge: atomic.SwapInt32(&a.nbContextGauge, 0), + nbContextCount: atomic.SwapInt32(&a.nbContextCount, 0), + nbContextSet: atomic.SwapInt32(&a.nbContextSet, 0), } + + am.nbContext = am.nbContextGauge + am.nbContextCount + am.nbContextSet + return am } func (a *aggregator) flushMetrics() []metric { @@ -125,10 +126,9 @@ func (a *aggregator) flushMetrics() []metric { metrics = append(metrics, c.flushUnsafe()) } - atomic.StoreInt32(&a.nbContextCount, int32(len(counts))) - atomic.StoreInt32(&a.nbContextGauge, int32(len(gauges))) - atomic.StoreInt32(&a.nbContextSet, int32(len(sets))) - atomic.StoreInt32(&a.nbContext, int32(len(sets)+len(gauges)+len(counts))) + atomic.AddInt32(&a.nbContextCount, int32(len(counts))) + atomic.AddInt32(&a.nbContextGauge, int32(len(gauges))) + atomic.AddInt32(&a.nbContextSet, int32(len(sets))) return metrics } diff --git a/statsd/options.go b/statsd/options.go index eec996a3..fef96c0a 100644 --- a/statsd/options.go +++ b/statsd/options.go @@ -35,8 +35,8 @@ var ( DefaultAggregationFlushInterval = 3 * time.Second // DefaultAggregation DefaultAggregation = false - // DevMode - DevMode = false + // DefaultDevMode + DefaultDevMode = false ) // Options contains the configuration options for a client. @@ -120,6 +120,7 @@ func resolveOptions(options []Option) (*Options, error) { ChannelModeBufferSize: DefaultChannelModeBufferSize, AggregationFlushInterval: DefaultAggregationFlushInterval, Aggregation: DefaultAggregation, + DevMode: DefaultDevMode, } for _, option := range options { diff --git a/statsd/statsd.go b/statsd/statsd.go index 357ee8c1..690d0b5e 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -332,10 +332,10 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro if o.Telemetry { if o.TelemetryAddr == "" { - c.telemetry = NewTelemetryClient(&c, writerName, o.DevMode) + c.telemetry = newTelemetryClient(&c, writerName, o.DevMode) } else { var err error - c.telemetry, err = NewTelemetryClientWithCustomAddr(&c, writerName, o.DevMode, o.TelemetryAddr, bufferPool) + c.telemetry, err = newTelemetryClientWithCustomAddr(&c, writerName, o.DevMode, o.TelemetryAddr, bufferPool) if err != nil { return nil, err } diff --git a/statsd/telemetry.go b/statsd/telemetry.go index 0516d308..8213e9eb 100644 --- a/statsd/telemetry.go +++ b/statsd/telemetry.go @@ -29,7 +29,7 @@ type telemetryClient struct { devMode bool } -func NewTelemetryClient(c *Client, transport string, devMode bool) *telemetryClient { +func newTelemetryClient(c *Client, transport string, devMode bool) *telemetryClient { return &telemetryClient{ c: c, tags: append(c.Tags, clientTelemetryTag, clientVersionTelemetryTag, "client_transport:"+transport), @@ -37,13 +37,13 @@ func NewTelemetryClient(c *Client, transport string, devMode bool) *telemetryCli } } -func NewTelemetryClientWithCustomAddr(c *Client, transport string, devMode bool, telemetryAddr string, pool *bufferPool) (*telemetryClient, error) { +func newTelemetryClientWithCustomAddr(c *Client, transport string, devMode bool, telemetryAddr string, pool *bufferPool) (*telemetryClient, error) { telemetryWriter, _, err := resolveAddr(telemetryAddr) if err != nil { return nil, fmt.Errorf("Could not resolve telemetry address: %v", err) } - t := NewTelemetryClient(c, transport, devMode) + t := newTelemetryClient(c, transport, devMode) // Creating a custom sender/worker with 1 worker in mutex mode for the // telemetry that share the same bufferPool. diff --git a/statsd/telemetry_test.go b/statsd/telemetry_test.go index a23b3cf7..5017b69b 100644 --- a/statsd/telemetry_test.go +++ b/statsd/telemetry_test.go @@ -46,7 +46,7 @@ func TestNewTelemetry(t *testing.T) { client, err := New("localhost:8125", WithoutTelemetry(), WithNamespace("test_namespace")) require.Nil(t, err) - telemetry := NewTelemetryClient(client, "test_transport", false) + telemetry := newTelemetryClient(client, "test_transport", false) assert.NotNil(t, telemetry) assert.Equal(t, telemetry.c, client) @@ -95,7 +95,7 @@ func TestTelemetry(t *testing.T) { client, err := New("localhost:8125", WithoutTelemetry()) require.Nil(t, err) - telemetry := NewTelemetryClient(client, "test_transport", false) + telemetry := newTelemetryClient(client, "test_transport", false) testTelemetry(t, telemetry, basicExpectedMetrics, basicExpectedTags) } @@ -112,7 +112,7 @@ func TestTelemetryDevMode(t *testing.T) { expectedMetrics[k] = v } - telemetry := NewTelemetryClient(client, "test_transport", true) + telemetry := newTelemetryClient(client, "test_transport", true) testTelemetry(t, telemetry, expectedMetrics, basicExpectedTags) } @@ -121,7 +121,7 @@ func TestTelemetryChannelMode(t *testing.T) { client, err := New("localhost:8125", WithoutTelemetry(), WithChannelMode()) require.Nil(t, err) - telemetry := NewTelemetryClient(client, "test_transport", false) + telemetry := newTelemetryClient(client, "test_transport", false) testTelemetry(t, telemetry, basicExpectedMetrics, basicExpectedTags) } @@ -133,7 +133,7 @@ func TestTelemetryWithGlobalTags(t *testing.T) { client, err := New("localhost:8125", WithoutTelemetry(), WithTags([]string{"tag1", "tag2"})) require.Nil(t, err) - telemetry := NewTelemetryClient(client, "test_transport", false) + telemetry := newTelemetryClient(client, "test_transport", false) expectedTelemetryTags := append([]string{"tag1", "tag2", "env:test"}, basicExpectedTags...) testTelemetry(t, telemetry, basicExpectedMetrics, expectedTelemetryTags) @@ -144,7 +144,7 @@ func TestTelemetryWithAggregation(t *testing.T) { client, err := New("localhost:8125", WithoutTelemetry(), WithClientSideAggregation()) require.Nil(t, err) - telemetry := NewTelemetryClient(client, "test_transport", false) + telemetry := newTelemetryClient(client, "test_transport", false) expectedMetrics := map[string]int64{ "datadog.dogstatsd.client.aggregated_context": 5, @@ -161,7 +161,7 @@ func TestTelemetryWithAggregationDevMode(t *testing.T) { client, err := New("localhost:8125", WithoutTelemetry(), WithClientSideAggregation(), WithDevMode()) require.Nil(t, err) - telemetry := NewTelemetryClient(client, "test_transport", true) + telemetry := newTelemetryClient(client, "test_transport", true) expectedMetrics := map[string]int64{ "datadog.dogstatsd.client.aggregated_context": 5,