From 597553ec2e7e11d8e497373f3455d2d56d95a8cc Mon Sep 17 00:00:00 2001 From: Remy Mathieu Date: Wed, 27 Jul 2022 16:59:09 +0200 Subject: [PATCH] statsd: support sending metrics with timestamp. Adds two methods in the `ClientInterface` to support sending gauge and count with timestamp `GaugeWithTimestamp` and `CountWithTimestamp`, for users aggregating metrics on their side and who can provide a timestamp for a given metric. --- statsd/aggregator.go | 23 ++++++++- statsd/aggregator_test.go | 103 +++++++++++++++++++++++++++++++++++++ statsd/buffer.go | 10 +++- statsd/buffer_pool_test.go | 2 +- statsd/buffer_test.go | 35 +++++++++---- statsd/format.go | 6 +++ statsd/metrics.go | 34 +++++++----- statsd/metrics_test.go | 36 ++++++++++--- statsd/noop.go | 10 ++++ statsd/noop_test.go | 2 + statsd/statsd.go | 33 ++++++++++++ statsd/worker.go | 4 +- 12 files changed, 260 insertions(+), 38 deletions(-) diff --git a/statsd/aggregator.go b/statsd/aggregator.go index 65c050ed..fd2460a7 100644 --- a/statsd/aggregator.go +++ b/statsd/aggregator.go @@ -14,6 +14,9 @@ type ( bufferedMetricMap map[string]*bufferedMetric ) +// noTimestamp is used as a value for metric without a given timestamp. +const noTimestamp = int64(0) + type aggregator struct { nbContextGauge uint64 nbContextCount uint64 @@ -218,7 +221,15 @@ func (a *aggregator) count(name string, value int64, tags []string) error { return nil } - a.counts[context] = newCountMetric(name, value, tags) + a.counts[context] = newCountMetric(name, value, tags, noTimestamp) + a.countsM.Unlock() + return nil +} + +func (a *aggregator) countWithTimestamp(name string, value int64, tags []string, timestamp int64) error { + context := getContext(name, tags) + a.countsM.Lock() + a.counts[context] = newCountMetric(name, value, tags, timestamp) a.countsM.Unlock() return nil } @@ -233,7 +244,7 @@ func (a *aggregator) gauge(name string, value float64, tags []string) error { } a.gaugesM.RUnlock() - gauge := newGaugeMetric(name, value, tags) + gauge := newGaugeMetric(name, value, tags, noTimestamp) a.gaugesM.Lock() // Check if another goroutines hasn't created the value betwen the 'RUnlock' and 'Lock' @@ -247,6 +258,14 @@ func (a *aggregator) gauge(name string, value float64, tags []string) error { return nil } +func (a *aggregator) gaugeWithTimestamp(name string, value float64, tags []string, timestamp int64) error { + context := getContext(name, tags) + a.gaugesM.Lock() + a.gauges[context] = newGaugeMetric(name, value, tags, timestamp) + a.gaugesM.Unlock() + return nil +} + func (a *aggregator) set(name string, value string, tags []string) error { context := getContext(name, tags) a.setsM.RLock() diff --git a/statsd/aggregator_test.go b/statsd/aggregator_test.go index c2e92c44..b292d17e 100644 --- a/statsd/aggregator_test.go +++ b/statsd/aggregator_test.go @@ -5,6 +5,7 @@ import ( "strings" "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -46,6 +47,36 @@ func TestAggregatorSample(t *testing.T) { } } +func TestAggregatorWithTimedSamples(t *testing.T) { + a := newAggregator(nil) + + tags := []string{"tag1", "tag2"} + + for i := 0; i < 2; i++ { + a.gauge("gaugeTest", 21, tags) + assert.Len(t, a.gauges, 1) + assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2") + + for j := 0; j < 10; j++ { + // this one should only update the existing one, no new entry in a.gauges + a.gaugeWithTimestamp("gaugeTest", 21, tags, time.Now().Unix()) + assert.Len(t, a.gauges, 1) + assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2") + } + + a.count("countTest", 21, tags) + assert.Len(t, a.counts, 1) + assert.Contains(t, a.counts, "countTest:tag1,tag2") + + for j := 0; j < 10; j++ { + // this one should only override the existing one, no new entry in a.counts + a.countWithTimestamp("countTest", 21, tags, time.Now().Unix()) + assert.Len(t, a.counts, 1) + assert.Contains(t, a.counts, "countTest:tag1,tag2") + } + } +} + func TestAggregatorFlush(t *testing.T) { a := newAggregator(nil) @@ -195,6 +226,78 @@ func TestAggregatorFlush(t *testing.T) { metrics) } +func TestAggregatorFlushWithTimedSamplesMixed(t *testing.T) { + a := newAggregator(nil) + + tags := []string{"tag1", "tag2"} + + veryOld := time.Now().Add(-24 * time.Hour) + old := time.Now().Add(-6 * time.Hour) + + a.gauge("gaugeTest1", 21, tags) + a.gaugeWithTimestamp("gaugeTest2", 10, tags, veryOld.Unix()) + a.gaugeWithTimestamp("gaugeTest2", 15, tags, old.Unix()) + + a.count("countTest1", 44, tags) + a.countWithTimestamp("countTest2", 23, tags, veryOld.Unix()) + a.countWithTimestamp("countTest2", 25, tags, old.Unix()) + + metrics := a.flushMetrics() + + assert.Len(t, a.gauges, 0) + assert.Len(t, a.counts, 0) + + assert.Len(t, metrics, 4) + + sort.Slice(metrics, func(i, j int) bool { + if metrics[i].metricType == metrics[j].metricType { + res := strings.Compare(metrics[i].name, metrics[j].name) + // this happens fo set + if res == 0 { + return strings.Compare(metrics[i].svalue, metrics[j].svalue) != 1 + } + return res != 1 + } + return metrics[i].metricType < metrics[j].metricType + }) + + assert.Equal(t, []metric{ + metric{ + metricType: gauge, + name: "gaugeTest1", + tags: tags, + rate: 1, + fvalue: float64(21), + timestamp: 0, + }, + metric{ + metricType: gauge, + name: "gaugeTest2", + tags: tags, + rate: 1, + fvalue: float64(15), + timestamp: old.Unix(), + }, + metric{ + metricType: count, + name: "countTest1", + tags: tags, + rate: 1, + ivalue: 44, + timestamp: 0, + }, + metric{ + metricType: count, + name: "countTest2", + tags: tags, + rate: 1, + ivalue: 25, + timestamp: old.Unix(), + }, + }, + metrics) +} + func TestAggregatorFlushConcurrency(t *testing.T) { a := newAggregator(nil) diff --git a/statsd/buffer.go b/statsd/buffer.go index 0e4ea2b2..9a01fbb3 100644 --- a/statsd/buffer.go +++ b/statsd/buffer.go @@ -39,22 +39,28 @@ func newStatsdBuffer(maxSize, maxElements int) *statsdBuffer { } } -func (b *statsdBuffer) writeGauge(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error { +func (b *statsdBuffer) writeGauge(namespace string, globalTags []string, name string, value float64, tags []string, rate float64, timestamp int64) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer b.buffer = appendGauge(b.buffer, namespace, globalTags, name, value, tags, rate) + if timestamp != noTimestamp { + b.buffer = appendTimestamp(b.buffer, timestamp) + } b.writeSeparator() return b.validateNewElement(originalBuffer) } -func (b *statsdBuffer) writeCount(namespace string, globalTags []string, name string, value int64, tags []string, rate float64) error { +func (b *statsdBuffer) writeCount(namespace string, globalTags []string, name string, value int64, tags []string, rate float64, timestamp int64) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer b.buffer = appendCount(b.buffer, namespace, globalTags, name, value, tags, rate) + if timestamp != noTimestamp { + b.buffer = appendTimestamp(b.buffer, timestamp) + } b.writeSeparator() return b.validateNewElement(originalBuffer) } diff --git a/statsd/buffer_pool_test.go b/statsd/buffer_pool_test.go index f150af75..ab2c614b 100644 --- a/statsd/buffer_pool_test.go +++ b/statsd/buffer_pool_test.go @@ -33,7 +33,7 @@ func TestBufferPoolEmpty(t *testing.T) { func TestBufferReturn(t *testing.T) { bufferPool := newBufferPool(1, 1024, 20) buffer := bufferPool.borrowBuffer() - buffer.writeCount("", nil, "", 1, nil, 1) + buffer.writeCount("", nil, "", 1, nil, 1, noTimestamp) assert.Equal(t, 0, len(bufferPool.pool)) bufferPool.returnBuffer(buffer) diff --git a/statsd/buffer_test.go b/statsd/buffer_test.go index 9096407a..83d147e0 100644 --- a/statsd/buffer_test.go +++ b/statsd/buffer_test.go @@ -8,7 +8,7 @@ import ( func TestBufferGauge(t *testing.T) { buffer := newStatsdBuffer(1024, 1) - err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|g|#tag:tag\n", string(buffer.bytes())) @@ -17,14 +17,21 @@ func TestBufferGauge(t *testing.T) { defer resetContainerID() buffer = newStatsdBuffer(1024, 1) - err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|g|#tag:tag|c:container-id\n", string(buffer.bytes())) + + // with a timestamp + buffer = newStatsdBuffer(1024, 1) + err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, 1658934092) + assert.Nil(t, err) + assert.Equal(t, "namespace.metric:1|g|#tag:tag|c:container-id|T1658934092\n", string(buffer.bytes())) + } func TestBufferCount(t *testing.T) { buffer := newStatsdBuffer(1024, 1) - err := buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err := buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|c|#tag:tag\n", string(buffer.bytes())) @@ -33,9 +40,15 @@ func TestBufferCount(t *testing.T) { defer resetContainerID() buffer = newStatsdBuffer(1024, 1) - err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|c|#tag:tag|c:container-id\n", string(buffer.bytes())) + + // with a timestamp + buffer = newStatsdBuffer(1024, 1) + err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, 1658934092) + assert.Nil(t, err) + assert.Equal(t, "namespace.metric:1|c|#tag:tag|c:container-id|T1658934092\n", string(buffer.bytes())) } func TestBufferHistogram(t *testing.T) { @@ -135,18 +148,18 @@ func TestBufferServiceCheck(t *testing.T) { func TestBufferFullSize(t *testing.T) { buffer := newStatsdBuffer(30, 10) - err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) assert.Len(t, buffer.bytes(), 30) - err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Equal(t, errBufferFull, err) } func TestBufferSeparator(t *testing.T) { buffer := newStatsdBuffer(1024, 10) - err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) - err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|g|#tag:tag\nnamespace.metric:1|g|#tag:tag\n", string(buffer.bytes())) } @@ -223,13 +236,13 @@ func TestBufferAggregated(t *testing.T) { func TestBufferMaxElement(t *testing.T) { buffer := newStatsdBuffer(1024, 1) - err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) - err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Equal(t, errBufferFull, err) - err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Equal(t, errBufferFull, err) err = buffer.writeHistogram("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) diff --git a/statsd/format.go b/statsd/format.go index 6e05ad56..43ea2730 100644 --- a/statsd/format.go +++ b/statsd/format.go @@ -270,3 +270,9 @@ func appendContainerID(buffer []byte) []byte { } return buffer } + +func appendTimestamp(buffer []byte, timestamp int64) []byte { + buffer = append(buffer, "|T"...) + buffer = strconv.AppendInt(buffer, timestamp, 10) + return buffer +} diff --git a/statsd/metrics.go b/statsd/metrics.go index 82f11ac1..46b99968 100644 --- a/statsd/metrics.go +++ b/statsd/metrics.go @@ -14,16 +14,18 @@ Those are metrics type that can be aggregated on the client side: */ type countMetric struct { - value int64 - name string - tags []string + value int64 + timestamp int64 + name string + tags []string } -func newCountMetric(name string, value int64, tags []string) *countMetric { +func newCountMetric(name string, value int64, tags []string, timestamp int64) *countMetric { return &countMetric{ - value: value, - name: name, - tags: copySlice(tags), + value: value, + timestamp: timestamp, + name: name, + tags: copySlice(tags), } } @@ -38,22 +40,25 @@ func (c *countMetric) flushUnsafe() metric { tags: c.tags, rate: 1, ivalue: c.value, + timestamp: c.timestamp, } } // Gauge type gaugeMetric struct { - value uint64 - name string - tags []string + value uint64 + timestamp int64 + name string + tags []string } -func newGaugeMetric(name string, value float64, tags []string) *gaugeMetric { +func newGaugeMetric(name string, value float64, tags []string, timestamp int64) *gaugeMetric { return &gaugeMetric{ - value: math.Float64bits(value), - name: name, - tags: copySlice(tags), + value: math.Float64bits(value), + timestamp: timestamp, + name: name, + tags: copySlice(tags), } } @@ -68,6 +73,7 @@ func (g *gaugeMetric) flushUnsafe() metric { tags: g.tags, rate: 1, fvalue: math.Float64frombits(g.value), + timestamp: g.timestamp, } } diff --git a/statsd/metrics_test.go b/statsd/metrics_test.go index 41bb743d..281ff2e2 100644 --- a/statsd/metrics_test.go +++ b/statsd/metrics_test.go @@ -11,27 +11,38 @@ import ( ) func TestNewCountMetric(t *testing.T) { - c := newCountMetric("test", 21, []string{"tag1", "tag2"}) + c := newCountMetric("test", 21, []string{"tag1", "tag2"}, noTimestamp) assert.Equal(t, c.value, int64(21)) assert.Equal(t, c.name, "test") assert.Equal(t, c.tags, []string{"tag1", "tag2"}) + assert.Equal(t, c.timestamp, noTimestamp) +} + +func TestNewCountMetricWithTimestamp(t *testing.T) { + c := newCountMetric("test", 21, []string{"tag1", "tag2"}, 1658934956) + assert.Equal(t, c.value, int64(21)) + assert.Equal(t, c.name, "test") + assert.Equal(t, c.tags, []string{"tag1", "tag2"}) + assert.Equal(t, c.timestamp, int64(1658934956)) } func TestCountMetricSample(t *testing.T) { - c := newCountMetric("test", 21, []string{"tag1", "tag2"}) + c := newCountMetric("test", 21, []string{"tag1", "tag2"}, noTimestamp) c.sample(12) assert.Equal(t, c.value, int64(33)) assert.Equal(t, c.name, "test") assert.Equal(t, c.tags, []string{"tag1", "tag2"}) + assert.Equal(t, c.timestamp, noTimestamp) } func TestFlushUnsafeCountMetricSample(t *testing.T) { - c := newCountMetric("test", 21, []string{"tag1", "tag2"}) + c := newCountMetric("test", 21, []string{"tag1", "tag2"}, noTimestamp) m := c.flushUnsafe() assert.Equal(t, m.metricType, count) assert.Equal(t, m.ivalue, int64(21)) assert.Equal(t, m.name, "test") assert.Equal(t, m.tags, []string{"tag1", "tag2"}) + assert.Equal(t, m.timestamp, noTimestamp) c.sample(12) m = c.flushUnsafe() @@ -39,30 +50,42 @@ func TestFlushUnsafeCountMetricSample(t *testing.T) { assert.Equal(t, m.ivalue, int64(33)) assert.Equal(t, m.name, "test") assert.Equal(t, m.tags, []string{"tag1", "tag2"}) + assert.Equal(t, m.timestamp, noTimestamp) } func TestNewGaugeMetric(t *testing.T) { - g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}) + g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, noTimestamp) assert.Equal(t, math.Float64frombits(g.value), float64(21)) assert.Equal(t, g.name, "test") assert.Equal(t, g.tags, []string{"tag1", "tag2"}) + assert.Equal(t, g.timestamp, noTimestamp) } func TestGaugeMetricSample(t *testing.T) { - g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}) + g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, noTimestamp) g.sample(12) assert.Equal(t, math.Float64frombits(g.value), float64(12)) assert.Equal(t, g.name, "test") assert.Equal(t, g.tags, []string{"tag1", "tag2"}) + assert.Equal(t, g.timestamp, noTimestamp) +} + +func TestNewGaugeMetricWithTimestamp(t *testing.T) { + g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, 1658934956) + assert.Equal(t, math.Float64frombits(g.value), float64(21)) + assert.Equal(t, g.name, "test") + assert.Equal(t, g.tags, []string{"tag1", "tag2"}) + assert.Equal(t, g.timestamp, int64(1658934956)) } func TestFlushUnsafeGaugeMetricSample(t *testing.T) { - g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}) + g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, noTimestamp) m := g.flushUnsafe() assert.Equal(t, m.metricType, gauge) assert.Equal(t, m.fvalue, float64(21)) assert.Equal(t, m.name, "test") assert.Equal(t, m.tags, []string{"tag1", "tag2"}) + assert.Equal(t, m.timestamp, noTimestamp) g.sample(12) m = g.flushUnsafe() @@ -70,6 +93,7 @@ func TestFlushUnsafeGaugeMetricSample(t *testing.T) { assert.Equal(t, m.fvalue, float64(12)) assert.Equal(t, m.name, "test") assert.Equal(t, m.tags, []string{"tag1", "tag2"}) + assert.Equal(t, m.timestamp, noTimestamp) } func TestNewSetMetric(t *testing.T) { diff --git a/statsd/noop.go b/statsd/noop.go index 5c093988..e92744f4 100644 --- a/statsd/noop.go +++ b/statsd/noop.go @@ -11,11 +11,21 @@ func (n *NoOpClient) Gauge(name string, value float64, tags []string, rate float return nil } +// GaugeWithTimestamp does nothing and returns nil +func (n *NoOpClient) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error { + return nil +} + // Count does nothing and returns nil func (n *NoOpClient) Count(name string, value int64, tags []string, rate float64) error { return nil } +// CountWithTimestamp does nothing and returns nil +func (n *NoOpClient) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error { + return nil +} + // Histogram does nothing and returns nil func (n *NoOpClient) Histogram(name string, value float64, tags []string, rate float64) error { return nil diff --git a/statsd/noop_test.go b/statsd/noop_test.go index 2aaf0133..4df17019 100644 --- a/statsd/noop_test.go +++ b/statsd/noop_test.go @@ -13,7 +13,9 @@ func TestNoOpClient(t *testing.T) { tags := []string{"a:b"} a.Nil(c.Gauge("asd", 123.4, tags, 56.0)) + a.Nil(c.GaugeWithTimestamp("asd", 123.4, tags, 56.0, time.Now())) a.Nil(c.Count("asd", 1234, tags, 56.0)) + a.Nil(c.CountWithTimestamp("asd", 123, tags, 56.0, time.Now())) a.Nil(c.Histogram("asd", 12.34, tags, 56.0)) a.Nil(c.Distribution("asd", 1.234, tags, 56.0)) a.Nil(c.Decr("asd", tags, 56.0)) diff --git a/statsd/statsd.go b/statsd/statsd.go index b1bcce01..b807ffd4 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -140,6 +140,7 @@ type metric struct { tags []string stags string rate float64 + timestamp int64 } type noClientErr string @@ -159,9 +160,15 @@ type ClientInterface interface { // Gauge measures the value of a metric at a particular time. Gauge(name string, value float64, tags []string, rate float64) error + // Gauge measures the value of a metric at a given time. + GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error + // Count tracks how many times something happened per second. Count(name string, value int64, tags []string, rate float64) error + // Count tracks how many times something happened at the given second. + CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error + // Histogram tracks the statistical distribution of a set of values on each host. Histogram(name string, value float64, tags []string, rate float64) error @@ -552,6 +559,19 @@ func (c *Client) Gauge(name string, value float64, tags []string, rate float64) return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) } +// Gauge measures the value of a metric at a given time. +func (c *Client) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error { + if c == nil { + return ErrNoClient + } + ts := timestamp.Unix() + atomic.AddUint64(&c.telemetry.totalMetricsGauge, 1) + if c.agg != nil { + return c.agg.gaugeWithTimestamp(name, value, tags, ts) + } + return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: ts}) +} + // Count tracks how many times something happened per second. func (c *Client) Count(name string, value int64, tags []string, rate float64) error { if c == nil { @@ -564,6 +584,19 @@ func (c *Client) Count(name string, value int64, tags []string, rate float64) er return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) } +// Count tracks how many times something happened at the given second. +func (c *Client) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error { + if c == nil { + return ErrNoClient + } + ts := timestamp.Unix() + atomic.AddUint64(&c.telemetry.totalMetricsCount, 1) + if c.agg != nil { + return c.agg.countWithTimestamp(name, value, tags, ts) + } + return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: ts}) +} + // Histogram tracks the statistical distribution of a set of values on each host. func (c *Client) Histogram(name string, value float64, tags []string, rate float64) error { if c == nil { diff --git a/statsd/worker.go b/statsd/worker.go index 5446d506..952a9fe3 100644 --- a/statsd/worker.go +++ b/statsd/worker.go @@ -100,9 +100,9 @@ func (w *worker) writeAggregatedMetricUnsafe(m metric, metricSymbol []byte, prec func (w *worker) writeMetricUnsafe(m metric) error { switch m.metricType { case gauge: - return w.buffer.writeGauge(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate) + return w.buffer.writeGauge(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate, m.timestamp) case count: - return w.buffer.writeCount(m.namespace, m.globalTags, m.name, m.ivalue, m.tags, m.rate) + return w.buffer.writeCount(m.namespace, m.globalTags, m.name, m.ivalue, m.tags, m.rate, m.timestamp) case histogram: return w.buffer.writeHistogram(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate) case distribution: