diff --git a/statsd/buffer.go b/statsd/buffer.go index 0e4ea2b2..f7bb8b0a 100644 --- a/statsd/buffer.go +++ b/statsd/buffer.go @@ -39,22 +39,24 @@ func newStatsdBuffer(maxSize, maxElements int) *statsdBuffer { } } -func (b *statsdBuffer) writeGauge(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error { +func (b *statsdBuffer) writeGauge(namespace string, globalTags []string, name string, value float64, tags []string, rate float64, timestamp int64) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer b.buffer = appendGauge(b.buffer, namespace, globalTags, name, value, tags, rate) + b.buffer = appendTimestamp(b.buffer, timestamp) b.writeSeparator() return b.validateNewElement(originalBuffer) } -func (b *statsdBuffer) writeCount(namespace string, globalTags []string, name string, value int64, tags []string, rate float64) error { +func (b *statsdBuffer) writeCount(namespace string, globalTags []string, name string, value int64, tags []string, rate float64, timestamp int64) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer b.buffer = appendCount(b.buffer, namespace, globalTags, name, value, tags, rate) + b.buffer = appendTimestamp(b.buffer, timestamp) b.writeSeparator() return b.validateNewElement(originalBuffer) } diff --git a/statsd/buffer_pool_test.go b/statsd/buffer_pool_test.go index f150af75..ab2c614b 100644 --- a/statsd/buffer_pool_test.go +++ b/statsd/buffer_pool_test.go @@ -33,7 +33,7 @@ func TestBufferPoolEmpty(t *testing.T) { func TestBufferReturn(t *testing.T) { bufferPool := newBufferPool(1, 1024, 20) buffer := bufferPool.borrowBuffer() - buffer.writeCount("", nil, "", 1, nil, 1) + buffer.writeCount("", nil, "", 1, nil, 1, noTimestamp) assert.Equal(t, 0, len(bufferPool.pool)) bufferPool.returnBuffer(buffer) diff --git a/statsd/buffer_test.go b/statsd/buffer_test.go index 9096407a..83d147e0 100644 --- a/statsd/buffer_test.go +++ b/statsd/buffer_test.go @@ -8,7 +8,7 @@ import ( func TestBufferGauge(t *testing.T) { buffer := newStatsdBuffer(1024, 1) - err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|g|#tag:tag\n", string(buffer.bytes())) @@ -17,14 +17,21 @@ func TestBufferGauge(t *testing.T) { defer resetContainerID() buffer = newStatsdBuffer(1024, 1) - err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|g|#tag:tag|c:container-id\n", string(buffer.bytes())) + + // with a timestamp + buffer = newStatsdBuffer(1024, 1) + err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, 1658934092) + assert.Nil(t, err) + assert.Equal(t, "namespace.metric:1|g|#tag:tag|c:container-id|T1658934092\n", string(buffer.bytes())) + } func TestBufferCount(t *testing.T) { buffer := newStatsdBuffer(1024, 1) - err := buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err := buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|c|#tag:tag\n", string(buffer.bytes())) @@ -33,9 +40,15 @@ func TestBufferCount(t *testing.T) { defer resetContainerID() buffer = newStatsdBuffer(1024, 1) - err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|c|#tag:tag|c:container-id\n", string(buffer.bytes())) + + // with a timestamp + buffer = newStatsdBuffer(1024, 1) + err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, 1658934092) + assert.Nil(t, err) + assert.Equal(t, "namespace.metric:1|c|#tag:tag|c:container-id|T1658934092\n", string(buffer.bytes())) } func TestBufferHistogram(t *testing.T) { @@ -135,18 +148,18 @@ func TestBufferServiceCheck(t *testing.T) { func TestBufferFullSize(t *testing.T) { buffer := newStatsdBuffer(30, 10) - err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) assert.Len(t, buffer.bytes(), 30) - err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Equal(t, errBufferFull, err) } func TestBufferSeparator(t *testing.T) { buffer := newStatsdBuffer(1024, 10) - err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) - err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|g|#tag:tag\nnamespace.metric:1|g|#tag:tag\n", string(buffer.bytes())) } @@ -223,13 +236,13 @@ func TestBufferAggregated(t *testing.T) { func TestBufferMaxElement(t *testing.T) { buffer := newStatsdBuffer(1024, 1) - err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) - err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Equal(t, errBufferFull, err) - err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Equal(t, errBufferFull, err) err = buffer.writeHistogram("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) diff --git a/statsd/format.go b/statsd/format.go index 6e05ad56..f3ab9231 100644 --- a/statsd/format.go +++ b/statsd/format.go @@ -270,3 +270,11 @@ func appendContainerID(buffer []byte) []byte { } return buffer } + +func appendTimestamp(buffer []byte, timestamp int64) []byte { + if timestamp > noTimestamp { + buffer = append(buffer, "|T"...) + buffer = strconv.AppendInt(buffer, timestamp, 10) + } + return buffer +} diff --git a/statsd/metrics_test.go b/statsd/metrics_test.go index 41bb743d..93026d15 100644 --- a/statsd/metrics_test.go +++ b/statsd/metrics_test.go @@ -17,6 +17,13 @@ func TestNewCountMetric(t *testing.T) { assert.Equal(t, c.tags, []string{"tag1", "tag2"}) } +func TestNewCountMetricWithTimestamp(t *testing.T) { + c := newCountMetric("test", 21, []string{"tag1", "tag2"}) + assert.Equal(t, c.value, int64(21)) + assert.Equal(t, c.name, "test") + assert.Equal(t, c.tags, []string{"tag1", "tag2"}) +} + func TestCountMetricSample(t *testing.T) { c := newCountMetric("test", 21, []string{"tag1", "tag2"}) c.sample(12) @@ -56,6 +63,13 @@ func TestGaugeMetricSample(t *testing.T) { assert.Equal(t, g.tags, []string{"tag1", "tag2"}) } +func TestNewGaugeMetricWithTimestamp(t *testing.T) { + g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}) + assert.Equal(t, math.Float64frombits(g.value), float64(21)) + assert.Equal(t, g.name, "test") + assert.Equal(t, g.tags, []string{"tag1", "tag2"}) +} + func TestFlushUnsafeGaugeMetricSample(t *testing.T) { g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}) m := g.flushUnsafe() diff --git a/statsd/mocks/statsd.go b/statsd/mocks/statsd.go index 80135c77..dd783df5 100644 --- a/statsd/mocks/statsd.go +++ b/statsd/mocks/statsd.go @@ -63,6 +63,20 @@ func (mr *MockClientInterfaceMockRecorder) Count(name, value, tags, rate interfa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Count", reflect.TypeOf((*MockClientInterface)(nil).Count), name, value, tags, rate) } +// CountWithTimestamp mocks base method. +func (m *MockClientInterface) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CountWithTimestamp", name, value, tags, rate, timestamp) + ret0, _ := ret[0].(error) + return ret0 +} + +// CountWithTimestamp indicates an expected call of CountWithTimestamp. +func (mr *MockClientInterfaceMockRecorder) CountWithTimestamp(name, value, tags, rate, timestamp interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountWithTimestamp", reflect.TypeOf((*MockClientInterface)(nil).CountWithTimestamp), name, value, tags, rate, timestamp) +} + // Decr mocks base method. func (m *MockClientInterface) Decr(name string, tags []string, rate float64) error { m.ctrl.T.Helper() @@ -133,6 +147,20 @@ func (mr *MockClientInterfaceMockRecorder) Gauge(name, value, tags, rate interfa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Gauge", reflect.TypeOf((*MockClientInterface)(nil).Gauge), name, value, tags, rate) } +// GaugeWithTimestamp mocks base method. +func (m *MockClientInterface) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GaugeWithTimestamp", name, value, tags, rate, timestamp) + ret0, _ := ret[0].(error) + return ret0 +} + +// GaugeWithTimestamp indicates an expected call of GaugeWithTimestamp. +func (mr *MockClientInterfaceMockRecorder) GaugeWithTimestamp(name, value, tags, rate, timestamp interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GaugeWithTimestamp", reflect.TypeOf((*MockClientInterface)(nil).GaugeWithTimestamp), name, value, tags, rate, timestamp) +} + // GetTelemetry mocks base method. func (m *MockClientInterface) GetTelemetry() statsd.Telemetry { m.ctrl.T.Helper() diff --git a/statsd/noop.go b/statsd/noop.go index 5c093988..e92744f4 100644 --- a/statsd/noop.go +++ b/statsd/noop.go @@ -11,11 +11,21 @@ func (n *NoOpClient) Gauge(name string, value float64, tags []string, rate float return nil } +// GaugeWithTimestamp does nothing and returns nil +func (n *NoOpClient) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error { + return nil +} + // Count does nothing and returns nil func (n *NoOpClient) Count(name string, value int64, tags []string, rate float64) error { return nil } +// CountWithTimestamp does nothing and returns nil +func (n *NoOpClient) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error { + return nil +} + // Histogram does nothing and returns nil func (n *NoOpClient) Histogram(name string, value float64, tags []string, rate float64) error { return nil diff --git a/statsd/noop_test.go b/statsd/noop_test.go index 2aaf0133..4df17019 100644 --- a/statsd/noop_test.go +++ b/statsd/noop_test.go @@ -13,7 +13,9 @@ func TestNoOpClient(t *testing.T) { tags := []string{"a:b"} a.Nil(c.Gauge("asd", 123.4, tags, 56.0)) + a.Nil(c.GaugeWithTimestamp("asd", 123.4, tags, 56.0, time.Now())) a.Nil(c.Count("asd", 1234, tags, 56.0)) + a.Nil(c.CountWithTimestamp("asd", 123, tags, 56.0, time.Now())) a.Nil(c.Histogram("asd", 12.34, tags, 56.0)) a.Nil(c.Distribution("asd", 1.234, tags, 56.0)) a.Nil(c.Decr("asd", tags, 56.0)) diff --git a/statsd/statsd.go b/statsd/statsd.go index b1bcce01..e6db34fa 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -126,6 +126,9 @@ const ( writerWindowsPipe string = "pipe" ) +// noTimestamp is used as a value for metric without a given timestamp. +const noTimestamp = int64(0) + type metric struct { metricType metricType namespace string @@ -140,6 +143,7 @@ type metric struct { tags []string stags string rate float64 + timestamp int64 } type noClientErr string @@ -152,6 +156,15 @@ func (e noClientErr) Error() string { return string(e) } +type invalidTimestampErr string + +// InvalidTimestamp is returned if a provided timestamp is invalid. +const InvalidTimestamp = invalidTimestampErr("invalid timestamp") + +func (e invalidTimestampErr) Error() string { + return string(e) +} + // ClientInterface is an interface that exposes the common client functions for the // purpose of being able to provide a no-op client or even mocking. This can aid // downstream users' with their testing. @@ -159,9 +172,25 @@ type ClientInterface interface { // Gauge measures the value of a metric at a particular time. Gauge(name string, value float64, tags []string, rate float64) error + // GaugeWithTimestamp measures the value of a metric at a given time. + // BETA - Please contact our support team for more information to use this feature: https://www.datadoghq.com/support/ + // The value will bypass any aggregation on the client side and agent side, this is + // useful when sending points in the past. + // + // Minimum Datadog Agent version: 7.40.0 + GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error + // Count tracks how many times something happened per second. Count(name string, value int64, tags []string, rate float64) error + // CountWithTimestamp tracks how many times something happened at the given second. + // BETA - Please contact our support team for more information to use this feature: https://www.datadoghq.com/support/ + // The value will bypass any aggregation on the client side and agent side, this is + // useful when sending points in the past. + // + // Minimum Datadog Agent version: 7.40.0 + CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error + // Histogram tracks the statistical distribution of a set of values on each host. Histogram(name string, value float64, tags []string, rate float64) error @@ -552,6 +581,25 @@ func (c *Client) Gauge(name string, value float64, tags []string, rate float64) return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) } +// GaugeWithTimestamp measures the value of a metric at a given time. +// BETA - Please contact our support team for more information to use this feature: https://www.datadoghq.com/support/ +// The value will bypass any aggregation on the client side and agent side, this is +// useful when sending points in the past. +// +// Minimum Datadog Agent version: 7.40.0 +func (c *Client) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error { + if c == nil { + return ErrNoClient + } + + if timestamp.IsZero() || timestamp.Unix() <= noTimestamp { + return InvalidTimestamp + } + + atomic.AddUint64(&c.telemetry.totalMetricsGauge, 1) + return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: timestamp.Unix()}) +} + // Count tracks how many times something happened per second. func (c *Client) Count(name string, value int64, tags []string, rate float64) error { if c == nil { @@ -564,6 +612,25 @@ func (c *Client) Count(name string, value int64, tags []string, rate float64) er return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) } +// CountWithTimestamp tracks how many times something happened at the given second. +// BETA - Please contact our support team for more information to use this feature: https://www.datadoghq.com/support/ +// The value will bypass any aggregation on the client side and agent side, this is +// useful when sending points in the past. +// +// Minimum Datadog Agent version: 7.40.0 +func (c *Client) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error { + if c == nil { + return ErrNoClient + } + + if timestamp.IsZero() || timestamp.Unix() <= noTimestamp { + return InvalidTimestamp + } + + atomic.AddUint64(&c.telemetry.totalMetricsCount, 1) + return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: timestamp.Unix()}) +} + // Histogram tracks the statistical distribution of a set of values on each host. func (c *Client) Histogram(name string, value float64, tags []string, rate float64) error { if c == nil { diff --git a/statsd/worker.go b/statsd/worker.go index 5446d506..952a9fe3 100644 --- a/statsd/worker.go +++ b/statsd/worker.go @@ -100,9 +100,9 @@ func (w *worker) writeAggregatedMetricUnsafe(m metric, metricSymbol []byte, prec func (w *worker) writeMetricUnsafe(m metric) error { switch m.metricType { case gauge: - return w.buffer.writeGauge(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate) + return w.buffer.writeGauge(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate, m.timestamp) case count: - return w.buffer.writeCount(m.namespace, m.globalTags, m.name, m.ivalue, m.tags, m.rate) + return w.buffer.writeCount(m.namespace, m.globalTags, m.name, m.ivalue, m.tags, m.rate, m.timestamp) case histogram: return w.buffer.writeHistogram(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate) case distribution: diff --git a/statsd/worker_test.go b/statsd/worker_test.go index 67e7de5a..b402e2e1 100644 --- a/statsd/worker_test.go +++ b/statsd/worker_test.go @@ -78,6 +78,23 @@ func TestWorkerGauge(t *testing.T) { ) } +func TestWorkerGaugeWithTimestamp(t *testing.T) { + testWorker( + t, + metric{ + metricType: gauge, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_gauge", + fvalue: 21, + tags: []string{"tag1", "tag2"}, + rate: 1, + timestamp: 1658997712, + }, + "namespace.test_gauge:21|g|#globalTags,globalTags2,tag1,tag2|T1658997712\n", + ) +} + func TestWorkerCount(t *testing.T) { testWorker( t, @@ -94,6 +111,22 @@ func TestWorkerCount(t *testing.T) { ) } +func TestWorkerCountWithTimestamp(t *testing.T) { + testWorker( + t, + metric{ + metricType: count, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_count", + ivalue: 21, + tags: []string{"tag1", "tag2"}, + rate: 1, + timestamp: 1658997712, + }, + "namespace.test_count:21|c|#globalTags,globalTags2,tag1,tag2|T1658997712\n", + ) +} func TestWorkerHistogram(t *testing.T) { testWorker( t,