From e5e4eb855635c968261ab06e1d6d72e366842cef Mon Sep 17 00:00:00 2001 From: maxime mouial Date: Wed, 20 Jan 2021 10:17:29 +0100 Subject: [PATCH] Client side aggregation for distribution, histogram and timing We now buffer values for those types and pack them in one dogstatsd message. This is based on the new dogstatsd 1.1 protocol. --- statsd/aggregator.go | 126 ++++++++++++++++++---- statsd/aggregator_test.go | 119 ++++++++++++++++----- statsd/buffer.go | 62 ++++++++++- statsd/buffer_test.go | 54 ++++++++++ statsd/format.go | 31 +++++- statsd/format_test.go | 36 +++++++ statsd/metrics.go | 62 +++++++++++ statsd/metrics_test.go | 114 ++++++++++++++++++++ statsd/options.go | 26 ++++- statsd/options_test.go | 12 +++ statsd/statsd.go | 34 +++--- statsd/telemetry.go | 4 + statsd/telemetry_test.go | 64 ++++++++--- statsd/worker.go | 31 ++++++ statsd/worker_test.go | 217 ++++++++++++++++++++++++++++++++++++++ 15 files changed, 909 insertions(+), 83 deletions(-) diff --git a/statsd/aggregator.go b/statsd/aggregator.go index 84815e68..7d1d44f5 100644 --- a/statsd/aggregator.go +++ b/statsd/aggregator.go @@ -8,11 +8,62 @@ import ( ) type ( - countsMap map[string]*countMetric - gaugesMap map[string]*gaugeMetric - setsMap map[string]*setMetric + countsMap map[string]*countMetric + gaugesMap map[string]*gaugeMetric + setsMap map[string]*setMetric + bufferedMetricMap map[string]*histogramMetric ) +// bufferedMetricContexts represent the contexts for Histograms, Distributions +// and Timing. Since those 3 metric types behave the same way and are sampled +// with the same type they're represented by the same class. +type bufferedMetricContexts struct { + nbContext int32 + mutex sync.RWMutex + values bufferedMetricMap + newMetric func(string, float64, string) *bufferedMetric +} + +func newBufferedContexts(newMetric func(string, float64, string) *bufferedMetric) bufferedMetricContexts { + return bufferedMetricContexts{ + values: bufferedMetricMap{}, + newMetric: newMetric, + } +} + +func (bc *bufferedMetricContexts) flush(metrics []metric) []metric { + bc.mutex.Lock() + values := bc.values + bc.values = bufferedMetricMap{} + bc.mutex.Unlock() + + for _, d := range values { + metrics = append(metrics, d.flushUnsafe()) + } + atomic.AddInt32(&bc.nbContext, int32(len(values))) + return metrics +} + +func (bc *bufferedMetricContexts) sample(name string, value float64, tags []string) error { + context, stringTags := getContextAndTags(name, tags) + bc.mutex.RLock() + if v, found := bc.values[context]; found { + v.sample(value) + bc.mutex.RUnlock() + return nil + } + bc.mutex.RUnlock() + + bc.mutex.Lock() + bc.values[context] = bc.newMetric(name, value, stringTags) + bc.mutex.Unlock() + return nil +} + +func (bc *bufferedMetricContexts) resetAndGetNbContext() int32 { + return atomic.SwapInt32(&bc.nbContext, 0) +} + type aggregator struct { nbContextGauge int32 nbContextCount int32 @@ -22,9 +73,12 @@ type aggregator struct { gaugesM sync.RWMutex setsM sync.RWMutex - gauges gaugesMap - counts countsMap - sets setsMap + gauges gaugesMap + counts countsMap + sets setsMap + histograms bufferedMetricContexts + distributions bufferedMetricContexts + timings bufferedMetricContexts closed chan struct{} exited chan struct{} @@ -33,20 +87,26 @@ type aggregator struct { } type aggregatorMetrics struct { - nbContext int32 - nbContextGauge int32 - nbContextCount int32 - nbContextSet int32 + nbContext int32 + nbContextGauge int32 + nbContextCount int32 + nbContextSet int32 + nbContextHistogram int32 + nbContextDistribution int32 + nbContextTiming int32 } func newAggregator(c *Client) *aggregator { return &aggregator{ - client: c, - counts: countsMap{}, - gauges: gaugesMap{}, - sets: setsMap{}, - closed: make(chan struct{}), - exited: make(chan struct{}), + client: c, + counts: countsMap{}, + gauges: gaugesMap{}, + sets: setsMap{}, + histograms: newBufferedContexts(newHistogramMetric), + distributions: newBufferedContexts(newDistributionMetric), + timings: newBufferedContexts(newTimingMetric), + closed: make(chan struct{}), + exited: make(chan struct{}), } } @@ -84,12 +144,15 @@ func (a *aggregator) flushTelemetryMetrics() *aggregatorMetrics { } am := &aggregatorMetrics{ - nbContextGauge: atomic.SwapInt32(&a.nbContextGauge, 0), - nbContextCount: atomic.SwapInt32(&a.nbContextCount, 0), - nbContextSet: atomic.SwapInt32(&a.nbContextSet, 0), + nbContextGauge: atomic.SwapInt32(&a.nbContextGauge, 0), + nbContextCount: atomic.SwapInt32(&a.nbContextCount, 0), + nbContextSet: atomic.SwapInt32(&a.nbContextSet, 0), + nbContextHistogram: a.histograms.resetAndGetNbContext(), + nbContextDistribution: a.distributions.resetAndGetNbContext(), + nbContextTiming: a.timings.resetAndGetNbContext(), } - am.nbContext = am.nbContextGauge + am.nbContextCount + am.nbContextSet + am.nbContext = am.nbContextGauge + am.nbContextCount + am.nbContextSet + am.nbContextHistogram + am.nbContextDistribution + am.nbContextTiming return am } @@ -126,6 +189,10 @@ func (a *aggregator) flushMetrics() []metric { metrics = append(metrics, c.flushUnsafe()) } + metrics = a.histograms.flush(metrics) + metrics = a.distributions.flush(metrics) + metrics = a.timings.flush(metrics) + atomic.AddInt32(&a.nbContextCount, int32(len(counts))) atomic.AddInt32(&a.nbContextGauge, int32(len(gauges))) atomic.AddInt32(&a.nbContextSet, int32(len(sets))) @@ -133,7 +200,12 @@ func (a *aggregator) flushMetrics() []metric { } func getContext(name string, tags []string) string { - return name + ":" + strings.Join(tags, ",") + return name + ":" + strings.Join(tags, tagSeparatorSymbol) +} + +func getContextAndTags(name string, tags []string) (string, string) { + stringTags := strings.Join(tags, tagSeparatorSymbol) + return name + ":" + stringTags, stringTags } func (a *aggregator) count(name string, value int64, tags []string) error { @@ -185,3 +257,15 @@ func (a *aggregator) set(name string, value string, tags []string) error { a.setsM.Unlock() return nil } + +func (a *aggregator) histogram(name string, value float64, tags []string) error { + return a.histograms.sample(name, value, tags) +} + +func (a *aggregator) distribution(name string, value float64, tags []string) error { + return a.distributions.sample(name, value, tags) +} + +func (a *aggregator) timing(name string, value float64, tags []string) error { + return a.timings.sample(name, value, tags) +} diff --git a/statsd/aggregator_test.go b/statsd/aggregator_test.go index d8a7a4ac..4eb50c98 100644 --- a/statsd/aggregator_test.go +++ b/statsd/aggregator_test.go @@ -14,29 +14,35 @@ func TestAggregatorSample(t *testing.T) { tags := []string{"tag1", "tag2"} - a.gauge("gaugeTest", 21, tags) - assert.Len(t, a.gauges, 1) - assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2") - - a.count("countTest", 21, tags) - assert.Len(t, a.counts, 1) - assert.Contains(t, a.counts, "countTest:tag1,tag2") - - a.set("setTest", "value1", tags) - assert.Len(t, a.sets, 1) - assert.Contains(t, a.sets, "setTest:tag1,tag2") - - a.gauge("gaugeTest", 123, tags) - assert.Len(t, a.gauges, 1) - assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2") - - a.count("countTest", 10, tags) - assert.Len(t, a.counts, 1) - assert.Contains(t, a.counts, "countTest:tag1,tag2") - - a.set("setTest", "value1", tags) - assert.Len(t, a.sets, 1) - assert.Contains(t, a.sets, "setTest:tag1,tag2") + for i := 0; i < 2; i++ { + a.gauge("gaugeTest", 21, tags) + assert.Len(t, a.gauges, 1) + assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2") + + a.count("countTest", 21, tags) + assert.Len(t, a.counts, 1) + assert.Contains(t, a.counts, "countTest:tag1,tag2") + + a.set("setTest", "value1", tags) + assert.Len(t, a.sets, 1) + assert.Contains(t, a.sets, "setTest:tag1,tag2") + + a.set("setTest", "value1", tags) + assert.Len(t, a.sets, 1) + assert.Contains(t, a.sets, "setTest:tag1,tag2") + + a.histogram("histogramTest", 21, tags) + assert.Len(t, a.histograms.values, 1) + assert.Contains(t, a.histograms.values, "histogramTest:tag1,tag2") + + a.distribution("distributionTest", 21, tags) + assert.Len(t, a.distributions.values, 1) + assert.Contains(t, a.distributions.values, "distributionTest:tag1,tag2") + + a.timing("timingTest", 21, tags) + assert.Len(t, a.timings.values, 1) + assert.Contains(t, a.timings.values, "timingTest:tag1,tag2") + } } func TestAggregatorFlush(t *testing.T) { @@ -57,13 +63,28 @@ func TestAggregatorFlush(t *testing.T) { a.set("setTest1", "value2", tags) a.set("setTest2", "value1", tags) + a.histogram("histogramTest1", 21, tags) + a.histogram("histogramTest1", 22, tags) + a.histogram("histogramTest2", 23, tags) + + a.distribution("distributionTest1", 21, tags) + a.distribution("distributionTest1", 22, tags) + a.distribution("distributionTest2", 23, tags) + + a.timing("timingTest1", 21, tags) + a.timing("timingTest1", 22, tags) + a.timing("timingTest2", 23, tags) + metrics := a.flushMetrics() assert.Len(t, a.gauges, 0) assert.Len(t, a.counts, 0) assert.Len(t, a.sets, 0) + assert.Len(t, a.histograms.values, 0) + assert.Len(t, a.distributions.values, 0) + assert.Len(t, a.timings.values, 0) - assert.Len(t, metrics, 7) + assert.Len(t, metrics, 13) sort.Slice(metrics, func(i, j int) bool { if metrics[i].metricType == metrics[j].metricType { @@ -77,7 +98,7 @@ func TestAggregatorFlush(t *testing.T) { return metrics[i].metricType < metrics[j].metricType }) - assert.Equal(t, metrics, []metric{ + assert.Equal(t, []metric{ metric{ metricType: gauge, name: "gaugeTest1", @@ -106,6 +127,34 @@ func TestAggregatorFlush(t *testing.T) { rate: 1, ivalue: int64(1), }, + metric{ + metricType: histogramAggregated, + name: "histogramTest1", + stags: strings.Join(tags, tagSeparatorSymbol), + rate: 1, + fvalues: []float64{21.0, 22.0}, + }, + metric{ + metricType: histogramAggregated, + name: "histogramTest2", + stags: strings.Join(tags, tagSeparatorSymbol), + rate: 1, + fvalues: []float64{23.0}, + }, + metric{ + metricType: distributionAggregated, + name: "distributionTest1", + stags: strings.Join(tags, tagSeparatorSymbol), + rate: 1, + fvalues: []float64{21.0, 22.0}, + }, + metric{ + metricType: distributionAggregated, + name: "distributionTest2", + stags: strings.Join(tags, tagSeparatorSymbol), + rate: 1, + fvalues: []float64{23.0}, + }, metric{ metricType: set, name: "setTest1", @@ -127,7 +176,22 @@ func TestAggregatorFlush(t *testing.T) { rate: 1, svalue: "value1", }, - }) + metric{ + metricType: timingAggregated, + name: "timingTest1", + stags: strings.Join(tags, tagSeparatorSymbol), + rate: 1, + fvalues: []float64{21.0, 22.0}, + }, + metric{ + metricType: timingAggregated, + name: "timingTest2", + stags: strings.Join(tags, tagSeparatorSymbol), + rate: 1, + fvalues: []float64{23.0}, + }, + }, + metrics) } @@ -146,6 +210,9 @@ func TestAggregatorFlushConcurrency(t *testing.T) { a.gauge("gaugeTest1", 21, tags) a.count("countTest1", 21, tags) a.set("setTest1", "value1", tags) + a.histogram("histogramTest1", 21, tags) + a.distribution("distributionTest1", 21, tags) + a.timing("timingTest1", 21, tags) }() } diff --git a/statsd/buffer.go b/statsd/buffer.go index c38e229a..5da60e09 100644 --- a/statsd/buffer.go +++ b/statsd/buffer.go @@ -1,11 +1,21 @@ package statsd +import ( + "strconv" +) + type bufferFullError string func (e bufferFullError) Error() string { return string(e) } const errBufferFull = bufferFullError("statsd buffer is full") +type partialWriteError string + +func (e partialWriteError) Error() string { return string(e) } + +const errPartialWrite = partialWriteError("value partially written") + const metricOverhead = 512 // statsdBuffer is a buffer containing statsd messages @@ -55,6 +65,56 @@ func (b *statsdBuffer) writeHistogram(namespace string, globalTags []string, nam return b.validateNewElement(originalBuffer) } +// writeAggregated serialized as many values as possible in the current buffer and return the position in values where it stopped. +func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, globalTags []string, name string, values []float64, tags string, tagSize int) (int, error) { + if b.elementCount >= b.maxElements { + return 0, errBufferFull + } + + originalBuffer := b.buffer + b.buffer = appendHeader(b.buffer, namespace, name) + + // buffer already full + if len(b.buffer)+tagSize > b.maxSize { + b.buffer = originalBuffer + return 0, errBufferFull + } + + // We add as many value as possible + var position int + for idx, v := range values { + previousBuffer := b.buffer + if idx != 0 { + b.buffer = append(b.buffer, ':') + } + b.buffer = strconv.AppendFloat(b.buffer, v, 'f', -1, 64) + + // Should we stop serializing and switch to another buffer + if len(b.buffer)+tagSize > b.maxSize { + b.buffer = previousBuffer + break + } + position = idx + 1 + } + + // we could not add a single value + if position == 0 { + b.buffer = originalBuffer + return 0, errBufferFull + } + + b.buffer = append(b.buffer, '|') + b.buffer = append(b.buffer, metricSymbol...) + b.buffer = appendTagsAggregated(b.buffer, globalTags, tags) + b.elementCount++ + + if position != len(values) { + return position, errPartialWrite + } + return position, nil + +} + func (b *statsdBuffer) writeDistribution(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error { if b.elementCount >= b.maxElements { return errBufferFull @@ -116,7 +176,7 @@ func (b *statsdBuffer) validateNewElement(originalBuffer []byte) error { func (b *statsdBuffer) writeSeparator() { if b.elementCount != 0 { - b.buffer = appendSeparator(b.buffer) + b.buffer = append(b.buffer, '\n') } } diff --git a/statsd/buffer_test.go b/statsd/buffer_test.go index 01c62fa4..e8701fac 100644 --- a/statsd/buffer_test.go +++ b/statsd/buffer_test.go @@ -86,3 +86,57 @@ func TestBufferSeparator(t *testing.T) { assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|g|#tag:tag\nnamespace.metric:1|g|#tag:tag", string(buffer.bytes())) } + +func TestBufferAggregated(t *testing.T) { + buffer := newStatsdBuffer(1024, 1) + pos, err := buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1}, "", 12) + assert.Nil(t, err) + assert.Equal(t, 1, pos) + assert.Equal(t, `namespace.metric:1|h|#tag:tag`, string(buffer.bytes())) + + buffer = newStatsdBuffer(1024, 1) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12) + assert.Nil(t, err) + assert.Equal(t, 4, pos) + assert.Equal(t, `namespace.metric:1:2:3:4|h|#tag:tag`, string(buffer.bytes())) + + // max element already used + buffer = newStatsdBuffer(1024, 1) + buffer.elementCount = 1 + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12) + assert.Equal(t, errBufferFull, err) + + // not enought size to start serializing + buffer = newStatsdBuffer(29, 1) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12) + assert.Equal(t, errBufferFull, err) + + // space for only 1 number + buffer = newStatsdBuffer(30, 1) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12) + assert.Equal(t, errPartialWrite, err) + assert.Equal(t, 1, pos) + assert.Equal(t, `namespace.metric:1|h|#tag:tag`, string(buffer.bytes())) + + // first value too big + buffer = newStatsdBuffer(30, 1) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{12, 2, 3, 4}, "", 12) + assert.Equal(t, errBufferFull, err) + assert.Equal(t, 0, pos) + assert.Equal(t, "", string(buffer.bytes())) // checking that the buffer was reset + + // not enough space left + buffer = newStatsdBuffer(40, 1) + buffer.buffer = append(buffer.buffer, []byte("abcdefghij")...) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{12, 2, 3, 4}, "", 12) + assert.Equal(t, errBufferFull, err) + assert.Equal(t, 0, pos) + assert.Equal(t, "abcdefghij", string(buffer.bytes())) // checking that the buffer was reset + + // space for only 2 number + buffer = newStatsdBuffer(32, 1) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12) + assert.Equal(t, errPartialWrite, err) + assert.Equal(t, 2, pos) + assert.Equal(t, `namespace.metric:1:2|h|#tag:tag`, string(buffer.bytes())) +} diff --git a/statsd/format.go b/statsd/format.go index bd856a04..8d62aa7b 100644 --- a/statsd/format.go +++ b/statsd/format.go @@ -12,6 +12,7 @@ var ( distributionSymbol = []byte("d") setSymbol = []byte("s") timingSymbol = []byte("ms") + tagSeparatorSymbol = "," ) func appendHeader(buffer []byte, namespace string, name string) []byte { @@ -54,14 +55,14 @@ func appendTags(buffer []byte, globalTags []string, tags []string) []byte { for _, tag := range globalTags { if !firstTag { - buffer = append(buffer, ',') + buffer = append(buffer, tagSeparatorSymbol...) } buffer = appendWithoutNewlines(buffer, tag) firstTag = false } for _, tag := range tags { if !firstTag { - buffer = append(buffer, ',') + buffer = append(buffer, tagSeparatorSymbol...) } buffer = appendWithoutNewlines(buffer, tag) firstTag = false @@ -69,6 +70,30 @@ func appendTags(buffer []byte, globalTags []string, tags []string) []byte { return buffer } +func appendTagsAggregated(buffer []byte, globalTags []string, tags string) []byte { + if len(globalTags) == 0 && tags == "" { + return buffer + } + + buffer = append(buffer, "|#"...) + firstTag := true + + for _, tag := range globalTags { + if !firstTag { + buffer = append(buffer, tagSeparatorSymbol...) + } + buffer = appendWithoutNewlines(buffer, tag) + firstTag = false + } + if tags != "" { + if !firstTag { + buffer = append(buffer, tagSeparatorSymbol...) + } + buffer = appendWithoutNewlines(buffer, tags) + } + return buffer +} + func appendFloatMetric(buffer []byte, typeSymbol []byte, namespace string, globalTags []string, name string, value float64, tags []string, rate float64, precision int) []byte { buffer = appendHeader(buffer, namespace, name) buffer = strconv.AppendFloat(buffer, value, 'f', precision, 64) @@ -143,7 +168,7 @@ func appendEvent(buffer []byte, event Event, globalTags []string) []byte { buffer = append(buffer, "_e{"...) buffer = strconv.AppendInt(buffer, int64(len(event.Title)), 10) - buffer = append(buffer, ',') + buffer = append(buffer, tagSeparatorSymbol...) buffer = strconv.AppendInt(buffer, int64(escapedTextLen), 10) buffer = append(buffer, "}:"...) buffer = append(buffer, event.Title...) diff --git a/statsd/format_test.go b/statsd/format_test.go index d2f9a028..ae5d59a4 100644 --- a/statsd/format_test.go +++ b/statsd/format_test.go @@ -7,6 +7,42 @@ import ( "github.com/stretchr/testify/assert" ) +func TestFormatAppendTags(t *testing.T) { + var buffer []byte + buffer = appendTags(buffer, []string{"global:tag"}, []string{"tag:tag", "tag2:tag2"}) + assert.Equal(t, `|#global:tag,tag:tag,tag2:tag2`, string(buffer)) + + var buffer2 []byte + buffer2 = appendTags(buffer2, []string{"global:tag"}, nil) + assert.Equal(t, `|#global:tag`, string(buffer2)) + + var buffer3 []byte + buffer3 = appendTags(buffer3, nil, []string{"tag:tag", "tag2:tag2"}) + assert.Equal(t, `|#tag:tag,tag2:tag2`, string(buffer3)) + + var buffer4 []byte + buffer4 = appendTags(buffer4, nil, nil) + assert.Equal(t, "", string(buffer4)) +} + +func TestFormatAppendTagsAggregated(t *testing.T) { + var buffer []byte + buffer = appendTagsAggregated(buffer, []string{"global:tag"}, "tag:tag,tag2:tag2") + assert.Equal(t, `|#global:tag,tag:tag,tag2:tag2`, string(buffer)) + + var buffer2 []byte + buffer2 = appendTagsAggregated(buffer2, []string{"global:tag"}, "") + assert.Equal(t, `|#global:tag`, string(buffer2)) + + var buffer3 []byte + buffer3 = appendTagsAggregated(buffer3, nil, "tag:tag,tag2:tag2") + assert.Equal(t, `|#tag:tag,tag2:tag2`, string(buffer3)) + + var buffer4 []byte + buffer4 = appendTagsAggregated(buffer4, nil, "") + assert.Equal(t, "", string(buffer4)) +} + func TestFormatAppendGauge(t *testing.T) { var buffer []byte buffer = appendGauge(buffer, "namespace.", []string{"global:tag"}, "gauge", 1., []string{"tag:tag"}, 1) diff --git a/statsd/metrics.go b/statsd/metrics.go index de3b448c..99ed4da5 100644 --- a/statsd/metrics.go +++ b/statsd/metrics.go @@ -117,3 +117,65 @@ func (s *setMetric) flushUnsafe() []metric { } return metrics } + +// Histograms, Distributions and Timings + +type bufferedMetric struct { + sync.Mutex + + data []float64 + name string + // Histograms and Distributions store tags as one string since we need + // to compute its size multiple time when serializing. + tags string + mtype metricType +} + +func (s *bufferedMetric) sample(v float64) { + s.Lock() + defer s.Unlock() + s.data = append(s.data, v) +} + +func (s *bufferedMetric) flushUnsafe() metric { + return metric{ + metricType: s.mtype, + name: s.name, + stags: s.tags, + rate: 1, + fvalues: s.data, + } +} + +type histogramMetric = bufferedMetric + +func newHistogramMetric(name string, value float64, stringTags string) *histogramMetric { + return &histogramMetric{ + data: []float64{value}, + name: name, + tags: stringTags, + mtype: histogramAggregated, + } +} + +type distributionMetric = bufferedMetric + +func newDistributionMetric(name string, value float64, stringTags string) *distributionMetric { + return &distributionMetric{ + data: []float64{value}, + name: name, + tags: stringTags, + mtype: distributionAggregated, + } +} + +type timingMetric = bufferedMetric + +func newTimingMetric(name string, value float64, stringTags string) *timingMetric { + return &timingMetric{ + data: []float64{value}, + name: name, + tags: stringTags, + mtype: timingAggregated, + } +} diff --git a/statsd/metrics_test.go b/statsd/metrics_test.go index 6934109f..41bb743d 100644 --- a/statsd/metrics_test.go +++ b/statsd/metrics_test.go @@ -116,3 +116,117 @@ func TestFlushUnsafeSetMetricSample(t *testing.T) { assert.Equal(t, m[1].name, "test") assert.Equal(t, m[1].tags, []string{"tag1", "tag2"}) } + +func TestNewHistogramMetric(t *testing.T) { + s := newHistogramMetric("test", 1.0, "tag1,tag2") + assert.Equal(t, s.data, []float64{1.0}) + assert.Equal(t, s.name, "test") + assert.Equal(t, s.tags, "tag1,tag2") + assert.Equal(t, s.mtype, histogramAggregated) +} + +func TestHistogramMetricSample(t *testing.T) { + s := newHistogramMetric("test", 1.0, "tag1,tag2") + s.sample(123.45) + assert.Equal(t, s.data, []float64{1.0, 123.45}) + assert.Equal(t, s.name, "test") + assert.Equal(t, s.tags, "tag1,tag2") + assert.Equal(t, s.mtype, histogramAggregated) +} + +func TestFlushUnsafeHistogramMetricSample(t *testing.T) { + s := newHistogramMetric("test", 1.0, "tag1,tag2") + m := s.flushUnsafe() + + assert.Equal(t, m.metricType, histogramAggregated) + assert.Equal(t, m.fvalues, []float64{1.0}) + assert.Equal(t, m.name, "test") + assert.Equal(t, m.stags, "tag1,tag2") + assert.Nil(t, m.tags) + + s.sample(21) + s.sample(123.45) + m = s.flushUnsafe() + + assert.Equal(t, m.metricType, histogramAggregated) + assert.Equal(t, m.fvalues, []float64{1.0, 21.0, 123.45}) + assert.Equal(t, m.name, "test") + assert.Equal(t, m.stags, "tag1,tag2") + assert.Nil(t, m.tags) +} + +func TestNewDistributionMetric(t *testing.T) { + s := newDistributionMetric("test", 1.0, "tag1,tag2") + assert.Equal(t, s.data, []float64{1.0}) + assert.Equal(t, s.name, "test") + assert.Equal(t, s.tags, "tag1,tag2") + assert.Equal(t, s.mtype, distributionAggregated) +} + +func TestDistributionMetricSample(t *testing.T) { + s := newDistributionMetric("test", 1.0, "tag1,tag2") + s.sample(123.45) + assert.Equal(t, s.data, []float64{1.0, 123.45}) + assert.Equal(t, s.name, "test") + assert.Equal(t, s.tags, "tag1,tag2") + assert.Equal(t, s.mtype, distributionAggregated) +} + +func TestFlushUnsafeDistributionMetricSample(t *testing.T) { + s := newDistributionMetric("test", 1.0, "tag1,tag2") + m := s.flushUnsafe() + + assert.Equal(t, m.metricType, distributionAggregated) + assert.Equal(t, m.fvalues, []float64{1.0}) + assert.Equal(t, m.name, "test") + assert.Equal(t, m.stags, "tag1,tag2") + assert.Nil(t, m.tags) + + s.sample(21) + s.sample(123.45) + m = s.flushUnsafe() + + assert.Equal(t, m.metricType, distributionAggregated) + assert.Equal(t, m.fvalues, []float64{1.0, 21.0, 123.45}) + assert.Equal(t, m.name, "test") + assert.Equal(t, m.stags, "tag1,tag2") + assert.Nil(t, m.tags) +} + +func TestNewTimingMetric(t *testing.T) { + s := newTimingMetric("test", 1.0, "tag1,tag2") + assert.Equal(t, s.data, []float64{1.0}) + assert.Equal(t, s.name, "test") + assert.Equal(t, s.tags, "tag1,tag2") + assert.Equal(t, s.mtype, timingAggregated) +} + +func TestTimingMetricSample(t *testing.T) { + s := newTimingMetric("test", 1.0, "tag1,tag2") + s.sample(123.45) + assert.Equal(t, s.data, []float64{1.0, 123.45}) + assert.Equal(t, s.name, "test") + assert.Equal(t, s.tags, "tag1,tag2") + assert.Equal(t, s.mtype, timingAggregated) +} + +func TestFlushUnsafeTimingMetricSample(t *testing.T) { + s := newTimingMetric("test", 1.0, "tag1,tag2") + m := s.flushUnsafe() + + assert.Equal(t, m.metricType, timingAggregated) + assert.Equal(t, m.fvalues, []float64{1.0}) + assert.Equal(t, m.name, "test") + assert.Equal(t, m.stags, "tag1,tag2") + assert.Nil(t, m.tags) + + s.sample(21) + s.sample(123.45) + m = s.flushUnsafe() + + assert.Equal(t, m.metricType, timingAggregated) + assert.Equal(t, m.fvalues, []float64{1.0, 21.0, 123.45}) + assert.Equal(t, m.name, "test") + assert.Equal(t, m.stags, "tag1,tag2") + assert.Nil(t, m.tags) +} diff --git a/statsd/options.go b/statsd/options.go index fef96c0a..1e4651ea 100644 --- a/statsd/options.go +++ b/statsd/options.go @@ -35,6 +35,8 @@ var ( DefaultAggregationFlushInterval = 3 * time.Second // DefaultAggregation DefaultAggregation = false + // DefaultExtendedAggregation + DefaultExtendedAggregation = false // DefaultDevMode DefaultDevMode = false ) @@ -95,8 +97,13 @@ type Options struct { ChannelModeBufferSize int // AggregationFlushInterval is the interval for the aggregator to flush metrics AggregationFlushInterval time.Duration - // [beta] Aggregation enables/disables client side aggregation + // [beta] Aggregation enables/disables client side aggregation for + // Gauges, Counts and Sets (compatible with every Agent's version). Aggregation bool + // [beta] Extended aggregation enables/disables client side aggregation + // for all types. This feature is only compatible with Agent's versions + // >=7.25.0 or Agent's version >=6.25.0 && < 7.0.0. + ExtendedAggregation bool // TelemetryAddr specify a different endpoint for telemetry metrics. TelemetryAddr string // DevMode enables the "dev" mode where the client sends much more @@ -120,6 +127,7 @@ func resolveOptions(options []Option) (*Options, error) { ChannelModeBufferSize: DefaultChannelModeBufferSize, AggregationFlushInterval: DefaultAggregationFlushInterval, Aggregation: DefaultAggregation, + ExtendedAggregation: DefaultExtendedAggregation, DevMode: DefaultDevMode, } @@ -252,7 +260,8 @@ func WithAggregationInterval(interval time.Duration) Option { } } -// WithClientSideAggregation enables client side aggregation. Client side aggregation is a beta feature. +// WithClientSideAggregation enables client side aggregation for Gauges, Counts +// and Sets. Client side aggregation is a beta feature. func WithClientSideAggregation() Option { return func(o *Options) error { o.Aggregation = true @@ -264,6 +273,19 @@ func WithClientSideAggregation() Option { func WithoutClientSideAggregation() Option { return func(o *Options) error { o.Aggregation = false + o.ExtendedAggregation = false + return nil + } +} + +// WithExtendedClientSideAggregation enables client side aggregation for all +// types. This feature is only compatible with Agent's version >=6.25.0 && +// <7.0.0 or Agent's versions >=7.25.0. Client side aggregation is a beta +// feature. +func WithExtendedClientSideAggregation() Option { + return func(o *Options) error { + o.Aggregation = true + o.ExtendedAggregation = true return nil } } diff --git a/statsd/options_test.go b/statsd/options_test.go index c4a4c2a9..776ae2fb 100644 --- a/statsd/options_test.go +++ b/statsd/options_test.go @@ -25,6 +25,7 @@ func TestDefaultOptions(t *testing.T) { assert.Equal(t, options.ChannelModeBufferSize, DefaultChannelModeBufferSize) assert.Equal(t, options.AggregationFlushInterval, DefaultAggregationFlushInterval) assert.Equal(t, options.Aggregation, DefaultAggregation) + assert.Equal(t, options.ExtendedAggregation, DefaultExtendedAggregation) assert.Zero(t, options.TelemetryAddr) assert.False(t, options.DevMode) } @@ -77,10 +78,21 @@ func TestOptions(t *testing.T) { assert.Equal(t, options.ChannelModeBufferSize, testChannelBufferSize) assert.Equal(t, options.AggregationFlushInterval, testAggregationWindow) assert.Equal(t, options.Aggregation, true) + assert.Equal(t, options.ExtendedAggregation, false) assert.Equal(t, options.TelemetryAddr, testTelemetryAddr) assert.True(t, options.DevMode) } +func TestExtendedAggregation(t *testing.T) { + options, err := resolveOptions([]Option{ + WithExtendedClientSideAggregation(), + }) + + assert.NoError(t, err) + assert.Equal(t, options.Aggregation, true) + assert.Equal(t, options.ExtendedAggregation, true) +} + func TestResetOptions(t *testing.T) { options, err := resolveOptions([]Option{ WithChannelMode(), diff --git a/statsd/statsd.go b/statsd/statsd.go index 42e4a4e9..da433e63 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -6,19 +6,6 @@ adding tags and histograms and pushing upstream to Datadog. Refer to http://docs.datadoghq.com/guides/dogstatsd/ for information about DogStatsD. -Example Usage: - - // Create the client - c, err := statsd.New("127.0.0.1:8125") - if err != nil { - log.Fatal(err) - } - // Prefix every metric with the app name - c.Namespace = "flubber." - // Send the EC2 availability zone as a tag with every metric - c.Tags = append(c.Tags, "us-east-1a") - err = c.Gauge("request.duration", 1.2, nil, 1) - statsd is based on go-statsd-client. */ package statsd @@ -89,9 +76,12 @@ const ( gauge metricType = iota count histogram + histogramAggregated distribution + distributionAggregated set timing + timingAggregated event serviceCheck ) @@ -114,11 +104,13 @@ type metric struct { globalTags []string name string fvalue float64 + fvalues []float64 ivalue int64 svalue string evalue *Event scvalue *ServiceCheck tags []string + stags string rate float64 } @@ -206,6 +198,7 @@ type Client struct { closerLock sync.Mutex receiveMode ReceivingMode agg *aggregator + aggHistDist *aggregator options []Option addrOption string } @@ -291,9 +284,13 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro Tags: o.Tags, metrics: &ClientMetrics{}, } - if o.Aggregation { + if o.Aggregation || o.ExtendedAggregation { c.agg = newAggregator(&c) c.agg.start(o.AggregationFlushInterval) + + if o.ExtendedAggregation { + c.aggHistDist = c.agg + } } // Inject values of DD_* environment variables as global tags. @@ -488,6 +485,9 @@ func (c *Client) Histogram(name string, value float64, tags []string, rate float return ErrNoClient } atomic.AddUint64(&c.metrics.TotalMetricsHistogram, 1) + if c.aggHistDist != nil { + return c.agg.histogram(name, value, tags) + } return c.send(metric{metricType: histogram, name: name, fvalue: value, tags: tags, rate: rate}) } @@ -497,6 +497,9 @@ func (c *Client) Distribution(name string, value float64, tags []string, rate fl return ErrNoClient } atomic.AddUint64(&c.metrics.TotalMetricsDistribution, 1) + if c.aggHistDist != nil { + return c.agg.distribution(name, value, tags) + } return c.send(metric{metricType: distribution, name: name, fvalue: value, tags: tags, rate: rate}) } @@ -534,6 +537,9 @@ func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, r return ErrNoClient } atomic.AddUint64(&c.metrics.TotalMetricsTiming, 1) + if c.aggHistDist != nil { + return c.agg.timing(name, value, tags) + } return c.send(metric{metricType: timing, name: name, fvalue: value, tags: tags, rate: rate}) } diff --git a/statsd/telemetry.go b/statsd/telemetry.go index e7342f64..a8c3728f 100644 --- a/statsd/telemetry.go +++ b/statsd/telemetry.go @@ -45,6 +45,7 @@ func newTelemetryClient(c *Client, transport string, devMode bool) *telemetryCli t.tagsByType[timing] = append(append([]string{}, t.tags...), "metrics_type:timing") t.tagsByType[histogram] = append(append([]string{}, t.tags...), "metrics_type:histogram") t.tagsByType[distribution] = append(append([]string{}, t.tags...), "metrics_type:distribution") + t.tagsByType[timing] = append(append([]string{}, t.tags...), "metrics_type:timing") } return t } @@ -140,6 +141,9 @@ func (t *telemetryClient) flush() []metric { telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextGauge), t.tagsByType[gauge]) telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextSet), t.tagsByType[set]) telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextCount), t.tagsByType[count]) + telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextHistogram), t.tagsByType[histogram]) + telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextDistribution), t.tagsByType[distribution]) + telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextTiming), t.tagsByType[timing]) } } diff --git a/statsd/telemetry_test.go b/statsd/telemetry_test.go index 0c643bd8..9dd18389 100644 --- a/statsd/telemetry_test.go +++ b/statsd/telemetry_test.go @@ -42,21 +42,39 @@ func appendBasicMetrics(metrics []metric, tags []string) []metric { return metrics } -func appendAggregationMetrics(metrics []metric, tags []string, devMode bool) []metric { - metrics = append(metrics, metric{ - name: "datadog.dogstatsd.client.aggregated_context", - ivalue: 5, - metricType: count, - tags: append(tags, basicExpectedTags...), - rate: float64(1), - }) +func appendAggregationMetrics(metrics []metric, tags []string, devMode bool, extendedAggregation bool) []metric { + if extendedAggregation { + metrics = append(metrics, metric{ + name: "datadog.dogstatsd.client.aggregated_context", + ivalue: 9, + metricType: count, + tags: append(tags, basicExpectedTags...), + rate: float64(1), + }) + } else { + metrics = append(metrics, metric{ + name: "datadog.dogstatsd.client.aggregated_context", + ivalue: 5, + metricType: count, + tags: append(tags, basicExpectedTags...), + rate: float64(1), + }) + } if devMode { contextByTypeName := "datadog.dogstatsd.client.aggregated_context_by_type" devModeAggregationExpectedMetrics := map[string]int64{ - "metrics_type:gauge": 1, - "metrics_type:set": 1, - "metrics_type:count": 3, + "metrics_type:gauge": 1, + "metrics_type:set": 1, + "metrics_type:count": 3, + "metrics_type:histogram": 0, + "metrics_type:distribution": 0, + "metrics_type:timing": 0, + } + if extendedAggregation { + devModeAggregationExpectedMetrics["metrics_type:histogram"] = 1 + devModeAggregationExpectedMetrics["metrics_type:distribution"] = 1 + devModeAggregationExpectedMetrics["metrics_type:timing"] = 2 } for typeTag, value := range devModeAggregationExpectedMetrics { @@ -227,7 +245,7 @@ func TestTelemetryWithGlobalTags(t *testing.T) { testTelemetry(t, telemetry, expectedMetrics) } -func TestTelemetryWithAggregation(t *testing.T) { +func TestTelemetryWithAggregationBasic(t *testing.T) { // disabling autoflush of the telemetry client, err := New("localhost:8125", WithoutTelemetry(), WithClientSideAggregation()) require.Nil(t, err) @@ -236,21 +254,35 @@ func TestTelemetryWithAggregation(t *testing.T) { expectedMetrics := []metric{} expectedMetrics = appendBasicMetrics(expectedMetrics, nil) - expectedMetrics = appendAggregationMetrics(expectedMetrics, nil, false) + expectedMetrics = appendAggregationMetrics(expectedMetrics, nil, false, false) + + testTelemetry(t, telemetry, expectedMetrics) +} + +func TestTelemetryWithAggregationAllType(t *testing.T) { + // disabling autoflush of the telemetry + client, err := New("localhost:8125", WithoutTelemetry(), WithExtendedClientSideAggregation()) + require.Nil(t, err) + + telemetry := newTelemetryClient(client, "test_transport", false) + + expectedMetrics := []metric{} + expectedMetrics = appendBasicMetrics(expectedMetrics, nil) + expectedMetrics = appendAggregationMetrics(expectedMetrics, nil, false, true) testTelemetry(t, telemetry, expectedMetrics) } func TestTelemetryWithAggregationDevMode(t *testing.T) { // disabling autoflush of the telemetry - client, err := New("localhost:8125", WithoutTelemetry(), WithClientSideAggregation(), WithDevMode()) + client, err := New("localhost:8125", WithoutTelemetry(), WithExtendedClientSideAggregation(), WithDevMode()) require.Nil(t, err) telemetry := newTelemetryClient(client, "test_transport", true) expectedMetrics := []metric{} expectedMetrics = appendBasicMetrics(expectedMetrics, nil) - expectedMetrics = appendAggregationMetrics(expectedMetrics, nil, true) + expectedMetrics = appendAggregationMetrics(expectedMetrics, nil, true, true) expectedMetrics = appendDevModeMetrics(expectedMetrics, nil) testTelemetry(t, telemetry, expectedMetrics) @@ -269,7 +301,7 @@ func TestTelemetryWithAggregationDevModeWithGlobalTags(t *testing.T) { expectedMetrics := []metric{} expectedMetrics = appendBasicMetrics(expectedMetrics, []string{"tag1", "tag2", "env:test"}) - expectedMetrics = appendAggregationMetrics(expectedMetrics, []string{"tag1", "tag2", "env:test"}, true) + expectedMetrics = appendAggregationMetrics(expectedMetrics, []string{"tag1", "tag2", "env:test"}, true, false) expectedMetrics = appendDevModeMetrics(expectedMetrics, []string{"tag1", "tag2", "env:test"}) testTelemetry(t, telemetry, expectedMetrics) diff --git a/statsd/worker.go b/statsd/worker.go index 4741c5ac..40f790a8 100644 --- a/statsd/worker.go +++ b/statsd/worker.go @@ -77,6 +77,31 @@ func (w *worker) shouldSample(rate float64) bool { return true } +func (w *worker) writeAggregatedMetricUnsafe(m metric, metricSymbol []byte) error { + globalPos := 0 + + // first check how much data we can write to the buffer: + // +3 + len(metricSymbol) because the message will include '||#' before the tags + // +1 for the coma between the two set of tags + tagsSize := len(m.stags) + 4 + len(metricSymbol) + for _, t := range m.globalTags { + tagsSize += len(t) + 1 + } + + for { + pos, err := w.buffer.writeAggregated(metricSymbol, m.namespace, m.globalTags, m.name, m.fvalues[globalPos:], m.stags, tagsSize) + if err == errPartialWrite { + // We successfully wrote part of the histogram metrics. + // We flush the current buffer and finish the histogram + // in a new one. + w.flushUnsafe() + globalPos += pos + } else { + return err + } + } +} + func (w *worker) writeMetricUnsafe(m metric) error { switch m.metricType { case gauge: @@ -95,6 +120,12 @@ func (w *worker) writeMetricUnsafe(m metric) error { return w.buffer.writeEvent(*m.evalue, m.globalTags) case serviceCheck: return w.buffer.writeServiceCheck(*m.scvalue, m.globalTags) + case histogramAggregated: + return w.writeAggregatedMetricUnsafe(m, histogramSymbol) + case distributionAggregated: + return w.writeAggregatedMetricUnsafe(m, distributionSymbol) + case timingAggregated: + return w.writeAggregatedMetricUnsafe(m, timingSymbol) default: return nil } diff --git a/statsd/worker_test.go b/statsd/worker_test.go index 58a8d881..7d07e13b 100644 --- a/statsd/worker_test.go +++ b/statsd/worker_test.go @@ -36,3 +36,220 @@ func BenchmarkShouldSample(b *testing.B) { } }) } + +func initWorker(bufferSize int) (*bufferPool, *sender, *worker) { + pool := newBufferPool(10, bufferSize, 5) + // manually create the sender so the sender loop is not started. All we + // need is the queue + s := &sender{ + queue: make(chan *statsdBuffer, 10), + pool: pool, + } + + w := newWorker(pool, s) + return pool, s, w +} + +func testWorker(t *testing.T, m metric, expectedBuffer string) { + _, s, w := initWorker(100) + + err := w.processMetric(m) + assert.Nil(t, err) + + w.flush() + data := <-s.queue + assert.Equal(t, expectedBuffer, string(data.buffer)) + +} + +func TestWorkerGauge(t *testing.T) { + testWorker( + t, + metric{ + metricType: gauge, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_gauge", + fvalue: 21, + tags: []string{"tag1", "tag2"}, + rate: 1, + }, + "namespace.test_gauge:21|g|#globalTags,globalTags2,tag1,tag2", + ) +} + +func TestWorkerCount(t *testing.T) { + testWorker( + t, + metric{ + metricType: count, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_count", + ivalue: 21, + tags: []string{"tag1", "tag2"}, + rate: 1, + }, + "namespace.test_count:21|c|#globalTags,globalTags2,tag1,tag2", + ) +} + +func TestWorkerHistogram(t *testing.T) { + testWorker( + t, + metric{ + metricType: histogram, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_histogram", + fvalue: 21, + tags: []string{"tag1", "tag2"}, + rate: 1, + }, + "namespace.test_histogram:21|h|#globalTags,globalTags2,tag1,tag2", + ) +} + +func TestWorkerDistribution(t *testing.T) { + testWorker( + t, + metric{ + metricType: distribution, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_distribution", + fvalue: 21, + tags: []string{"tag1", "tag2"}, + rate: 1, + }, + "namespace.test_distribution:21|d|#globalTags,globalTags2,tag1,tag2", + ) +} + +func TestWorkerSet(t *testing.T) { + testWorker( + t, + metric{ + metricType: set, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_set", + svalue: "value:1", + tags: []string{"tag1", "tag2"}, + rate: 1, + }, + "namespace.test_set:value:1|s|#globalTags,globalTags2,tag1,tag2", + ) +} + +func TestWorkerTiming(t *testing.T) { + testWorker( + t, + metric{ + metricType: timing, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_timing", + fvalue: 1.2, + tags: []string{"tag1", "tag2"}, + rate: 1, + }, + "namespace.test_timing:1.200000|ms|#globalTags,globalTags2,tag1,tag2", + ) +} + +func TestWorkerHistogramAggregated(t *testing.T) { + testWorker( + t, + metric{ + metricType: histogramAggregated, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_histogram", + fvalues: []float64{1.2}, + stags: "tag1,tag2", + rate: 1, + }, + "namespace.test_histogram:1.2|h|#globalTags,globalTags2,tag1,tag2", + ) +} + +func TestWorkerHistogramAggregatedMultiple(t *testing.T) { + _, s, w := initWorker(100) + + m := metric{ + metricType: histogramAggregated, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_histogram", + fvalues: []float64{1.1, 2.2, 3.3, 4.4}, + stags: "tag1,tag2", + rate: 1, + } + err := w.processMetric(m) + assert.Nil(t, err) + + w.flush() + data := <-s.queue + assert.Equal(t, "namespace.test_histogram:1.1:2.2:3.3:4.4|h|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) + + // reducing buffer size so not all values fit in one packet + _, s, w = initWorker(70) + + err = w.processMetric(m) + assert.Nil(t, err) + + w.flush() + data = <-s.queue + assert.Equal(t, "namespace.test_histogram:1.1:2.2|h|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) + data = <-s.queue + assert.Equal(t, "namespace.test_histogram:3.3:4.4|h|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) +} + +func TestWorkerDistributionAggregated(t *testing.T) { + testWorker( + t, + metric{ + metricType: distributionAggregated, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_distribution", + fvalues: []float64{1.2}, + stags: "tag1,tag2", + rate: 1, + }, + "namespace.test_distribution:1.2|d|#globalTags,globalTags2,tag1,tag2", + ) +} + +func TestWorkerDistributionAggregatedMultiple(t *testing.T) { + _, s, w := initWorker(100) + + m := metric{ + metricType: distributionAggregated, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_distribution", + fvalues: []float64{1.1, 2.2, 3.3, 4.4}, + stags: "tag1,tag2", + rate: 1, + } + err := w.processMetric(m) + assert.Nil(t, err) + + w.flush() + data := <-s.queue + assert.Equal(t, "namespace.test_distribution:1.1:2.2:3.3:4.4|d|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) + + // reducing buffer size so not all values fit in one packet + _, s, w = initWorker(72) + + err = w.processMetric(m) + assert.Nil(t, err) + + w.flush() + data = <-s.queue + assert.Equal(t, "namespace.test_distribution:1.1:2.2|d|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) + data = <-s.queue + assert.Equal(t, "namespace.test_distribution:3.3:4.4|d|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) +}