Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statsd: support sending metrics with timestamp. #262

Merged
merged 8 commits into from Oct 31, 2022
3 changes: 3 additions & 0 deletions statsd/aggregator.go
Expand Up @@ -14,6 +14,9 @@ type (
bufferedMetricMap map[string]*bufferedMetric
)

// noTimestamp is used as a value for metric without a given timestamp.
const noTimestamp = int64(0)

remeh marked this conversation as resolved.
Show resolved Hide resolved
type aggregator struct {
nbContextGauge uint64
nbContextCount uint64
Expand Down
10 changes: 8 additions & 2 deletions statsd/buffer.go
Expand Up @@ -39,22 +39,28 @@ func newStatsdBuffer(maxSize, maxElements int) *statsdBuffer {
}
}

func (b *statsdBuffer) writeGauge(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error {
func (b *statsdBuffer) writeGauge(namespace string, globalTags []string, name string, value float64, tags []string, rate float64, timestamp int64) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendGauge(b.buffer, namespace, globalTags, name, value, tags, rate)
if timestamp != noTimestamp {
remeh marked this conversation as resolved.
Show resolved Hide resolved
b.buffer = appendTimestamp(b.buffer, timestamp)
}
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}

func (b *statsdBuffer) writeCount(namespace string, globalTags []string, name string, value int64, tags []string, rate float64) error {
func (b *statsdBuffer) writeCount(namespace string, globalTags []string, name string, value int64, tags []string, rate float64, timestamp int64) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendCount(b.buffer, namespace, globalTags, name, value, tags, rate)
if timestamp != noTimestamp {
b.buffer = appendTimestamp(b.buffer, timestamp)
}
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}
Expand Down
2 changes: 1 addition & 1 deletion statsd/buffer_pool_test.go
Expand Up @@ -33,7 +33,7 @@ func TestBufferPoolEmpty(t *testing.T) {
func TestBufferReturn(t *testing.T) {
bufferPool := newBufferPool(1, 1024, 20)
buffer := bufferPool.borrowBuffer()
buffer.writeCount("", nil, "", 1, nil, 1)
buffer.writeCount("", nil, "", 1, nil, 1, noTimestamp)

assert.Equal(t, 0, len(bufferPool.pool))
bufferPool.returnBuffer(buffer)
Expand Down
35 changes: 24 additions & 11 deletions statsd/buffer_test.go
Expand Up @@ -8,7 +8,7 @@ import (

func TestBufferGauge(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|g|#tag:tag\n", string(buffer.bytes()))

Expand All @@ -17,14 +17,21 @@ func TestBufferGauge(t *testing.T) {
defer resetContainerID()

buffer = newStatsdBuffer(1024, 1)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|g|#tag:tag|c:container-id\n", string(buffer.bytes()))

// with a timestamp
buffer = newStatsdBuffer(1024, 1)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, 1658934092)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|g|#tag:tag|c:container-id|T1658934092\n", string(buffer.bytes()))

}

func TestBufferCount(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)
err := buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|c|#tag:tag\n", string(buffer.bytes()))

Expand All @@ -33,9 +40,15 @@ func TestBufferCount(t *testing.T) {
defer resetContainerID()

buffer = newStatsdBuffer(1024, 1)
err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|c|#tag:tag|c:container-id\n", string(buffer.bytes()))

// with a timestamp
buffer = newStatsdBuffer(1024, 1)
err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, 1658934092)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|c|#tag:tag|c:container-id|T1658934092\n", string(buffer.bytes()))
}

func TestBufferHistogram(t *testing.T) {
Expand Down Expand Up @@ -135,18 +148,18 @@ func TestBufferServiceCheck(t *testing.T) {

func TestBufferFullSize(t *testing.T) {
buffer := newStatsdBuffer(30, 10)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
assert.Len(t, buffer.bytes(), 30)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Equal(t, errBufferFull, err)
}

func TestBufferSeparator(t *testing.T) {
buffer := newStatsdBuffer(1024, 10)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|g|#tag:tag\nnamespace.metric:1|g|#tag:tag\n", string(buffer.bytes()))
}
Expand Down Expand Up @@ -223,13 +236,13 @@ func TestBufferAggregated(t *testing.T) {
func TestBufferMaxElement(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)

err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)

err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Equal(t, errBufferFull, err)

err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Equal(t, errBufferFull, err)

err = buffer.writeHistogram("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
Expand Down
6 changes: 6 additions & 0 deletions statsd/format.go
Expand Up @@ -270,3 +270,9 @@ func appendContainerID(buffer []byte) []byte {
}
return buffer
}

func appendTimestamp(buffer []byte, timestamp int64) []byte {
buffer = append(buffer, "|T"...)
buffer = strconv.AppendInt(buffer, timestamp, 10)
return buffer
}
14 changes: 14 additions & 0 deletions statsd/metrics_test.go
Expand Up @@ -17,6 +17,13 @@ func TestNewCountMetric(t *testing.T) {
assert.Equal(t, c.tags, []string{"tag1", "tag2"})
}

func TestNewCountMetricWithTimestamp(t *testing.T) {
c := newCountMetric("test", 21, []string{"tag1", "tag2"})
assert.Equal(t, c.value, int64(21))
assert.Equal(t, c.name, "test")
assert.Equal(t, c.tags, []string{"tag1", "tag2"})
}

func TestCountMetricSample(t *testing.T) {
c := newCountMetric("test", 21, []string{"tag1", "tag2"})
c.sample(12)
Expand Down Expand Up @@ -56,6 +63,13 @@ func TestGaugeMetricSample(t *testing.T) {
assert.Equal(t, g.tags, []string{"tag1", "tag2"})
}

func TestNewGaugeMetricWithTimestamp(t *testing.T) {
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"})
assert.Equal(t, math.Float64frombits(g.value), float64(21))
assert.Equal(t, g.name, "test")
assert.Equal(t, g.tags, []string{"tag1", "tag2"})
}

func TestFlushUnsafeGaugeMetricSample(t *testing.T) {
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"})
m := g.flushUnsafe()
Expand Down
28 changes: 28 additions & 0 deletions statsd/mocks/statsd.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions statsd/noop.go
Expand Up @@ -11,11 +11,21 @@ func (n *NoOpClient) Gauge(name string, value float64, tags []string, rate float
return nil
}

// GaugeWithTimestamp does nothing and returns nil
func (n *NoOpClient) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error {
return nil
}

// Count does nothing and returns nil
func (n *NoOpClient) Count(name string, value int64, tags []string, rate float64) error {
return nil
}

// CountWithTimestamp does nothing and returns nil
func (n *NoOpClient) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error {
return nil
}

// Histogram does nothing and returns nil
func (n *NoOpClient) Histogram(name string, value float64, tags []string, rate float64) error {
return nil
Expand Down
2 changes: 2 additions & 0 deletions statsd/noop_test.go
Expand Up @@ -13,7 +13,9 @@ func TestNoOpClient(t *testing.T) {
tags := []string{"a:b"}

a.Nil(c.Gauge("asd", 123.4, tags, 56.0))
a.Nil(c.GaugeWithTimestamp("asd", 123.4, tags, 56.0, time.Now()))
a.Nil(c.Count("asd", 1234, tags, 56.0))
a.Nil(c.CountWithTimestamp("asd", 123, tags, 56.0, time.Now()))
a.Nil(c.Histogram("asd", 12.34, tags, 56.0))
a.Nil(c.Distribution("asd", 1.234, tags, 56.0))
a.Nil(c.Decr("asd", tags, 56.0))
Expand Down
33 changes: 33 additions & 0 deletions statsd/statsd.go
Expand Up @@ -140,6 +140,7 @@ type metric struct {
tags []string
stags string
rate float64
timestamp int64
}

type noClientErr string
Expand All @@ -159,9 +160,19 @@ type ClientInterface interface {
// Gauge measures the value of a metric at a particular time.
Gauge(name string, value float64, tags []string, rate float64) error

// Gauge measures the value of a metric at a given time.
// Even with client side aggregation enabled, there is no aggregation done on a metric
// sent using GaugeWithTimestamp: it is written as is in the serialization buffer.
remeh marked this conversation as resolved.
Show resolved Hide resolved
GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why only gauge and count ? rate/set doesn't make send but we might want to send timing, histogram and distribution with timestamp too, no ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, timed metrics ingestion will only be supported for Gauge and Count (DogStatsD server side and backend side).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should explain that somewhere, maybe the README or the Go doc string.

// Count tracks how many times something happened per second.
Count(name string, value int64, tags []string, rate float64) error

// CountWithTimestamp tracks how many times something happened at the given second.
// Even with client side aggregation enabled, there is no aggregation done on a metric
// sent using CountWithTimestamp: it is written as is in the serialization buffer.
remeh marked this conversation as resolved.
Show resolved Hide resolved
CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error

// Histogram tracks the statistical distribution of a set of values on each host.
Histogram(name string, value float64, tags []string, rate float64) error

Expand Down Expand Up @@ -552,6 +563,17 @@ func (c *Client) Gauge(name string, value float64, tags []string, rate float64)
return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace})
}

// Gauge measures the value of a metric at a given time.
remeh marked this conversation as resolved.
Show resolved Hide resolved
// Even with client side aggregation enabled, there is no aggregation done on a metric
// sent using GaugeWithTimestamp: it is written as is in the serialization buffer.
func (c *Client) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error {
hush-hush marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should return an error if timestamp is 0 (same for count).

Copy link
Contributor Author

@remeh remeh Jul 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, that's not an invalid timestamp. A weird one, but not an invalid one. For instance, 1 is also a weird one, you see what I mean?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's an error since go has default value. This means an non-initialized timestamp.
Sending a time.Time at 0 will produce a Unix timestamp of -62135596800.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A gotcha... Also linked to your other question about time.Time vs uint64...
You are right I'll add a test if timestamp.Unix() < 0, WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to understand why 0 (jan 1970) is the limit here.

0 is a valid timestamp that can be represented, separate from the go zero-value.
There is a t.IsZero() that differentiates between the two:
https://go.dev/play/p/njBnvlej3Rf

Dogstatsd parsing correctly handles negative timestamps. src

I think the limiting factor here is the intake according to this comment:

    // timestamp for this value in seconds since the UNIX epoch

So my suggestion here is to add a comment about why this is the minimum supported epoch.

BTW - I noticed that the public metrics api says:

Note: Metric timestamps cannot be more than ten minutes in the future or more than one hour in the past.

I assume this doesn't apply to us? Or should we be enforcing that condition here by rejecting timestamps older than 1hr?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah.. We have to draw a limit somewhere in the clients and since the intake has a "minimum timestamp" limit and we already have other places limiting to the unix epoch in the Agent, I think having int(0) as the minimal possible value is speaking to everyone in a client. Let me know if you think I'm wrong, really open to discussion with these topics.

I assume this doesn't apply to us?

It doesn't indeed, a future PR will update the public documentation, with things more related about the "features" than the technical chunks in the client.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to understand why 0 (jan 1970) is the limit here.

The idea is to provide a hard-coded limit instead of no limit, even if it has no actual feature meaning here.
I added a mention in the comment to redirect to the feature documentation.

Please report to the Datadog documentation for the maximum age of a metric.

if c == nil {
return ErrNoClient
}
atomic.AddUint64(&c.telemetry.totalMetricsGauge, 1)
hush-hush marked this conversation as resolved.
Show resolved Hide resolved
return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: timestamp.Unix()})
}

// Count tracks how many times something happened per second.
func (c *Client) Count(name string, value int64, tags []string, rate float64) error {
if c == nil {
Expand All @@ -564,6 +586,17 @@ func (c *Client) Count(name string, value int64, tags []string, rate float64) er
return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace})
}

// Count tracks how many times something happened at the given second.
remeh marked this conversation as resolved.
Show resolved Hide resolved
// Even with client side aggregation enabled, there is no aggregation done on a metric
// sent using CountWithTimestamp: it is written as is in the serialization buffer.
func (c *Client) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error {
if c == nil {
return ErrNoClient
}
atomic.AddUint64(&c.telemetry.totalMetricsCount, 1)
return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: timestamp.Unix()})
}

// Histogram tracks the statistical distribution of a set of values on each host.
func (c *Client) Histogram(name string, value float64, tags []string, rate float64) error {
if c == nil {
Expand Down
4 changes: 2 additions & 2 deletions statsd/worker.go
Expand Up @@ -100,9 +100,9 @@ func (w *worker) writeAggregatedMetricUnsafe(m metric, metricSymbol []byte, prec
func (w *worker) writeMetricUnsafe(m metric) error {
switch m.metricType {
case gauge:
return w.buffer.writeGauge(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
return w.buffer.writeGauge(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate, m.timestamp)
case count:
return w.buffer.writeCount(m.namespace, m.globalTags, m.name, m.ivalue, m.tags, m.rate)
return w.buffer.writeCount(m.namespace, m.globalTags, m.name, m.ivalue, m.tags, m.rate, m.timestamp)
case histogram:
return w.buffer.writeHistogram(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
case distribution:
Expand Down
33 changes: 33 additions & 0 deletions statsd/worker_test.go
Expand Up @@ -78,6 +78,23 @@ func TestWorkerGauge(t *testing.T) {
)
}

func TestWorkerGaugeWithTimestamp(t *testing.T) {
testWorker(
t,
metric{
metricType: gauge,
namespace: "namespace.",
globalTags: []string{"globalTags", "globalTags2"},
name: "test_gauge",
fvalue: 21,
tags: []string{"tag1", "tag2"},
rate: 1,
timestamp: 1658997712,
},
"namespace.test_gauge:21|g|#globalTags,globalTags2,tag1,tag2|T1658997712\n",
)
}

func TestWorkerCount(t *testing.T) {
testWorker(
t,
Expand All @@ -94,6 +111,22 @@ func TestWorkerCount(t *testing.T) {
)
}

func TestWorkerCountWithTimestamp(t *testing.T) {
testWorker(
t,
metric{
metricType: count,
namespace: "namespace.",
globalTags: []string{"globalTags", "globalTags2"},
name: "test_count",
ivalue: 21,
tags: []string{"tag1", "tag2"},
rate: 1,
timestamp: 1658997712,
},
"namespace.test_count:21|c|#globalTags,globalTags2,tag1,tag2|T1658997712\n",
)
}
func TestWorkerHistogram(t *testing.T) {
testWorker(
t,
Expand Down