Skip to content

Commit

Permalink
Merge pull request #262 from DataDog/remeh/timed-sample
Browse files Browse the repository at this point in the history
statsd: support sending metrics with timestamp.
  • Loading branch information
remeh committed Oct 31, 2022
2 parents 553de96 + da9d9c6 commit 6391269
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 16 deletions.
6 changes: 4 additions & 2 deletions statsd/buffer.go
Expand Up @@ -39,22 +39,24 @@ func newStatsdBuffer(maxSize, maxElements int) *statsdBuffer {
}
}

func (b *statsdBuffer) writeGauge(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error {
func (b *statsdBuffer) writeGauge(namespace string, globalTags []string, name string, value float64, tags []string, rate float64, timestamp int64) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendGauge(b.buffer, namespace, globalTags, name, value, tags, rate)
b.buffer = appendTimestamp(b.buffer, timestamp)
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}

func (b *statsdBuffer) writeCount(namespace string, globalTags []string, name string, value int64, tags []string, rate float64) error {
func (b *statsdBuffer) writeCount(namespace string, globalTags []string, name string, value int64, tags []string, rate float64, timestamp int64) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendCount(b.buffer, namespace, globalTags, name, value, tags, rate)
b.buffer = appendTimestamp(b.buffer, timestamp)
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}
Expand Down
2 changes: 1 addition & 1 deletion statsd/buffer_pool_test.go
Expand Up @@ -33,7 +33,7 @@ func TestBufferPoolEmpty(t *testing.T) {
func TestBufferReturn(t *testing.T) {
bufferPool := newBufferPool(1, 1024, 20)
buffer := bufferPool.borrowBuffer()
buffer.writeCount("", nil, "", 1, nil, 1)
buffer.writeCount("", nil, "", 1, nil, 1, noTimestamp)

assert.Equal(t, 0, len(bufferPool.pool))
bufferPool.returnBuffer(buffer)
Expand Down
35 changes: 24 additions & 11 deletions statsd/buffer_test.go
Expand Up @@ -8,7 +8,7 @@ import (

func TestBufferGauge(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|g|#tag:tag\n", string(buffer.bytes()))

Expand All @@ -17,14 +17,21 @@ func TestBufferGauge(t *testing.T) {
defer resetContainerID()

buffer = newStatsdBuffer(1024, 1)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|g|#tag:tag|c:container-id\n", string(buffer.bytes()))

// with a timestamp
buffer = newStatsdBuffer(1024, 1)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, 1658934092)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|g|#tag:tag|c:container-id|T1658934092\n", string(buffer.bytes()))

}

func TestBufferCount(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)
err := buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|c|#tag:tag\n", string(buffer.bytes()))

Expand All @@ -33,9 +40,15 @@ func TestBufferCount(t *testing.T) {
defer resetContainerID()

buffer = newStatsdBuffer(1024, 1)
err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|c|#tag:tag|c:container-id\n", string(buffer.bytes()))

// with a timestamp
buffer = newStatsdBuffer(1024, 1)
err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, 1658934092)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|c|#tag:tag|c:container-id|T1658934092\n", string(buffer.bytes()))
}

func TestBufferHistogram(t *testing.T) {
Expand Down Expand Up @@ -135,18 +148,18 @@ func TestBufferServiceCheck(t *testing.T) {

func TestBufferFullSize(t *testing.T) {
buffer := newStatsdBuffer(30, 10)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
assert.Len(t, buffer.bytes(), 30)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Equal(t, errBufferFull, err)
}

func TestBufferSeparator(t *testing.T) {
buffer := newStatsdBuffer(1024, 10)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|g|#tag:tag\nnamespace.metric:1|g|#tag:tag\n", string(buffer.bytes()))
}
Expand Down Expand Up @@ -223,13 +236,13 @@ func TestBufferAggregated(t *testing.T) {
func TestBufferMaxElement(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)

err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)

err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Equal(t, errBufferFull, err)

err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Equal(t, errBufferFull, err)

err = buffer.writeHistogram("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
Expand Down
8 changes: 8 additions & 0 deletions statsd/format.go
Expand Up @@ -270,3 +270,11 @@ func appendContainerID(buffer []byte) []byte {
}
return buffer
}

func appendTimestamp(buffer []byte, timestamp int64) []byte {
if timestamp > noTimestamp {
buffer = append(buffer, "|T"...)
buffer = strconv.AppendInt(buffer, timestamp, 10)
}
return buffer
}
14 changes: 14 additions & 0 deletions statsd/metrics_test.go
Expand Up @@ -17,6 +17,13 @@ func TestNewCountMetric(t *testing.T) {
assert.Equal(t, c.tags, []string{"tag1", "tag2"})
}

func TestNewCountMetricWithTimestamp(t *testing.T) {
c := newCountMetric("test", 21, []string{"tag1", "tag2"})
assert.Equal(t, c.value, int64(21))
assert.Equal(t, c.name, "test")
assert.Equal(t, c.tags, []string{"tag1", "tag2"})
}

func TestCountMetricSample(t *testing.T) {
c := newCountMetric("test", 21, []string{"tag1", "tag2"})
c.sample(12)
Expand Down Expand Up @@ -56,6 +63,13 @@ func TestGaugeMetricSample(t *testing.T) {
assert.Equal(t, g.tags, []string{"tag1", "tag2"})
}

func TestNewGaugeMetricWithTimestamp(t *testing.T) {
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"})
assert.Equal(t, math.Float64frombits(g.value), float64(21))
assert.Equal(t, g.name, "test")
assert.Equal(t, g.tags, []string{"tag1", "tag2"})
}

func TestFlushUnsafeGaugeMetricSample(t *testing.T) {
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"})
m := g.flushUnsafe()
Expand Down
28 changes: 28 additions & 0 deletions statsd/mocks/statsd.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions statsd/noop.go
Expand Up @@ -11,11 +11,21 @@ func (n *NoOpClient) Gauge(name string, value float64, tags []string, rate float
return nil
}

// GaugeWithTimestamp does nothing and returns nil
func (n *NoOpClient) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error {
return nil
}

// Count does nothing and returns nil
func (n *NoOpClient) Count(name string, value int64, tags []string, rate float64) error {
return nil
}

// CountWithTimestamp does nothing and returns nil
func (n *NoOpClient) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error {
return nil
}

// Histogram does nothing and returns nil
func (n *NoOpClient) Histogram(name string, value float64, tags []string, rate float64) error {
return nil
Expand Down
2 changes: 2 additions & 0 deletions statsd/noop_test.go
Expand Up @@ -13,7 +13,9 @@ func TestNoOpClient(t *testing.T) {
tags := []string{"a:b"}

a.Nil(c.Gauge("asd", 123.4, tags, 56.0))
a.Nil(c.GaugeWithTimestamp("asd", 123.4, tags, 56.0, time.Now()))
a.Nil(c.Count("asd", 1234, tags, 56.0))
a.Nil(c.CountWithTimestamp("asd", 123, tags, 56.0, time.Now()))
a.Nil(c.Histogram("asd", 12.34, tags, 56.0))
a.Nil(c.Distribution("asd", 1.234, tags, 56.0))
a.Nil(c.Decr("asd", tags, 56.0))
Expand Down
67 changes: 67 additions & 0 deletions statsd/statsd.go
Expand Up @@ -126,6 +126,9 @@ const (
writerWindowsPipe string = "pipe"
)

// noTimestamp is used as a value for metric without a given timestamp.
const noTimestamp = int64(0)

type metric struct {
metricType metricType
namespace string
Expand All @@ -140,6 +143,7 @@ type metric struct {
tags []string
stags string
rate float64
timestamp int64
}

type noClientErr string
Expand All @@ -152,16 +156,41 @@ func (e noClientErr) Error() string {
return string(e)
}

type invalidTimestampErr string

// InvalidTimestamp is returned if a provided timestamp is invalid.
const InvalidTimestamp = invalidTimestampErr("invalid timestamp")

func (e invalidTimestampErr) Error() string {
return string(e)
}

// ClientInterface is an interface that exposes the common client functions for the
// purpose of being able to provide a no-op client or even mocking. This can aid
// downstream users' with their testing.
type ClientInterface interface {
// Gauge measures the value of a metric at a particular time.
Gauge(name string, value float64, tags []string, rate float64) error

// GaugeWithTimestamp measures the value of a metric at a given time.
// BETA - Please contact our support team for more information to use this feature: https://www.datadoghq.com/support/
// The value will bypass any aggregation on the client side and agent side, this is
// useful when sending points in the past.
//
// Minimum Datadog Agent version: 7.40.0
GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error

// Count tracks how many times something happened per second.
Count(name string, value int64, tags []string, rate float64) error

// CountWithTimestamp tracks how many times something happened at the given second.
// BETA - Please contact our support team for more information to use this feature: https://www.datadoghq.com/support/
// The value will bypass any aggregation on the client side and agent side, this is
// useful when sending points in the past.
//
// Minimum Datadog Agent version: 7.40.0
CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error

// Histogram tracks the statistical distribution of a set of values on each host.
Histogram(name string, value float64, tags []string, rate float64) error

Expand Down Expand Up @@ -552,6 +581,25 @@ func (c *Client) Gauge(name string, value float64, tags []string, rate float64)
return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace})
}

// GaugeWithTimestamp measures the value of a metric at a given time.
// BETA - Please contact our support team for more information to use this feature: https://www.datadoghq.com/support/
// The value will bypass any aggregation on the client side and agent side, this is
// useful when sending points in the past.
//
// Minimum Datadog Agent version: 7.40.0
func (c *Client) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error {
if c == nil {
return ErrNoClient
}

if timestamp.IsZero() || timestamp.Unix() <= noTimestamp {
return InvalidTimestamp
}

atomic.AddUint64(&c.telemetry.totalMetricsGauge, 1)
return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: timestamp.Unix()})
}

// Count tracks how many times something happened per second.
func (c *Client) Count(name string, value int64, tags []string, rate float64) error {
if c == nil {
Expand All @@ -564,6 +612,25 @@ func (c *Client) Count(name string, value int64, tags []string, rate float64) er
return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace})
}

// CountWithTimestamp tracks how many times something happened at the given second.
// BETA - Please contact our support team for more information to use this feature: https://www.datadoghq.com/support/
// The value will bypass any aggregation on the client side and agent side, this is
// useful when sending points in the past.
//
// Minimum Datadog Agent version: 7.40.0
func (c *Client) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error {
if c == nil {
return ErrNoClient
}

if timestamp.IsZero() || timestamp.Unix() <= noTimestamp {
return InvalidTimestamp
}

atomic.AddUint64(&c.telemetry.totalMetricsCount, 1)
return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: timestamp.Unix()})
}

// Histogram tracks the statistical distribution of a set of values on each host.
func (c *Client) Histogram(name string, value float64, tags []string, rate float64) error {
if c == nil {
Expand Down
4 changes: 2 additions & 2 deletions statsd/worker.go
Expand Up @@ -100,9 +100,9 @@ func (w *worker) writeAggregatedMetricUnsafe(m metric, metricSymbol []byte, prec
func (w *worker) writeMetricUnsafe(m metric) error {
switch m.metricType {
case gauge:
return w.buffer.writeGauge(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
return w.buffer.writeGauge(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate, m.timestamp)
case count:
return w.buffer.writeCount(m.namespace, m.globalTags, m.name, m.ivalue, m.tags, m.rate)
return w.buffer.writeCount(m.namespace, m.globalTags, m.name, m.ivalue, m.tags, m.rate, m.timestamp)
case histogram:
return w.buffer.writeHistogram(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
case distribution:
Expand Down

0 comments on commit 6391269

Please sign in to comment.