From 2cc35c37e8b1563d829479d0a16a189e932b68bf Mon Sep 17 00:00:00 2001 From: Maxime mouial Date: Thu, 12 Nov 2020 12:55:56 +0100 Subject: [PATCH] Update devMode telemetry naming and tagging This makes the telemetry for the beta client aggregation easier to graph and use --- statsd/telemetry.go | 78 ++++++++------ statsd/telemetry_test.go | 228 +++++++++++++++++++++++++++------------ 2 files changed, 207 insertions(+), 99 deletions(-) diff --git a/statsd/telemetry.go b/statsd/telemetry.go index 94691025..e7342f64 100644 --- a/statsd/telemetry.go +++ b/statsd/telemetry.go @@ -22,19 +22,31 @@ clientVersionTelemetryTag is a tag identifying this specific client version. var clientVersionTelemetryTag = "client_version:4.2.0" type telemetryClient struct { - c *Client - tags []string - sender *sender - worker *worker - devMode bool + c *Client + tags []string + tagsByType map[metricType][]string + sender *sender + worker *worker + devMode bool } func newTelemetryClient(c *Client, transport string, devMode bool) *telemetryClient { - return &telemetryClient{ - c: c, - tags: append(c.Tags, clientTelemetryTag, clientVersionTelemetryTag, "client_transport:"+transport), - devMode: devMode, + t := &telemetryClient{ + c: c, + tags: append(c.Tags, clientTelemetryTag, clientVersionTelemetryTag, "client_transport:"+transport), + tagsByType: map[metricType][]string{}, + devMode: devMode, } + + if devMode { + t.tagsByType[gauge] = append(append([]string{}, t.tags...), "metrics_type:gauge") + t.tagsByType[count] = append(append([]string{}, t.tags...), "metrics_type:count") + t.tagsByType[set] = append(append([]string{}, t.tags...), "metrics_type:set") + t.tagsByType[timing] = append(append([]string{}, t.tags...), "metrics_type:timing") + t.tagsByType[histogram] = append(append([]string{}, t.tags...), "metrics_type:histogram") + t.tagsByType[distribution] = append(append([]string{}, t.tags...), "metrics_type:distribution") + } + return t } func newTelemetryClientWithCustomAddr(c *Client, 
transport string, devMode bool, telemetryAddr string, pool *bufferPool) (*telemetryClient, error) { @@ -93,41 +105,41 @@ func (t *telemetryClient) flush() []metric { m := []metric{} // same as Count but without global namespace - telemetryCount := func(name string, value int64) { - m = append(m, metric{metricType: count, name: name, ivalue: value, tags: t.tags, rate: 1}) + telemetryCount := func(name string, value int64, tags []string) { + m = append(m, metric{metricType: count, name: name, ivalue: value, tags: tags, rate: 1}) } clientMetrics := t.c.FlushTelemetryMetrics() - telemetryCount("datadog.dogstatsd.client.metrics", int64(clientMetrics.TotalMetrics)) + telemetryCount("datadog.dogstatsd.client.metrics", int64(clientMetrics.TotalMetrics), t.tags) if t.devMode { - telemetryCount("datadog.dogstatsd.client.metricsGauge", int64(clientMetrics.TotalMetricsGauge)) - telemetryCount("datadog.dogstatsd.client.metricsCount", int64(clientMetrics.TotalMetricsCount)) - telemetryCount("datadog.dogstatsd.client.metricsHistogram", int64(clientMetrics.TotalMetricsHistogram)) - telemetryCount("datadog.dogstatsd.client.metricsDistribution", int64(clientMetrics.TotalMetricsDistribution)) - telemetryCount("datadog.dogstatsd.client.metricsSet", int64(clientMetrics.TotalMetricsSet)) - telemetryCount("datadog.dogstatsd.client.metricsTiming", int64(clientMetrics.TotalMetricsTiming)) + telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(clientMetrics.TotalMetricsGauge), t.tagsByType[gauge]) + telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(clientMetrics.TotalMetricsCount), t.tagsByType[count]) + telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(clientMetrics.TotalMetricsHistogram), t.tagsByType[histogram]) + telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(clientMetrics.TotalMetricsDistribution), t.tagsByType[distribution]) + telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(clientMetrics.TotalMetricsSet), 
t.tagsByType[set]) + telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(clientMetrics.TotalMetricsTiming), t.tagsByType[timing]) } - telemetryCount("datadog.dogstatsd.client.events", int64(clientMetrics.TotalEvents)) - telemetryCount("datadog.dogstatsd.client.service_checks", int64(clientMetrics.TotalServiceChecks)) - telemetryCount("datadog.dogstatsd.client.metric_dropped_on_receive", int64(clientMetrics.TotalDroppedOnReceive)) + telemetryCount("datadog.dogstatsd.client.events", int64(clientMetrics.TotalEvents), t.tags) + telemetryCount("datadog.dogstatsd.client.service_checks", int64(clientMetrics.TotalServiceChecks), t.tags) + telemetryCount("datadog.dogstatsd.client.metric_dropped_on_receive", int64(clientMetrics.TotalDroppedOnReceive), t.tags) senderMetrics := t.c.sender.flushTelemetryMetrics() - telemetryCount("datadog.dogstatsd.client.packets_sent", int64(senderMetrics.TotalSentPayloads)) - telemetryCount("datadog.dogstatsd.client.bytes_sent", int64(senderMetrics.TotalSentBytes)) - telemetryCount("datadog.dogstatsd.client.packets_dropped", int64(senderMetrics.TotalDroppedPayloads)) - telemetryCount("datadog.dogstatsd.client.bytes_dropped", int64(senderMetrics.TotalDroppedBytes)) - telemetryCount("datadog.dogstatsd.client.packets_dropped_queue", int64(senderMetrics.TotalDroppedPayloadsQueueFull)) - telemetryCount("datadog.dogstatsd.client.bytes_dropped_queue", int64(senderMetrics.TotalDroppedBytesQueueFull)) - telemetryCount("datadog.dogstatsd.client.packets_dropped_writer", int64(senderMetrics.TotalDroppedPayloadsWriter)) - telemetryCount("datadog.dogstatsd.client.bytes_dropped_writer", int64(senderMetrics.TotalDroppedBytesWriter)) + telemetryCount("datadog.dogstatsd.client.packets_sent", int64(senderMetrics.TotalSentPayloads), t.tags) + telemetryCount("datadog.dogstatsd.client.bytes_sent", int64(senderMetrics.TotalSentBytes), t.tags) + telemetryCount("datadog.dogstatsd.client.packets_dropped", int64(senderMetrics.TotalDroppedPayloads), t.tags) 
+ telemetryCount("datadog.dogstatsd.client.bytes_dropped", int64(senderMetrics.TotalDroppedBytes), t.tags) + telemetryCount("datadog.dogstatsd.client.packets_dropped_queue", int64(senderMetrics.TotalDroppedPayloadsQueueFull), t.tags) + telemetryCount("datadog.dogstatsd.client.bytes_dropped_queue", int64(senderMetrics.TotalDroppedBytesQueueFull), t.tags) + telemetryCount("datadog.dogstatsd.client.packets_dropped_writer", int64(senderMetrics.TotalDroppedPayloadsWriter), t.tags) + telemetryCount("datadog.dogstatsd.client.bytes_dropped_writer", int64(senderMetrics.TotalDroppedBytesWriter), t.tags) if aggMetrics := t.c.agg.flushTelemetryMetrics(); aggMetrics != nil { - telemetryCount("datadog.dogstatsd.client.aggregated_context", int64(aggMetrics.nbContext)) + telemetryCount("datadog.dogstatsd.client.aggregated_context", int64(aggMetrics.nbContext), t.tags) if t.devMode { - telemetryCount("datadog.dogstatsd.client.aggregated_context_gauge", int64(aggMetrics.nbContextGauge)) - telemetryCount("datadog.dogstatsd.client.aggregated_context_set", int64(aggMetrics.nbContextSet)) - telemetryCount("datadog.dogstatsd.client.aggregated_context_count", int64(aggMetrics.nbContextCount)) + telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextGauge), t.tagsByType[gauge]) + telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextSet), t.tagsByType[set]) + telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextCount), t.tagsByType[count]) } } diff --git a/statsd/telemetry_test.go b/statsd/telemetry_test.go index 5017b69b..0c643bd8 100644 --- a/statsd/telemetry_test.go +++ b/statsd/telemetry_test.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "os" + "sort" "testing" "time" @@ -12,34 +13,86 @@ import ( ) var basicExpectedTags = []string{clientTelemetryTag, clientVersionTelemetryTag, "client_transport:test_transport"} -var basicExpectedMetrics = map[string]int64{ - 
"datadog.dogstatsd.client.metrics": 9, - "datadog.dogstatsd.client.events": 1, - "datadog.dogstatsd.client.service_checks": 1, - "datadog.dogstatsd.client.metric_dropped_on_receive": 0, - "datadog.dogstatsd.client.packets_sent": 0, - "datadog.dogstatsd.client.bytes_sent": 0, - "datadog.dogstatsd.client.packets_dropped": 0, - "datadog.dogstatsd.client.bytes_dropped": 0, - "datadog.dogstatsd.client.packets_dropped_queue": 0, - "datadog.dogstatsd.client.bytes_dropped_queue": 0, - "datadog.dogstatsd.client.packets_dropped_writer": 0, - "datadog.dogstatsd.client.bytes_dropped_writer": 0, + +func appendBasicMetrics(metrics []metric, tags []string) []metric { + basicExpectedMetrics := map[string]int64{ + "datadog.dogstatsd.client.metrics": 9, + "datadog.dogstatsd.client.events": 1, + "datadog.dogstatsd.client.service_checks": 1, + "datadog.dogstatsd.client.metric_dropped_on_receive": 0, + "datadog.dogstatsd.client.packets_sent": 0, + "datadog.dogstatsd.client.bytes_sent": 0, + "datadog.dogstatsd.client.packets_dropped": 0, + "datadog.dogstatsd.client.bytes_dropped": 0, + "datadog.dogstatsd.client.packets_dropped_queue": 0, + "datadog.dogstatsd.client.bytes_dropped_queue": 0, + "datadog.dogstatsd.client.packets_dropped_writer": 0, + "datadog.dogstatsd.client.bytes_dropped_writer": 0, + } + + for name, value := range basicExpectedMetrics { + metrics = append(metrics, metric{ + name: name, + ivalue: value, + metricType: count, + tags: append(tags, basicExpectedTags...), + rate: float64(1), + }) + } + return metrics } -var devModeExpectedMetrics = map[string]int64{ - "datadog.dogstatsd.client.metricsGauge": 1, - "datadog.dogstatsd.client.metricsCount": 3, - "datadog.dogstatsd.client.metricsHistogram": 1, - "datadog.dogstatsd.client.metricsDistribution": 1, - "datadog.dogstatsd.client.metricsSet": 1, - "datadog.dogstatsd.client.metricsTiming": 2, +func appendAggregationMetrics(metrics []metric, tags []string, devMode bool) []metric { + metrics = append(metrics, metric{ + name: 
"datadog.dogstatsd.client.aggregated_context", + ivalue: 5, + metricType: count, + tags: append(tags, basicExpectedTags...), + rate: float64(1), + }) + + if devMode { + contextByTypeName := "datadog.dogstatsd.client.aggregated_context_by_type" + devModeAggregationExpectedMetrics := map[string]int64{ + "metrics_type:gauge": 1, + "metrics_type:set": 1, + "metrics_type:count": 3, + } + + for typeTag, value := range devModeAggregationExpectedMetrics { + metrics = append(metrics, metric{ + name: contextByTypeName, + ivalue: value, + metricType: count, + tags: append(tags, append(basicExpectedTags, typeTag)...), + rate: float64(1), + }) + } + } + return metrics } -var devModeAggregationExpectedMetrics = map[string]int64{ - "datadog.dogstatsd.client.aggregated_context_gauge": 1, - "datadog.dogstatsd.client.aggregated_context_set": 1, - "datadog.dogstatsd.client.aggregated_context_count": 3, +func appendDevModeMetrics(metrics []metric, tags []string) []metric { + metricByTypeName := "datadog.dogstatsd.client.metrics_by_type" + devModeExpectedMetrics := map[string]int64{ + "metrics_type:gauge": 1, + "metrics_type:count": 3, + "metrics_type:set": 1, + "metrics_type:timing": 2, + "metrics_type:histogram": 1, + "metrics_type:distribution": 1, + } + + for typeTag, value := range devModeExpectedMetrics { + metrics = append(metrics, metric{ + name: metricByTypeName, + ivalue: value, + metricType: count, + tags: append(tags, append(basicExpectedTags, typeTag)...), + rate: float64(1), + }) + } + return metrics } func TestNewTelemetry(t *testing.T) { @@ -69,7 +122,31 @@ func submitTestMetrics(c *Client) { c.SimpleServiceCheck("hello", Warn) } -func testTelemetry(t *testing.T, telemetry *telemetryClient, expectedMetrics map[string]int64, expectedTelemetryTags []string) { +type metricSorted []metric + +func (s metricSorted) Len() int { return len(s) } +func (s metricSorted) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func (s metricSorted) Less(i, j int) bool { + if s[i].name == 
s[j].name { + if len(s[i].tags) != len(s[j].tags) { + return len(s[i].tags) < len(s[j].tags) + } + + sort.Strings(s[i].tags) + sort.Strings(s[j].tags) + + for idx := range s[i].tags { + if s[i].tags[idx] != s[j].tags[idx] { + return s[i].tags[idx] < s[j].tags[idx] + } + } + return false + } + return s[i].name < s[j].name +} + +func testTelemetry(t *testing.T, telemetry *telemetryClient, expectedMetrics []metric) { assert.NotNil(t, telemetry) submitTestMetrics(telemetry.c) @@ -78,15 +155,20 @@ func testTelemetry(t *testing.T, telemetry *telemetryClient, expectedMetrics map } metrics := telemetry.flush() - assert.Equal(t, len(expectedMetrics), len(metrics)) - for _, m := range metrics { - expectedValue, found := expectedMetrics[m.name] - assert.True(t, found, fmt.Sprintf("Unknown metrics: %s", m.name)) + require.Equal(t, len(expectedMetrics), len(metrics), fmt.Sprintf("expected:\n%v\nactual:\n%v", expectedMetrics, metrics)) - assert.Equal(t, expectedValue, m.ivalue, fmt.Sprintf("wrong ivalue for '%s'", m.name)) - assert.Equal(t, count, m.metricType, fmt.Sprintf("wrong metricTypefor '%s'", m.name)) - assert.Equal(t, expectedTelemetryTags, m.tags, fmt.Sprintf("wrong tags for '%s'", m.name)) - assert.Equal(t, float64(1), m.rate, fmt.Sprintf("wrong rate for '%s'", m.name)) + sort.Sort(metricSorted(metrics)) + sort.Sort(metricSorted(expectedMetrics)) + + for idx := range metrics { + m := metrics[idx] + expected := expectedMetrics[idx] + + assert.Equal(t, expected.ivalue, m.ivalue, fmt.Sprintf("wrong ivalue for '%s' with tags '%v'", m.name, m.tags)) + assert.Equal(t, expected.metricType, m.metricType, fmt.Sprintf("wrong metricType for '%s'", m.name)) + + assert.Equal(t, expected.tags, m.tags, fmt.Sprintf("wrong tags for '%s'", m.name)) + assert.Equal(t, expected.rate, m.rate, fmt.Sprintf("wrong rate for '%s'", m.name)) } } @@ -95,8 +177,11 @@ func TestTelemetry(t *testing.T) { client, err := New("localhost:8125", WithoutTelemetry()) require.Nil(t, err) + expectedMetrics := 
[]metric{} + expectedMetrics = appendBasicMetrics(expectedMetrics, nil) + telemetry := newTelemetryClient(client, "test_transport", false) - testTelemetry(t, telemetry, basicExpectedMetrics, basicExpectedTags) + testTelemetry(t, telemetry, expectedMetrics) } func TestTelemetryDevMode(t *testing.T) { @@ -104,16 +189,12 @@ func TestTelemetryDevMode(t *testing.T) { client, err := New("localhost:8125", WithoutTelemetry(), WithDevMode()) require.Nil(t, err) - expectedMetrics := map[string]int64{} - for k, v := range basicExpectedMetrics { - expectedMetrics[k] = v - } - for k, v := range devModeExpectedMetrics { - expectedMetrics[k] = v - } + expectedMetrics := []metric{} + expectedMetrics = appendBasicMetrics(expectedMetrics, nil) + expectedMetrics = appendDevModeMetrics(expectedMetrics, nil) telemetry := newTelemetryClient(client, "test_transport", true) - testTelemetry(t, telemetry, expectedMetrics, basicExpectedTags) + testTelemetry(t, telemetry, expectedMetrics) } func TestTelemetryChannelMode(t *testing.T) { @@ -122,12 +203,17 @@ func TestTelemetryChannelMode(t *testing.T) { require.Nil(t, err) telemetry := newTelemetryClient(client, "test_transport", false) - testTelemetry(t, telemetry, basicExpectedMetrics, basicExpectedTags) + + expectedMetrics := []metric{} + expectedMetrics = appendBasicMetrics(expectedMetrics, nil) + + testTelemetry(t, telemetry, expectedMetrics) } func TestTelemetryWithGlobalTags(t *testing.T) { + orig := os.Getenv("DD_ENV") os.Setenv("DD_ENV", "test") - defer os.Unsetenv("DD_ENV") + defer os.Setenv("DD_ENV", orig) // disabling autoflush of the telemetry client, err := New("localhost:8125", WithoutTelemetry(), WithTags([]string{"tag1", "tag2"})) @@ -135,8 +221,10 @@ func TestTelemetryWithGlobalTags(t *testing.T) { telemetry := newTelemetryClient(client, "test_transport", false) - expectedTelemetryTags := append([]string{"tag1", "tag2", "env:test"}, basicExpectedTags...) 
- testTelemetry(t, telemetry, basicExpectedMetrics, expectedTelemetryTags) + expectedMetrics := []metric{} + expectedMetrics = appendBasicMetrics(expectedMetrics, []string{"tag1", "tag2", "env:test"}) + + testTelemetry(t, telemetry, expectedMetrics) } func TestTelemetryWithAggregation(t *testing.T) { @@ -146,14 +234,11 @@ func TestTelemetryWithAggregation(t *testing.T) { telemetry := newTelemetryClient(client, "test_transport", false) - expectedMetrics := map[string]int64{ - "datadog.dogstatsd.client.aggregated_context": 5, - } - for k, v := range basicExpectedMetrics { - expectedMetrics[k] = v - } + expectedMetrics := []metric{} + expectedMetrics = appendBasicMetrics(expectedMetrics, nil) + expectedMetrics = appendAggregationMetrics(expectedMetrics, nil, false) - testTelemetry(t, telemetry, expectedMetrics, basicExpectedTags) + testTelemetry(t, telemetry, expectedMetrics) } func TestTelemetryWithAggregationDevMode(t *testing.T) { @@ -163,20 +248,31 @@ func TestTelemetryWithAggregationDevMode(t *testing.T) { telemetry := newTelemetryClient(client, "test_transport", true) - expectedMetrics := map[string]int64{ - "datadog.dogstatsd.client.aggregated_context": 5, - } - for k, v := range basicExpectedMetrics { - expectedMetrics[k] = v - } - for k, v := range devModeExpectedMetrics { - expectedMetrics[k] = v - } - for k, v := range devModeAggregationExpectedMetrics { - expectedMetrics[k] = v - } + expectedMetrics := []metric{} + expectedMetrics = appendBasicMetrics(expectedMetrics, nil) + expectedMetrics = appendAggregationMetrics(expectedMetrics, nil, true) + expectedMetrics = appendDevModeMetrics(expectedMetrics, nil) + + testTelemetry(t, telemetry, expectedMetrics) +} + +func TestTelemetryWithAggregationDevModeWithGlobalTags(t *testing.T) { + orig := os.Getenv("DD_ENV") + os.Setenv("DD_ENV", "test") + defer os.Setenv("DD_ENV", orig) + + // disabling autoflush of the telemetry + client, err := New("localhost:8125", WithoutTelemetry(), WithClientSideAggregation(), 
WithDevMode(), WithTags([]string{"tag1", "tag2"})) + require.Nil(t, err) + + telemetry := newTelemetryClient(client, "test_transport", true) + + expectedMetrics := []metric{} + expectedMetrics = appendBasicMetrics(expectedMetrics, []string{"tag1", "tag2", "env:test"}) + expectedMetrics = appendAggregationMetrics(expectedMetrics, []string{"tag1", "tag2", "env:test"}, true) + expectedMetrics = appendDevModeMetrics(expectedMetrics, []string{"tag1", "tag2", "env:test"}) - testTelemetry(t, telemetry, expectedMetrics, basicExpectedTags) + testTelemetry(t, telemetry, expectedMetrics) } func TestTelemetryCustomAddr(t *testing.T) {