Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a DevMode to the client #169

Merged
merged 2 commits into from Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 16 additions & 7 deletions statsd/aggregator.go
Expand Up @@ -14,7 +14,10 @@ type (
)

type aggregator struct {
nbContext int32
nbContext int32
nbContextGauge int32
nbContextCount int32
nbContextSet int32

countsM sync.RWMutex
gaugesM sync.RWMutex
Expand All @@ -31,7 +34,10 @@ type aggregator struct {
}

type aggregatorMetrics struct {
nbContext int32
nbContext int32
nbContextGauge int32
nbContextCount int32
nbContextSet int32
}

func newAggregator(c *Client) *aggregator {
Expand Down Expand Up @@ -79,7 +85,10 @@ func (a *aggregator) flushTelemetryMetrics() *aggregatorMetrics {
}

return &aggregatorMetrics{
nbContext: a.nbContext,
nbContext: a.nbContext,
nbContextGauge: a.nbContextGauge,
nbContextCount: a.nbContextCount,
nbContextSet: a.nbContextSet,
}
}

Expand All @@ -94,8 +103,6 @@ func (a *aggregator) flushMetrics() []metric {
a.sets = setsMap{}
a.setsM.Unlock()

atomic.StoreInt32(&a.nbContext, int32(len(sets)))

for _, s := range sets {
metrics = append(metrics, s.flushUnsafe()...)
}
Expand All @@ -105,7 +112,6 @@ func (a *aggregator) flushMetrics() []metric {
a.gauges = gaugesMap{}
a.gaugesM.Unlock()

atomic.AddInt32(&a.nbContext, int32(len(gauges)))
for _, g := range gauges {
metrics = append(metrics, g.flushUnsafe())
}
Expand All @@ -115,11 +121,14 @@ func (a *aggregator) flushMetrics() []metric {
a.counts = countsMap{}
a.countsM.Unlock()

atomic.AddInt32(&a.nbContext, int32(len(counts)))
for _, c := range counts {
metrics = append(metrics, c.flushUnsafe())
}

atomic.StoreInt32(&a.nbContextCount, int32(len(counts)))
hush-hush marked this conversation as resolved.
Show resolved Hide resolved
atomic.StoreInt32(&a.nbContextGauge, int32(len(gauges)))
atomic.StoreInt32(&a.nbContextSet, int32(len(sets)))
atomic.StoreInt32(&a.nbContext, int32(len(sets)+len(gauges)+len(counts)))
return metrics
}

Expand Down
23 changes: 23 additions & 0 deletions statsd/options.go
Expand Up @@ -35,6 +35,8 @@ var (
DefaultAggregationFlushInterval = 3 * time.Second
// DefaultAggregation
DefaultAggregation = false
// DevMode is the default value for the DevMode option ("dev" mode is off by default).
DevMode = false
hush-hush marked this conversation as resolved.
Show resolved Hide resolved
)

// Options contains the configuration options for a client.
Expand Down Expand Up @@ -97,6 +99,9 @@ type Options struct {
Aggregation bool
// TelemetryAddr specify a different endpoint for telemetry metrics.
TelemetryAddr string
// DevMode enables the "dev" mode, in which the client sends many more
// telemetry metrics to help troubleshoot client behavior.
DevMode bool
}

func resolveOptions(options []Option) (*Options, error) {
Expand Down Expand Up @@ -269,3 +274,21 @@ func WithTelemetryAddr(addr string) Option {
return nil
}
}

// WithDevMode returns an Option that turns on the client's "dev" mode. In this
// mode the client emits additional telemetry metrics that are useful when
// troubleshooting the behavior of the client itself.
func WithDevMode() Option {
	enable := func(opts *Options) error {
		opts.DevMode = true
		return nil
	}
	return enable
}

// WithoutDevMode returns an Option that disables the client's "dev" mode, so
// the additional telemetry metrics meant for troubleshooting client behavior
// are not sent. It can be used to undo an earlier WithDevMode in the same
// option list, since it unconditionally sets Options.DevMode to false.
func WithoutDevMode() Option {
	return func(o *Options) error {
		o.DevMode = false
		return nil
	}
}
3 changes: 3 additions & 0 deletions statsd/options_test.go
Expand Up @@ -26,6 +26,7 @@ func TestDefaultOptions(t *testing.T) {
assert.Equal(t, options.AggregationFlushInterval, DefaultAggregationFlushInterval)
assert.Equal(t, options.Aggregation, DefaultAggregation)
assert.Zero(t, options.TelemetryAddr)
assert.False(t, options.DevMode)
}

func TestOptions(t *testing.T) {
Expand Down Expand Up @@ -58,6 +59,7 @@ func TestOptions(t *testing.T) {
WithAggregationInterval(testAggregationWindow),
WithClientSideAggregation(),
WithTelemetryAddr(testTelemetryAddr),
WithDevMode(),
})

assert.NoError(t, err)
Expand All @@ -76,6 +78,7 @@ func TestOptions(t *testing.T) {
assert.Equal(t, options.AggregationFlushInterval, testAggregationWindow)
assert.Equal(t, options.Aggregation, true)
assert.Equal(t, options.TelemetryAddr, testTelemetryAddr)
assert.True(t, options.DevMode)
}

func TestResetOptions(t *testing.T) {
Expand Down
53 changes: 35 additions & 18 deletions statsd/statsd.go
Expand Up @@ -206,10 +206,16 @@ type Client struct {

// ClientMetrics contains metrics about the client
type ClientMetrics struct {
TotalMetrics uint64
TotalEvents uint64
TotalServiceChecks uint64
TotalDroppedOnReceive uint64
TotalMetrics uint64
TotalMetricsGauge uint64
TotalMetricsCount uint64
TotalMetricsHistogram uint64
TotalMetricsDistribution uint64
TotalMetricsSet uint64
TotalMetricsTiming uint64
TotalEvents uint64
TotalServiceChecks uint64
TotalDroppedOnReceive uint64
}

// Verify that Client implements the ClientInterface.
Expand Down Expand Up @@ -326,10 +332,10 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro

if o.Telemetry {
if o.TelemetryAddr == "" {
c.telemetry = NewTelemetryClient(&c, writerName)
c.telemetry = NewTelemetryClient(&c, writerName, o.DevMode)
} else {
var err error
c.telemetry, err = NewTelemetryClientWithCustomAddr(&c, writerName, o.TelemetryAddr, bufferPool)
c.telemetry, err = NewTelemetryClientWithCustomAddr(&c, writerName, o.DevMode, o.TelemetryAddr, bufferPool)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -396,12 +402,23 @@ func (c *Client) Flush() error {
}

func (c *Client) FlushTelemetryMetrics() ClientMetrics {
return ClientMetrics{
TotalMetrics: atomic.SwapUint64(&c.metrics.TotalMetrics, 0),
TotalEvents: atomic.SwapUint64(&c.metrics.TotalEvents, 0),
TotalServiceChecks: atomic.SwapUint64(&c.metrics.TotalServiceChecks, 0),
TotalDroppedOnReceive: atomic.SwapUint64(&c.metrics.TotalDroppedOnReceive, 0),
}
cm := ClientMetrics{
TotalMetricsGauge: atomic.SwapUint64(&c.metrics.TotalMetricsGauge, 0),
TotalMetricsCount: atomic.SwapUint64(&c.metrics.TotalMetricsCount, 0),
TotalMetricsSet: atomic.SwapUint64(&c.metrics.TotalMetricsSet, 0),
TotalMetricsHistogram: atomic.SwapUint64(&c.metrics.TotalMetricsHistogram, 0),
TotalMetricsDistribution: atomic.SwapUint64(&c.metrics.TotalMetricsDistribution, 0),
TotalMetricsTiming: atomic.SwapUint64(&c.metrics.TotalMetricsTiming, 0),
TotalEvents: atomic.SwapUint64(&c.metrics.TotalEvents, 0),
TotalServiceChecks: atomic.SwapUint64(&c.metrics.TotalServiceChecks, 0),
TotalDroppedOnReceive: atomic.SwapUint64(&c.metrics.TotalDroppedOnReceive, 0),
}

cm.TotalMetrics = cm.TotalMetricsGauge + cm.TotalMetricsCount +
cm.TotalMetricsSet + cm.TotalMetricsHistogram +
cm.TotalMetricsDistribution + cm.TotalMetricsTiming

return cm
}

func (c *Client) send(m metric) error {
Expand Down Expand Up @@ -431,7 +448,7 @@ func (c *Client) Gauge(name string, value float64, tags []string, rate float64)
if c == nil {
return ErrNoClient
}
atomic.AddUint64(&c.metrics.TotalMetrics, 1)
atomic.AddUint64(&c.metrics.TotalMetricsGauge, 1)
if c.agg != nil {
return c.agg.gauge(name, value, tags, rate)
}
Expand All @@ -443,7 +460,7 @@ func (c *Client) Count(name string, value int64, tags []string, rate float64) er
if c == nil {
return ErrNoClient
}
atomic.AddUint64(&c.metrics.TotalMetrics, 1)
atomic.AddUint64(&c.metrics.TotalMetricsCount, 1)
if c.agg != nil {
return c.agg.count(name, value, tags, rate)
}
Expand All @@ -455,7 +472,7 @@ func (c *Client) Histogram(name string, value float64, tags []string, rate float
if c == nil {
return ErrNoClient
}
atomic.AddUint64(&c.metrics.TotalMetrics, 1)
atomic.AddUint64(&c.metrics.TotalMetricsHistogram, 1)
return c.send(metric{metricType: histogram, name: name, fvalue: value, tags: tags, rate: rate})
}

Expand All @@ -464,7 +481,7 @@ func (c *Client) Distribution(name string, value float64, tags []string, rate fl
if c == nil {
return ErrNoClient
}
atomic.AddUint64(&c.metrics.TotalMetrics, 1)
atomic.AddUint64(&c.metrics.TotalMetricsDistribution, 1)
return c.send(metric{metricType: distribution, name: name, fvalue: value, tags: tags, rate: rate})
}

Expand All @@ -483,7 +500,7 @@ func (c *Client) Set(name string, value string, tags []string, rate float64) err
if c == nil {
return ErrNoClient
}
atomic.AddUint64(&c.metrics.TotalMetrics, 1)
atomic.AddUint64(&c.metrics.TotalMetricsSet, 1)
if c.agg != nil {
return c.agg.set(name, value, tags, rate)
}
Expand All @@ -501,7 +518,7 @@ func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, r
if c == nil {
return ErrNoClient
}
atomic.AddUint64(&c.metrics.TotalMetrics, 1)
atomic.AddUint64(&c.metrics.TotalMetricsTiming, 1)
return c.send(metric{metricType: timing, name: name, fvalue: value, tags: tags, rate: rate})
}

Expand Down
12 changes: 12 additions & 0 deletions statsd/statsd_test.go
Expand Up @@ -76,6 +76,7 @@ func TestChannelMode(t *testing.T) {

client, err := New(addr, WithChannelMode())
require.Nil(t, err, fmt.Sprintf("failed to create client: %s", err))
assert.False(t, client.telemetry.devMode)

testStatsdPipeline(t, client, addr)
}
Expand All @@ -85,6 +86,17 @@ func TestMutexMode(t *testing.T) {

client, err := New(addr)
require.Nil(t, err, fmt.Sprintf("failed to create client: %s", err))
assert.False(t, client.telemetry.devMode)

testStatsdPipeline(t, client, addr)
}

// TestDevMode creates a client with the WithDevMode option, checks that the
// flag was propagated to the client's telemetry, and then runs the shared
// statsd pipeline test against that client.
func TestDevMode(t *testing.T) {
	const serverAddr = "localhost:1201"

	devClient, newErr := New(serverAddr, WithDevMode())
	require.Nil(t, newErr, fmt.Sprintf("failed to create client: %s", newErr))

	// The option must reach the telemetry client, not just the Options struct.
	assert.True(t, devClient.telemetry.devMode)

	testStatsdPipeline(t, devClient, serverAddr)
}
Expand Down
35 changes: 26 additions & 9 deletions statsd/telemetry.go
Expand Up @@ -22,26 +22,28 @@ clientVersionTelemetryTag is a tag identifying this specific client version.
var clientVersionTelemetryTag = "client_version:4.0.1"

type telemetryClient struct {
c *Client
tags []string
sender *sender
worker *worker
c *Client
tags []string
sender *sender
worker *worker
devMode bool
}

func NewTelemetryClient(c *Client, transport string) *telemetryClient {
func NewTelemetryClient(c *Client, transport string, devMode bool) *telemetryClient {
return &telemetryClient{
c: c,
tags: append(c.Tags, clientTelemetryTag, clientVersionTelemetryTag, "client_transport:"+transport),
c: c,
tags: append(c.Tags, clientTelemetryTag, clientVersionTelemetryTag, "client_transport:"+transport),
devMode: devMode,
}
}

func NewTelemetryClientWithCustomAddr(c *Client, transport string, telemetryAddr string, pool *bufferPool) (*telemetryClient, error) {
func NewTelemetryClientWithCustomAddr(c *Client, transport string, devMode bool, telemetryAddr string, pool *bufferPool) (*telemetryClient, error) {
telemetryWriter, _, err := resolveAddr(telemetryAddr)
if err != nil {
return nil, fmt.Errorf("Could not resolve telemetry address: %v", err)
}

t := NewTelemetryClient(c, transport)
t := NewTelemetryClient(c, transport, devMode)

// Creating a custom sender/worker with 1 worker in mutex mode for the
// telemetry that share the same bufferPool.
Expand Down Expand Up @@ -97,6 +99,15 @@ func (t *telemetryClient) flush() []metric {

clientMetrics := t.c.FlushTelemetryMetrics()
telemetryCount("datadog.dogstatsd.client.metrics", int64(clientMetrics.TotalMetrics))
if t.devMode {
telemetryCount("datadog.dogstatsd.client.metricsGauge", int64(clientMetrics.TotalMetricsGauge))
telemetryCount("datadog.dogstatsd.client.metricsCount", int64(clientMetrics.TotalMetricsCount))
telemetryCount("datadog.dogstatsd.client.metricsHistogram", int64(clientMetrics.TotalMetricsHistogram))
telemetryCount("datadog.dogstatsd.client.metricsDistribution", int64(clientMetrics.TotalMetricsDistribution))
telemetryCount("datadog.dogstatsd.client.metricsSet", int64(clientMetrics.TotalMetricsSet))
telemetryCount("datadog.dogstatsd.client.metricsTiming", int64(clientMetrics.TotalMetricsTiming))
}

telemetryCount("datadog.dogstatsd.client.events", int64(clientMetrics.TotalEvents))
telemetryCount("datadog.dogstatsd.client.service_checks", int64(clientMetrics.TotalServiceChecks))
telemetryCount("datadog.dogstatsd.client.metric_dropped_on_receive", int64(clientMetrics.TotalDroppedOnReceive))
Expand All @@ -113,6 +124,12 @@ func (t *telemetryClient) flush() []metric {

if aggMetrics := t.c.agg.flushTelemetryMetrics(); aggMetrics != nil {
telemetryCount("datadog.dogstatsd.client.aggregated_context", int64(aggMetrics.nbContext))
if t.devMode {
telemetryCount("datadog.dogstatsd.client.aggregated_context_gauge", int64(aggMetrics.nbContextGauge))
telemetryCount("datadog.dogstatsd.client.aggregated_context_set", int64(aggMetrics.nbContextSet))
telemetryCount("datadog.dogstatsd.client.aggregated_context_count", int64(aggMetrics.nbContextCount))
}
}

return m
}