From 715201d893f0175ce5d7e4336674f0ba70fb0161 Mon Sep 17 00:00:00 2001 From: Maxime mouial Date: Fri, 29 Jan 2021 12:09:47 +0100 Subject: [PATCH] Fix multi-metric aggregation --- statsd/buffer.go | 1 + statsd/worker.go | 2 +- statsd/worker_test.go | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/statsd/buffer.go b/statsd/buffer.go index 5da60e09..37ea6ac2 100644 --- a/statsd/buffer.go +++ b/statsd/buffer.go @@ -72,6 +72,7 @@ func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, gl } originalBuffer := b.buffer + b.writeSeparator() b.buffer = appendHeader(b.buffer, namespace, name) // buffer already full diff --git a/statsd/worker.go b/statsd/worker.go index a7cb085b..2ab0c75f 100644 --- a/statsd/worker.go +++ b/statsd/worker.go @@ -90,7 +90,7 @@ func (w *worker) writeAggregatedMetricUnsafe(m metric, metricSymbol []byte) erro // first check how much data we can write to the buffer: // +3 + len(metricSymbol) because the message will include '||#' before the tags - // +1 for the coma between the two set of tags + // +1 for the potential line break at the start of the metric tagsSize := len(m.stags) + 4 + len(metricSymbol) for _, t := range m.globalTags { tagsSize += len(t) + 1 diff --git a/statsd/worker_test.go b/statsd/worker_test.go index 7d07e13b..2f9f5c74 100644 --- a/statsd/worker_test.go +++ b/statsd/worker_test.go @@ -253,3 +253,37 @@ func TestWorkerDistributionAggregatedMultiple(t *testing.T) { data = <-s.queue assert.Equal(t, "namespace.test_distribution:3.3:4.4|d|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) } + +func TestWorkerMultipleDifferentDistributionAggregated(t *testing.T) { + // first metric will fit but not the second one + _, s, w := initWorker(160) + + m := metric{ + metricType: distributionAggregated, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_distribution", + fvalues: []float64{1.1, 2.2, 3.3, 4.4}, + stags: "tag1,tag2", + rate: 1, + } + err := w.processMetric(m) + assert.Nil(t, err) + m = metric{ + metricType: distributionAggregated, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_distribution_2", + fvalues: []float64{1.1, 2.2, 3.3, 4.4}, + stags: "tag1,tag2", + rate: 1, + } + err = w.processMetric(m) + assert.Nil(t, err) + + w.flush() + data := <-s.queue + assert.Equal(t, "namespace.test_distribution:1.1:2.2:3.3:4.4|d|#globalTags,globalTags2,tag1,tag2\nnamespace.test_distribution_2:1.1:2.2:3.3|d|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) + data = <-s.queue + assert.Equal(t, "namespace.test_distribution_2:4.4|d|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) +}