Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix multi-metric aggregation #181

Merged
merged 1 commit into from Jan 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions statsd/buffer.go
Expand Up @@ -72,6 +72,7 @@ func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, gl
}

originalBuffer := b.buffer
b.writeSeparator()
b.buffer = appendHeader(b.buffer, namespace, name)

// buffer already full
Expand Down
2 changes: 1 addition & 1 deletion statsd/worker.go
Expand Up @@ -90,7 +90,7 @@ func (w *worker) writeAggregatedMetricUnsafe(m metric, metricSymbol []byte) erro

// first check how much data we can write to the buffer:
// +3 + len(metricSymbol) because the message will include '|<metricSymbol>|#' before the tags
// +1 for the coma between the two set of tags
// +1 for the potential line break at the start of the metric
tagsSize := len(m.stags) + 4 + len(metricSymbol)
for _, t := range m.globalTags {
tagsSize += len(t) + 1
Expand Down
34 changes: 34 additions & 0 deletions statsd/worker_test.go
Expand Up @@ -253,3 +253,37 @@ func TestWorkerDistributionAggregatedMultiple(t *testing.T) {
data = <-s.queue
assert.Equal(t, "namespace.test_distribution:3.3:4.4|d|#globalTags,globalTags2,tag1,tag2", string(data.buffer))
}

func TestWorkerMultipleDifferentDistributionAggregated(t *testing.T) {
// first metric will fit but not the second one
_, s, w := initWorker(160)

m := metric{
metricType: distributionAggregated,
namespace: "namespace.",
globalTags: []string{"globalTags", "globalTags2"},
name: "test_distribution",
fvalues: []float64{1.1, 2.2, 3.3, 4.4},
stags: "tag1,tag2",
rate: 1,
}
err := w.processMetric(m)
assert.Nil(t, err)
m = metric{
metricType: distributionAggregated,
namespace: "namespace.",
globalTags: []string{"globalTags", "globalTags2"},
name: "test_distribution_2",
fvalues: []float64{1.1, 2.2, 3.3, 4.4},
stags: "tag1,tag2",
rate: 1,
}
err = w.processMetric(m)
assert.Nil(t, err)

w.flush()
data := <-s.queue
assert.Equal(t, "namespace.test_distribution:1.1:2.2:3.3:4.4|d|#globalTags,globalTags2,tag1,tag2\nnamespace.test_distribution_2:1.1:2.2:3.3|d|#globalTags,globalTags2,tag1,tag2", string(data.buffer))
data = <-s.queue
assert.Equal(t, "namespace.test_distribution_2:4.4|d|#globalTags,globalTags2,tag1,tag2", string(data.buffer))
}