Skip to content

Commit

Permalink
Fix multi-metric aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
hush-hush committed Jan 29, 2021
1 parent 3c524c6 commit 9434658
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 1 deletion.
1 change: 1 addition & 0 deletions statsd/buffer.go
Expand Up @@ -72,6 +72,7 @@ func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, gl
}

originalBuffer := b.buffer
b.writeSeparator()
b.buffer = appendHeader(b.buffer, namespace, name)

// buffer already full
Expand Down
2 changes: 1 addition & 1 deletion statsd/worker.go
Expand Up @@ -90,7 +90,7 @@ func (w *worker) writeAggregatedMetricUnsafe(m metric, metricSymbol []byte) erro

// first check how much data we can write to the buffer:
// +3 + len(metricSymbol) because the message will include '|<metricSymbol>|#' before the tags
// +1 for the coma between the two set of tags
// +1 for the potential line break at the start of the metric
tagsSize := len(m.stags) + 4 + len(metricSymbol)
for _, t := range m.globalTags {
tagsSize += len(t) + 1
Expand Down
34 changes: 34 additions & 0 deletions statsd/worker_test.go
Expand Up @@ -253,3 +253,37 @@ func TestWorkerDistributionAggregatedMultiple(t *testing.T) {
data = <-s.queue
assert.Equal(t, "namespace.test_distribution:3.3:4.4|d|#globalTags,globalTags2,tag1,tag2", string(data.buffer))
}

func TestWorkerMultipleDifferentDistributionAggregated(t *testing.T) {
// first metric will fit but not the second one
_, s, w := initWorker(160)

m := metric{
metricType: distributionAggregated,
namespace: "namespace.",
globalTags: []string{"globalTags", "globalTags2"},
name: "test_distribution",
fvalues: []float64{1.1, 2.2, 3.3, 4.4},
stags: "tag1,tag2",
rate: 1,
}
err := w.processMetric(m)
assert.Nil(t, err)
m = metric{
metricType: distributionAggregated,
namespace: "namespace.",
globalTags: []string{"globalTags", "globalTags2"},
name: "test_distribution_2",
fvalues: []float64{1.1, 2.2, 3.3, 4.4},
stags: "tag1,tag2",
rate: 1,
}
err = w.processMetric(m)
assert.Nil(t, err)

w.flush()
data := <-s.queue
assert.Equal(t, "namespace.test_distribution:1.1:2.2:3.3:4.4|d|#globalTags,globalTags2,tag1,tag2\nnamespace.test_distribution_2:1.1:2.2:3.3|d|#globalTags,globalTags2,tag1,tag2", string(data.buffer))
data = <-s.queue
assert.Equal(t, "namespace.test_distribution_2:4.4|d|#globalTags,globalTags2,tag1,tag2", string(data.buffer))
}

0 comments on commit 9434658

Please sign in to comment.