From a64f8dc8626837b8f5cde1f153289a3e004bda6c Mon Sep 17 00:00:00 2001 From: Remy Mathieu Date: Thu, 28 Jul 2022 14:29:33 +0200 Subject: [PATCH] statsd: the metrics with timestamp are directly written in the serialization buffer. They are not going through the client aggregator. --- statsd/aggregator.go | 16 ------ statsd/aggregator_test.go | 103 -------------------------------------- statsd/statsd.go | 22 ++++---- 3 files changed, 11 insertions(+), 130 deletions(-) diff --git a/statsd/aggregator.go b/statsd/aggregator.go index fd2460a7..58596349 100644 --- a/statsd/aggregator.go +++ b/statsd/aggregator.go @@ -226,14 +226,6 @@ func (a *aggregator) count(name string, value int64, tags []string) error { return nil } -func (a *aggregator) countWithTimestamp(name string, value int64, tags []string, timestamp int64) error { - context := getContext(name, tags) - a.countsM.Lock() - a.counts[context] = newCountMetric(name, value, tags, timestamp) - a.countsM.Unlock() - return nil -} - func (a *aggregator) gauge(name string, value float64, tags []string) error { context := getContext(name, tags) a.gaugesM.RLock() @@ -258,14 +250,6 @@ func (a *aggregator) gauge(name string, value float64, tags []string) error { return nil } -func (a *aggregator) gaugeWithTimestamp(name string, value float64, tags []string, timestamp int64) error { - context := getContext(name, tags) - a.gaugesM.Lock() - a.gauges[context] = newGaugeMetric(name, value, tags, timestamp) - a.gaugesM.Unlock() - return nil -} - func (a *aggregator) set(name string, value string, tags []string) error { context := getContext(name, tags) a.setsM.RLock() diff --git a/statsd/aggregator_test.go b/statsd/aggregator_test.go index b292d17e..c2e92c44 100644 --- a/statsd/aggregator_test.go +++ b/statsd/aggregator_test.go @@ -5,7 +5,6 @@ import ( "strings" "sync" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -47,36 +46,6 @@ func TestAggregatorSample(t *testing.T) { } } -func TestAggregatorWithTimedSamples(t *testing.T) { - a := newAggregator(nil) - - tags := []string{"tag1", "tag2"} - - for i := 0; i < 2; i++ { - a.gauge("gaugeTest", 21, tags) - assert.Len(t, a.gauges, 1) - assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2") - - for j := 0; j < 10; j++ { - // this one should only update the existing one, no new entry in a.gauges - a.gaugeWithTimestamp("gaugeTest", 21, tags, time.Now().Unix()) - assert.Len(t, a.gauges, 1) - assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2") - } - - a.count("countTest", 21, tags) - assert.Len(t, a.counts, 1) - assert.Contains(t, a.counts, "countTest:tag1,tag2") - - for j := 0; j < 10; j++ { - // this one should only override the existing one, no new entry in a.counts - a.countWithTimestamp("countTest", 21, tags, time.Now().Unix()) - assert.Len(t, a.counts, 1) - assert.Contains(t, a.counts, "countTest:tag1,tag2") - } - } -} - func TestAggregatorFlush(t *testing.T) { a := newAggregator(nil) @@ -226,78 +195,6 @@ func TestAggregatorFlush(t *testing.T) { metrics) } -func TestAggregatorFlushWithTimedSamplesMixed(t *testing.T) { - a := newAggregator(nil) - - tags := []string{"tag1", "tag2"} - - veryOld := time.Now().Add(-24 * time.Hour) - old := time.Now().Add(-6 * time.Hour) - - a.gauge("gaugeTest1", 21, tags) - a.gaugeWithTimestamp("gaugeTest2", 10, tags, veryOld.Unix()) - a.gaugeWithTimestamp("gaugeTest2", 15, tags, old.Unix()) - - a.count("countTest1", 44, tags) - a.countWithTimestamp("countTest2", 23, tags, veryOld.Unix()) - a.countWithTimestamp("countTest2", 25, tags, old.Unix()) - - metrics := a.flushMetrics() - - assert.Len(t, a.gauges, 0) - assert.Len(t, a.counts, 0) - - assert.Len(t, metrics, 4) - - sort.Slice(metrics, func(i, j int) bool { - if metrics[i].metricType == metrics[j].metricType { - res := strings.Compare(metrics[i].name, metrics[j].name) - // this happens fo set - if res == 0 { - return strings.Compare(metrics[i].svalue, metrics[j].svalue) != 1 - } - return res != 1 - } - return metrics[i].metricType < metrics[j].metricType - }) - - assert.Equal(t, []metric{ - metric{ - metricType: gauge, - name: "gaugeTest1", - tags: tags, - rate: 1, - fvalue: float64(21), - timestamp: 0, - }, - metric{ - metricType: gauge, - name: "gaugeTest2", - tags: tags, - rate: 1, - fvalue: float64(15), - timestamp: old.Unix(), - }, - metric{ - metricType: count, - name: "countTest1", - tags: tags, - rate: 1, - ivalue: 44, - timestamp: 0, - }, - metric{ - metricType: count, - name: "countTest2", - tags: tags, - rate: 1, - ivalue: 25, - timestamp: old.Unix(), - }, - }, - metrics) -} - func TestAggregatorFlushConcurrency(t *testing.T) { a := newAggregator(nil) diff --git a/statsd/statsd.go b/statsd/statsd.go index b807ffd4..fb85a19b 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -161,12 +161,16 @@ type ClientInterface interface { Gauge(name string, value float64, tags []string, rate float64) error // Gauge measures the value of a metric at a given time. + // Even with client side aggregation enabled, there is no aggregation done on a metric + // sent using GaugeWithTimestamp: it is written as is in the serialization buffer. GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error // Count tracks how many times something happened per second. Count(name string, value int64, tags []string, rate float64) error - // Count tracks how many times something happened at the given second. + // CountWithTimestamp tracks how many times something happened at the given second. + // Even with client side aggregation enabled, there is no aggregation done on a metric + // sent using CountWithTimestamp: it is written as is in the serialization buffer. CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error // Histogram tracks the statistical distribution of a set of values on each host. @@ -560,16 +564,14 @@ func (c *Client) Gauge(name string, value float64, tags []string, rate float64) } // Gauge measures the value of a metric at a given time. +// Even with client side aggregation enabled, there is no aggregation done on a metric +// sent using GaugeWithTimestamp: it is written as is in the serialization buffer. func (c *Client) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error { if c == nil { return ErrNoClient } - ts := timestamp.Unix() atomic.AddUint64(&c.telemetry.totalMetricsGauge, 1) - if c.agg != nil { - return c.agg.gaugeWithTimestamp(name, value, tags, ts) - } - return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: ts}) + return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: timestamp.Unix()}) } // Count tracks how many times something happened per second. @@ -585,16 +587,14 @@ func (c *Client) Count(name string, value int64, tags []string, rate float64) er } // Count tracks how many times something happened at the given second. +// Even with client side aggregation enabled, there is no aggregation done on a metric +// sent using CountWithTimestamp: it is written as is in the serialization buffer. func (c *Client) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error { if c == nil { return ErrNoClient } - ts := timestamp.Unix() atomic.AddUint64(&c.telemetry.totalMetricsCount, 1) - if c.agg != nil { - return c.agg.countWithTimestamp(name, value, tags, ts) - } - return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: ts}) + return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: timestamp.Unix()}) } // Histogram tracks the statistical distribution of a set of values on each host.