From 97e39ef7f8d8741ccc3a3f5f0997579bfcbb101f Mon Sep 17 00:00:00 2001 From: Remy Mathieu Date: Wed, 27 Jul 2022 16:59:09 +0200 Subject: [PATCH 1/8] statsd: support sending metrics with timestamp. Adds two methods in the `ClientInterface` to support sending gauge and count with timestamp `GaugeWithTimestamp` and `CountWithTimestamp`, for users aggregating metrics on their side and who can provide a timestamp for a given metric. --- statsd/aggregator.go | 23 ++++++++- statsd/aggregator_test.go | 103 +++++++++++++++++++++++++++++++++++++ statsd/buffer.go | 10 +++- statsd/buffer_pool_test.go | 2 +- statsd/buffer_test.go | 35 +++++++++---- statsd/format.go | 6 +++ statsd/metrics.go | 34 +++++++----- statsd/metrics_test.go | 36 ++++++++++--- statsd/mocks/statsd.go | 28 ++++++++++ statsd/noop.go | 10 ++++ statsd/noop_test.go | 2 + statsd/statsd.go | 33 ++++++++++++ statsd/worker.go | 4 +- statsd/worker_test.go | 33 ++++++++++++ 14 files changed, 321 insertions(+), 38 deletions(-) diff --git a/statsd/aggregator.go b/statsd/aggregator.go index 65c050ed..fd2460a7 100644 --- a/statsd/aggregator.go +++ b/statsd/aggregator.go @@ -14,6 +14,9 @@ type ( bufferedMetricMap map[string]*bufferedMetric ) +// noTimestamp is used as a value for metric without a given timestamp. +const noTimestamp = int64(0) + type aggregator struct { nbContextGauge uint64 nbContextCount uint64 @@ -218,7 +221,15 @@ func (a *aggregator) count(name string, value int64, tags []string) error { return nil } - a.counts[context] = newCountMetric(name, value, tags) + a.counts[context] = newCountMetric(name, value, tags, noTimestamp) + a.countsM.Unlock() + return nil +} + +func (a *aggregator) countWithTimestamp(name string, value int64, tags []string, timestamp int64) error { + context := getContext(name, tags) + a.countsM.Lock() + a.counts[context] = newCountMetric(name, value, tags, timestamp) a.countsM.Unlock() return nil } @@ -233,7 +244,7 @@ func (a *aggregator) gauge(name string, value float64, tags []string) error { } a.gaugesM.RUnlock() - gauge := newGaugeMetric(name, value, tags) + gauge := newGaugeMetric(name, value, tags, noTimestamp) a.gaugesM.Lock() // Check if another goroutines hasn't created the value betwen the 'RUnlock' and 'Lock' @@ -247,6 +258,14 @@ func (a *aggregator) gauge(name string, value float64, tags []string) error { return nil } +func (a *aggregator) gaugeWithTimestamp(name string, value float64, tags []string, timestamp int64) error { + context := getContext(name, tags) + a.gaugesM.Lock() + a.gauges[context] = newGaugeMetric(name, value, tags, timestamp) + a.gaugesM.Unlock() + return nil +} + func (a *aggregator) set(name string, value string, tags []string) error { context := getContext(name, tags) a.setsM.RLock() diff --git a/statsd/aggregator_test.go b/statsd/aggregator_test.go index c2e92c44..b292d17e 100644 --- a/statsd/aggregator_test.go +++ b/statsd/aggregator_test.go @@ -5,6 +5,7 @@ import ( "strings" "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -46,6 +47,36 @@ func TestAggregatorSample(t *testing.T) { } } +func TestAggregatorWithTimedSamples(t *testing.T) { + a := newAggregator(nil) + + tags := []string{"tag1", "tag2"} + + for i := 0; i < 2; i++ { + a.gauge("gaugeTest", 21, tags) + assert.Len(t, a.gauges, 1) + assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2") + + for j := 0; j < 10; j++ { + // this one should only update the existing one, no new entry in a.gauges + a.gaugeWithTimestamp("gaugeTest", 21, tags, time.Now().Unix()) + assert.Len(t, a.gauges, 1) + assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2") + } + + a.count("countTest", 21, tags) + assert.Len(t, a.counts, 1) + assert.Contains(t, a.counts, "countTest:tag1,tag2") + + for j := 0; j < 10; j++ { + // this one should only override the existing one, no new entry in a.counts + a.countWithTimestamp("countTest", 21, tags, time.Now().Unix()) + assert.Len(t, a.counts, 1) + assert.Contains(t, a.counts, "countTest:tag1,tag2") + } + } +} + func TestAggregatorFlush(t *testing.T) { a := newAggregator(nil) @@ -195,6 +226,78 @@ func TestAggregatorFlush(t *testing.T) { metrics) } +func TestAggregatorFlushWithTimedSamplesMixed(t *testing.T) { + a := newAggregator(nil) + + tags := []string{"tag1", "tag2"} + + veryOld := time.Now().Add(-24 * time.Hour) + old := time.Now().Add(-6 * time.Hour) + + a.gauge("gaugeTest1", 21, tags) + a.gaugeWithTimestamp("gaugeTest2", 10, tags, veryOld.Unix()) + a.gaugeWithTimestamp("gaugeTest2", 15, tags, old.Unix()) + + a.count("countTest1", 44, tags) + a.countWithTimestamp("countTest2", 23, tags, veryOld.Unix()) + a.countWithTimestamp("countTest2", 25, tags, old.Unix()) + + metrics := a.flushMetrics() + + assert.Len(t, a.gauges, 0) + assert.Len(t, a.counts, 0) + + assert.Len(t, metrics, 4) + + sort.Slice(metrics, func(i, j int) bool { + if metrics[i].metricType == metrics[j].metricType { + res := strings.Compare(metrics[i].name, metrics[j].name) + // this happens fo set + if res == 0 { + return strings.Compare(metrics[i].svalue, metrics[j].svalue) != 1 + } + return res != 1 + } + return metrics[i].metricType < metrics[j].metricType + }) + + assert.Equal(t, []metric{ + metric{ + metricType: gauge, + name: "gaugeTest1", + tags: tags, + rate: 1, + fvalue: float64(21), + timestamp: 0, + }, + metric{ + metricType: gauge, + name: "gaugeTest2", + tags: tags, + rate: 1, + fvalue: float64(15), + timestamp: old.Unix(), + }, + metric{ + metricType: count, + name: "countTest1", + tags: tags, + rate: 1, + ivalue: 44, + timestamp: 0, + }, + metric{ + metricType: count, + name: "countTest2", + tags: tags, + rate: 1, + ivalue: 25, + timestamp: old.Unix(), + }, + }, + metrics) +} + func TestAggregatorFlushConcurrency(t *testing.T) { a := newAggregator(nil) diff --git a/statsd/buffer.go b/statsd/buffer.go index 0e4ea2b2..9a01fbb3 100644 --- a/statsd/buffer.go +++ b/statsd/buffer.go @@ -39,22 +39,28 @@ func newStatsdBuffer(maxSize, maxElements int) *statsdBuffer { } } -func (b *statsdBuffer) writeGauge(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error { +func (b *statsdBuffer) writeGauge(namespace string, globalTags []string, name string, value float64, tags []string, rate float64, timestamp int64) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer b.buffer = appendGauge(b.buffer, namespace, globalTags, name, value, tags, rate) + if timestamp != noTimestamp { + b.buffer = appendTimestamp(b.buffer, timestamp) + } b.writeSeparator() return b.validateNewElement(originalBuffer) } -func (b *statsdBuffer) writeCount(namespace string, globalTags []string, name string, value int64, tags []string, rate float64) error { +func (b *statsdBuffer) writeCount(namespace string, globalTags []string, name string, value int64, tags []string, rate float64, timestamp int64) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer b.buffer = appendCount(b.buffer, namespace, globalTags, name, value, tags, rate) + if timestamp != noTimestamp { + b.buffer = appendTimestamp(b.buffer, timestamp) + } b.writeSeparator() return b.validateNewElement(originalBuffer) } diff --git a/statsd/buffer_pool_test.go b/statsd/buffer_pool_test.go index f150af75..ab2c614b 100644 --- a/statsd/buffer_pool_test.go +++ b/statsd/buffer_pool_test.go @@ -33,7 +33,7 @@ func TestBufferPoolEmpty(t *testing.T) { func TestBufferReturn(t *testing.T) { bufferPool := newBufferPool(1, 1024, 20) buffer := bufferPool.borrowBuffer() - buffer.writeCount("", nil, "", 1, nil, 1) + buffer.writeCount("", nil, "", 1, nil, 1, noTimestamp) assert.Equal(t, 0, len(bufferPool.pool)) bufferPool.returnBuffer(buffer) diff --git a/statsd/buffer_test.go b/statsd/buffer_test.go index 9096407a..83d147e0 100644 --- a/statsd/buffer_test.go +++ b/statsd/buffer_test.go @@ -8,7 +8,7 @@ import ( func TestBufferGauge(t *testing.T) { buffer := newStatsdBuffer(1024, 1) - err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|g|#tag:tag\n", string(buffer.bytes())) @@ -17,14 +17,21 @@ func TestBufferGauge(t *testing.T) { defer resetContainerID() buffer = newStatsdBuffer(1024, 1) - err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|g|#tag:tag|c:container-id\n", string(buffer.bytes())) + + // with a timestamp + buffer = newStatsdBuffer(1024, 1) + err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, 1658934092) + assert.Nil(t, err) + assert.Equal(t, "namespace.metric:1|g|#tag:tag|c:container-id|T1658934092\n", string(buffer.bytes())) + } func TestBufferCount(t *testing.T) { buffer := newStatsdBuffer(1024, 1) - err := buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err := buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|c|#tag:tag\n", string(buffer.bytes())) @@ -33,9 +40,15 @@ func TestBufferCount(t *testing.T) { defer resetContainerID() buffer = newStatsdBuffer(1024, 1) - err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|c|#tag:tag|c:container-id\n", string(buffer.bytes())) + + // with a timestamp + buffer = newStatsdBuffer(1024, 1) + err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, 1658934092) + assert.Nil(t, err) + assert.Equal(t, "namespace.metric:1|c|#tag:tag|c:container-id|T1658934092\n", string(buffer.bytes())) } func TestBufferHistogram(t *testing.T) { @@ -135,18 +148,18 @@ func TestBufferServiceCheck(t *testing.T) { func TestBufferFullSize(t *testing.T) { buffer := newStatsdBuffer(30, 10) - err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) assert.Len(t, buffer.bytes(), 30) - err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Equal(t, errBufferFull, err) } func TestBufferSeparator(t *testing.T) { buffer := newStatsdBuffer(1024, 10) - err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) - err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|g|#tag:tag\nnamespace.metric:1|g|#tag:tag\n", string(buffer.bytes())) } @@ -223,13 +236,13 @@ func TestBufferAggregated(t *testing.T) { func TestBufferMaxElement(t *testing.T) { buffer := newStatsdBuffer(1024, 1) - err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Nil(t, err) - err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Equal(t, errBufferFull, err) - err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) + err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp) assert.Equal(t, errBufferFull, err) err = buffer.writeHistogram("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) diff --git a/statsd/format.go b/statsd/format.go index 6e05ad56..43ea2730 100644 --- a/statsd/format.go +++ b/statsd/format.go @@ -270,3 +270,9 @@ func appendContainerID(buffer []byte) []byte { } return buffer } + +func appendTimestamp(buffer []byte, timestamp int64) []byte { + buffer = append(buffer, "|T"...) + buffer = strconv.AppendInt(buffer, timestamp, 10) + return buffer +} diff --git a/statsd/metrics.go b/statsd/metrics.go index 82f11ac1..46b99968 100644 --- a/statsd/metrics.go +++ b/statsd/metrics.go @@ -14,16 +14,18 @@ Those are metrics type that can be aggregated on the client side: */ type countMetric struct { - value int64 - name string - tags []string + value int64 + timestamp int64 + name string + tags []string } -func newCountMetric(name string, value int64, tags []string) *countMetric { +func newCountMetric(name string, value int64, tags []string, timestamp int64) *countMetric { return &countMetric{ - value: value, - name: name, - tags: copySlice(tags), + value: value, + timestamp: timestamp, + name: name, + tags: copySlice(tags), } } @@ -38,22 +40,25 @@ func (c *countMetric) flushUnsafe() metric { tags: c.tags, rate: 1, ivalue: c.value, + timestamp: c.timestamp, } } // Gauge type gaugeMetric struct { - value uint64 - name string - tags []string + value uint64 + timestamp int64 + name string + tags []string } -func newGaugeMetric(name string, value float64, tags []string) *gaugeMetric { +func newGaugeMetric(name string, value float64, tags []string, timestamp int64) *gaugeMetric { return &gaugeMetric{ - value: math.Float64bits(value), - name: name, - tags: copySlice(tags), + value: math.Float64bits(value), + timestamp: timestamp, + name: name, + tags: copySlice(tags), } } @@ -68,6 +73,7 @@ func (g *gaugeMetric) flushUnsafe() metric { tags: g.tags, rate: 1, fvalue: math.Float64frombits(g.value), + timestamp: g.timestamp, } } diff --git a/statsd/metrics_test.go b/statsd/metrics_test.go index 41bb743d..281ff2e2 100644 --- a/statsd/metrics_test.go +++ b/statsd/metrics_test.go @@ -11,27 +11,38 @@ import ( ) func TestNewCountMetric(t *testing.T) { - c := newCountMetric("test", 21, []string{"tag1", "tag2"}) + c := newCountMetric("test", 21, []string{"tag1", "tag2"}, noTimestamp) assert.Equal(t, c.value, int64(21)) assert.Equal(t, c.name, "test") assert.Equal(t, c.tags, []string{"tag1", "tag2"}) + assert.Equal(t, c.timestamp, noTimestamp) +} + +func TestNewCountMetricWithTimestamp(t *testing.T) { + c := newCountMetric("test", 21, []string{"tag1", "tag2"}, 1658934956) + assert.Equal(t, c.value, int64(21)) + assert.Equal(t, c.name, "test") + assert.Equal(t, c.tags, []string{"tag1", "tag2"}) + assert.Equal(t, c.timestamp, int64(1658934956)) } func TestCountMetricSample(t *testing.T) { - c := newCountMetric("test", 21, []string{"tag1", "tag2"}) + c := newCountMetric("test", 21, []string{"tag1", "tag2"}, noTimestamp) c.sample(12) assert.Equal(t, c.value, int64(33)) assert.Equal(t, c.name, "test") assert.Equal(t, c.tags, []string{"tag1", "tag2"}) + assert.Equal(t, c.timestamp, noTimestamp) } func TestFlushUnsafeCountMetricSample(t *testing.T) { - c := newCountMetric("test", 21, []string{"tag1", "tag2"}) + c := newCountMetric("test", 21, []string{"tag1", "tag2"}, noTimestamp) m := c.flushUnsafe() assert.Equal(t, m.metricType, count) assert.Equal(t, m.ivalue, int64(21)) assert.Equal(t, m.name, "test") assert.Equal(t, m.tags, []string{"tag1", "tag2"}) + assert.Equal(t, m.timestamp, noTimestamp) c.sample(12) m = c.flushUnsafe() @@ -39,30 +50,42 @@ func TestFlushUnsafeCountMetricSample(t *testing.T) { assert.Equal(t, m.ivalue, int64(33)) assert.Equal(t, m.name, "test") assert.Equal(t, m.tags, []string{"tag1", "tag2"}) + assert.Equal(t, m.timestamp, noTimestamp) } func TestNewGaugeMetric(t *testing.T) { - g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}) + g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, noTimestamp) assert.Equal(t, math.Float64frombits(g.value), float64(21)) assert.Equal(t, g.name, "test") assert.Equal(t, g.tags, []string{"tag1", "tag2"}) + assert.Equal(t, g.timestamp, noTimestamp) } func TestGaugeMetricSample(t *testing.T) { - g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}) + g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, noTimestamp) g.sample(12) assert.Equal(t, math.Float64frombits(g.value), float64(12)) assert.Equal(t, g.name, "test") assert.Equal(t, g.tags, []string{"tag1", "tag2"}) + assert.Equal(t, g.timestamp, noTimestamp) +} + +func TestNewGaugeMetricWithTimestamp(t *testing.T) { + g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, 1658934956) + assert.Equal(t, math.Float64frombits(g.value), float64(21)) + assert.Equal(t, g.name, "test") + assert.Equal(t, g.tags, []string{"tag1", "tag2"}) + assert.Equal(t, g.timestamp, int64(1658934956)) } func TestFlushUnsafeGaugeMetricSample(t *testing.T) { - g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}) + g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, noTimestamp) m := g.flushUnsafe() assert.Equal(t, m.metricType, gauge) assert.Equal(t, m.fvalue, float64(21)) assert.Equal(t, m.name, "test") assert.Equal(t, m.tags, []string{"tag1", "tag2"}) + assert.Equal(t, m.timestamp, noTimestamp) g.sample(12) m = g.flushUnsafe() @@ -70,6 +93,7 @@ func TestFlushUnsafeGaugeMetricSample(t *testing.T) { assert.Equal(t, m.fvalue, float64(12)) assert.Equal(t, m.name, "test") assert.Equal(t, m.tags, []string{"tag1", "tag2"}) + assert.Equal(t, m.timestamp, noTimestamp) } func TestNewSetMetric(t *testing.T) { diff --git a/statsd/mocks/statsd.go b/statsd/mocks/statsd.go index 80135c77..dd783df5 100644 --- a/statsd/mocks/statsd.go +++ b/statsd/mocks/statsd.go @@ -63,6 +63,20 @@ func (mr *MockClientInterfaceMockRecorder) Count(name, value, tags, rate interfa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Count", reflect.TypeOf((*MockClientInterface)(nil).Count), name, value, tags, rate) } +// CountWithTimestamp mocks base method. +func (m *MockClientInterface) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CountWithTimestamp", name, value, tags, rate, timestamp) + ret0, _ := ret[0].(error) + return ret0 +} + +// CountWithTimestamp indicates an expected call of CountWithTimestamp. +func (mr *MockClientInterfaceMockRecorder) CountWithTimestamp(name, value, tags, rate, timestamp interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountWithTimestamp", reflect.TypeOf((*MockClientInterface)(nil).CountWithTimestamp), name, value, tags, rate, timestamp) +} + // Decr mocks base method. func (m *MockClientInterface) Decr(name string, tags []string, rate float64) error { m.ctrl.T.Helper() @@ -133,6 +147,20 @@ func (mr *MockClientInterfaceMockRecorder) Gauge(name, value, tags, rate interfa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Gauge", reflect.TypeOf((*MockClientInterface)(nil).Gauge), name, value, tags, rate) } +// GaugeWithTimestamp mocks base method. +func (m *MockClientInterface) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GaugeWithTimestamp", name, value, tags, rate, timestamp) + ret0, _ := ret[0].(error) + return ret0 +} + +// GaugeWithTimestamp indicates an expected call of GaugeWithTimestamp. +func (mr *MockClientInterfaceMockRecorder) GaugeWithTimestamp(name, value, tags, rate, timestamp interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GaugeWithTimestamp", reflect.TypeOf((*MockClientInterface)(nil).GaugeWithTimestamp), name, value, tags, rate, timestamp) +} + // GetTelemetry mocks base method. func (m *MockClientInterface) GetTelemetry() statsd.Telemetry { m.ctrl.T.Helper() diff --git a/statsd/noop.go b/statsd/noop.go index 5c093988..e92744f4 100644 --- a/statsd/noop.go +++ b/statsd/noop.go @@ -11,11 +11,21 @@ func (n *NoOpClient) Gauge(name string, value float64, tags []string, rate float return nil } +// GaugeWithTimestamp does nothing and returns nil +func (n *NoOpClient) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error { + return nil +} + // Count does nothing and returns nil func (n *NoOpClient) Count(name string, value int64, tags []string, rate float64) error { return nil } +// CountWithTimestamp does nothing and returns nil +func (n *NoOpClient) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error { + return nil +} + // Histogram does nothing and returns nil func (n *NoOpClient) Histogram(name string, value float64, tags []string, rate float64) error { return nil diff --git a/statsd/noop_test.go b/statsd/noop_test.go index 2aaf0133..4df17019 100644 --- a/statsd/noop_test.go +++ b/statsd/noop_test.go @@ -13,7 +13,9 @@ func TestNoOpClient(t *testing.T) { tags := []string{"a:b"} a.Nil(c.Gauge("asd", 123.4, tags, 56.0)) + a.Nil(c.GaugeWithTimestamp("asd", 123.4, tags, 56.0, time.Now())) a.Nil(c.Count("asd", 1234, tags, 56.0)) + a.Nil(c.CountWithTimestamp("asd", 123, tags, 56.0, time.Now())) a.Nil(c.Histogram("asd", 12.34, tags, 56.0)) a.Nil(c.Distribution("asd", 1.234, tags, 56.0)) a.Nil(c.Decr("asd", tags, 56.0)) diff --git a/statsd/statsd.go b/statsd/statsd.go index b1bcce01..b807ffd4 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -140,6 +140,7 @@ type metric struct { tags []string stags string rate float64 + timestamp int64 } type noClientErr string @@ -159,9 +160,15 @@ type ClientInterface interface { // Gauge measures the value of a metric at a particular time. Gauge(name string, value float64, tags []string, rate float64) error + // Gauge measures the value of a metric at a given time. + GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error + // Count tracks how many times something happened per second. Count(name string, value int64, tags []string, rate float64) error + // Count tracks how many times something happened at the given second. + CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error + // Histogram tracks the statistical distribution of a set of values on each host. Histogram(name string, value float64, tags []string, rate float64) error @@ -552,6 +559,19 @@ func (c *Client) Gauge(name string, value float64, tags []string, rate float64) return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) } +// Gauge measures the value of a metric at a given time. +func (c *Client) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error { + if c == nil { + return ErrNoClient + } + ts := timestamp.Unix() + atomic.AddUint64(&c.telemetry.totalMetricsGauge, 1) + if c.agg != nil { + return c.agg.gaugeWithTimestamp(name, value, tags, ts) + } + return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: ts}) +} + // Count tracks how many times something happened per second. func (c *Client) Count(name string, value int64, tags []string, rate float64) error { if c == nil { @@ -564,6 +584,19 @@ func (c *Client) Count(name string, value int64, tags []string, rate float64) er return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) } +// Count tracks how many times something happened at the given second. +func (c *Client) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error { + if c == nil { + return ErrNoClient + } + ts := timestamp.Unix() + atomic.AddUint64(&c.telemetry.totalMetricsCount, 1) + if c.agg != nil { + return c.agg.countWithTimestamp(name, value, tags, ts) + } + return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: ts}) +} + // Histogram tracks the statistical distribution of a set of values on each host. func (c *Client) Histogram(name string, value float64, tags []string, rate float64) error { if c == nil { diff --git a/statsd/worker.go b/statsd/worker.go index 5446d506..952a9fe3 100644 --- a/statsd/worker.go +++ b/statsd/worker.go @@ -100,9 +100,9 @@ func (w *worker) writeAggregatedMetricUnsafe(m metric, metricSymbol []byte, prec func (w *worker) writeMetricUnsafe(m metric) error { switch m.metricType { case gauge: - return w.buffer.writeGauge(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate) + return w.buffer.writeGauge(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate, m.timestamp) case count: - return w.buffer.writeCount(m.namespace, m.globalTags, m.name, m.ivalue, m.tags, m.rate) + return w.buffer.writeCount(m.namespace, m.globalTags, m.name, m.ivalue, m.tags, m.rate, m.timestamp) case histogram: return w.buffer.writeHistogram(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate) case distribution: diff --git a/statsd/worker_test.go b/statsd/worker_test.go index 67e7de5a..b402e2e1 100644 --- a/statsd/worker_test.go +++ b/statsd/worker_test.go @@ -78,6 +78,23 @@ func TestWorkerGauge(t *testing.T) { ) } +func TestWorkerGaugeWithTimestamp(t *testing.T) { + testWorker( + t, + metric{ + metricType: gauge, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_gauge", + fvalue: 21, + tags: []string{"tag1", "tag2"}, + rate: 1, + timestamp: 1658997712, + }, + "namespace.test_gauge:21|g|#globalTags,globalTags2,tag1,tag2|T1658997712\n", + ) +} + func TestWorkerCount(t *testing.T) { testWorker( t, @@ -94,6 +111,22 @@ func TestWorkerCount(t *testing.T) { ) } +func TestWorkerCountWithTimestamp(t *testing.T) { + testWorker( + t, + metric{ + metricType: count, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_count", + ivalue: 21, + tags: []string{"tag1", "tag2"}, + rate: 1, + timestamp: 1658997712, + }, + "namespace.test_count:21|c|#globalTags,globalTags2,tag1,tag2|T1658997712\n", + ) +} func TestWorkerHistogram(t *testing.T) { testWorker( t, From 35293312710f55b3e0559b008c2e65776886d473 Mon Sep 17 00:00:00 2001 From: Remy Mathieu Date: Thu, 28 Jul 2022 14:29:33 +0200 Subject: [PATCH 2/8] statsd: the metrics with timestamp are directly written in the serialization buffer. They are not going through the client aggregator. --- statsd/aggregator.go | 20 +------- statsd/aggregator_test.go | 103 -------------------------------------- statsd/metrics.go | 34 ++++++------- statsd/metrics_test.go | 26 +++------- statsd/statsd.go | 22 ++++---- 5 files changed, 35 insertions(+), 170 deletions(-) diff --git a/statsd/aggregator.go b/statsd/aggregator.go index fd2460a7..b9b9d15e 100644 --- a/statsd/aggregator.go +++ b/statsd/aggregator.go @@ -221,15 +221,7 @@ func (a *aggregator) count(name string, value int64, tags []string) error { return nil } - a.counts[context] = newCountMetric(name, value, tags, noTimestamp) - a.countsM.Unlock() - return nil -} - -func (a *aggregator) countWithTimestamp(name string, value int64, tags []string, timestamp int64) error { - context := getContext(name, tags) - a.countsM.Lock() - a.counts[context] = newCountMetric(name, value, tags, timestamp) + a.counts[context] = newCountMetric(name, value, tags) a.countsM.Unlock() return nil } @@ -244,7 +236,7 @@ func (a *aggregator) gauge(name string, value float64, tags []string) error { } a.gaugesM.RUnlock() - gauge := newGaugeMetric(name, value, tags, noTimestamp) + gauge := newGaugeMetric(name, value, tags) a.gaugesM.Lock() // Check if another goroutines hasn't created the value betwen the 'RUnlock' and 'Lock' @@ -258,14 +250,6 @@ func (a *aggregator) gauge(name string, value float64, tags []string) error { return nil } -func (a *aggregator) gaugeWithTimestamp(name string, value float64, tags []string, timestamp int64) error { - context := getContext(name, tags) - a.gaugesM.Lock() - a.gauges[context] = newGaugeMetric(name, value, tags, timestamp) - a.gaugesM.Unlock() - return nil -} - func (a *aggregator) set(name string, value string, tags []string) error { context := getContext(name, tags) a.setsM.RLock() diff --git a/statsd/aggregator_test.go b/statsd/aggregator_test.go index b292d17e..c2e92c44 100644 --- a/statsd/aggregator_test.go +++ b/statsd/aggregator_test.go @@ -5,7 +5,6 @@ import ( "strings" "sync" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -47,36 +46,6 @@ func TestAggregatorSample(t *testing.T) { } } -func TestAggregatorWithTimedSamples(t *testing.T) { - a := newAggregator(nil) - - tags := []string{"tag1", "tag2"} - - for i := 0; i < 2; i++ { - a.gauge("gaugeTest", 21, tags) - assert.Len(t, a.gauges, 1) - assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2") - - for j := 0; j < 10; j++ { - // this one should only update the existing one, no new entry in a.gauges - a.gaugeWithTimestamp("gaugeTest", 21, tags, time.Now().Unix()) - assert.Len(t, a.gauges, 1) - assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2") - } - - a.count("countTest", 21, tags) - assert.Len(t, a.counts, 1) - assert.Contains(t, a.counts, "countTest:tag1,tag2") - - for j := 0; j < 10; j++ { - // this one should only override the existing one, no new entry in a.counts - a.countWithTimestamp("countTest", 21, tags, time.Now().Unix()) - assert.Len(t, a.counts, 1) - assert.Contains(t, a.counts, "countTest:tag1,tag2") - } - } -} - func TestAggregatorFlush(t *testing.T) { a := newAggregator(nil) @@ -226,78 +195,6 @@ func TestAggregatorFlush(t *testing.T) { metrics) } -func TestAggregatorFlushWithTimedSamplesMixed(t *testing.T) { - a := newAggregator(nil) - - tags := []string{"tag1", "tag2"} - - veryOld := time.Now().Add(-24 * time.Hour) - old := time.Now().Add(-6 * time.Hour) - - a.gauge("gaugeTest1", 21, tags) - a.gaugeWithTimestamp("gaugeTest2", 10, tags, veryOld.Unix()) - a.gaugeWithTimestamp("gaugeTest2", 15, tags, old.Unix()) - - a.count("countTest1", 44, tags) - a.countWithTimestamp("countTest2", 23, tags, veryOld.Unix()) - a.countWithTimestamp("countTest2", 25, tags, old.Unix()) - - metrics := a.flushMetrics() - - assert.Len(t, a.gauges, 0) - assert.Len(t, a.counts, 0) - - assert.Len(t, metrics, 4) - - sort.Slice(metrics, func(i, j int) bool { - if metrics[i].metricType == metrics[j].metricType { - res := strings.Compare(metrics[i].name, metrics[j].name) - // this happens fo set - if res == 0 { - return strings.Compare(metrics[i].svalue, metrics[j].svalue) != 1 - } - return res != 1 - } - return metrics[i].metricType < metrics[j].metricType - }) - - assert.Equal(t, []metric{ - metric{ - metricType: gauge, - name: "gaugeTest1", - tags: tags, - rate: 1, - fvalue: float64(21), - timestamp: 0, - }, - metric{ - metricType: gauge, - name: "gaugeTest2", - tags: tags, - rate: 1, - fvalue: float64(15), - timestamp: old.Unix(), - }, - metric{ - metricType: count, - name: "countTest1", - tags: tags, - rate: 1, - ivalue: 44, - timestamp: 0, - }, - metric{ - metricType: count, - name: "countTest2", - tags: tags, - rate: 1, - ivalue: 25, - timestamp: old.Unix(), - }, - }, - metrics) -} - func TestAggregatorFlushConcurrency(t *testing.T) { a := newAggregator(nil) diff --git a/statsd/metrics.go b/statsd/metrics.go index 46b99968..82f11ac1 100644 --- a/statsd/metrics.go +++ b/statsd/metrics.go @@ -14,18 +14,16 @@ Those are metrics type that can be aggregated on the client side: */ type countMetric struct { - value int64 - timestamp int64 - name string - tags []string + value int64 + name string + tags []string } -func newCountMetric(name string, value int64, tags []string, timestamp int64) *countMetric { +func newCountMetric(name string, value int64, tags []string) *countMetric { return &countMetric{ - value: value, - timestamp: timestamp, - name: name, - tags: copySlice(tags), + value: value, + name: name, + tags: copySlice(tags), } } @@ -40,25 +38,22 @@ func (c *countMetric) flushUnsafe() metric { tags: c.tags, rate: 1, ivalue: c.value, - timestamp: c.timestamp, } } // Gauge type gaugeMetric struct { - value uint64 - timestamp int64 - name string - tags []string + value uint64 + name string + tags []string } -func newGaugeMetric(name string, value float64, tags []string, timestamp int64) *gaugeMetric { +func newGaugeMetric(name string, value float64, tags []string) *gaugeMetric { return &gaugeMetric{ - value: math.Float64bits(value), - timestamp: timestamp, - name: name, - tags: copySlice(tags), + value: math.Float64bits(value), + name: name, + tags: copySlice(tags), } } @@ -73,7 +68,6 @@ func (g *gaugeMetric) flushUnsafe() metric { tags: g.tags, rate: 1, fvalue: math.Float64frombits(g.value), - timestamp: g.timestamp, } } diff --git a/statsd/metrics_test.go b/statsd/metrics_test.go index 281ff2e2..93026d15 100644 --- a/statsd/metrics_test.go +++ b/statsd/metrics_test.go @@ -11,38 +11,34 @@ import ( ) func TestNewCountMetric(t *testing.T) { - c := newCountMetric("test", 21, []string{"tag1", "tag2"}, noTimestamp) + c := newCountMetric("test", 21, []string{"tag1", "tag2"}) assert.Equal(t, c.value, int64(21)) assert.Equal(t, c.name, "test") assert.Equal(t, c.tags, []string{"tag1", "tag2"}) - assert.Equal(t, c.timestamp, noTimestamp) } func TestNewCountMetricWithTimestamp(t *testing.T) { - c := newCountMetric("test", 21, []string{"tag1", "tag2"}, 1658934956) + c := newCountMetric("test", 21, []string{"tag1", "tag2"}) assert.Equal(t, c.value, int64(21)) assert.Equal(t, c.name, "test") assert.Equal(t, c.tags, []string{"tag1", "tag2"}) - assert.Equal(t, c.timestamp, int64(1658934956)) } func TestCountMetricSample(t *testing.T) { - c := newCountMetric("test", 21, []string{"tag1", "tag2"}, noTimestamp) + c := newCountMetric("test", 21, []string{"tag1", "tag2"}) c.sample(12) assert.Equal(t, c.value, int64(33)) assert.Equal(t, c.name, "test") assert.Equal(t, c.tags, []string{"tag1", "tag2"}) - assert.Equal(t, c.timestamp, noTimestamp) } func TestFlushUnsafeCountMetricSample(t *testing.T) { - c := newCountMetric("test", 21, []string{"tag1", "tag2"}, noTimestamp) + c := newCountMetric("test", 21, []string{"tag1", "tag2"}) m := c.flushUnsafe() assert.Equal(t, m.metricType, count) assert.Equal(t, m.ivalue, int64(21)) assert.Equal(t, m.name, "test") assert.Equal(t, m.tags, []string{"tag1", "tag2"}) - assert.Equal(t, m.timestamp, noTimestamp) c.sample(12) m = c.flushUnsafe() @@ -50,42 +46,37 @@ func TestFlushUnsafeCountMetricSample(t *testing.T) { assert.Equal(t, m.ivalue, int64(33)) assert.Equal(t, m.name, "test") assert.Equal(t, m.tags, []string{"tag1", "tag2"}) - assert.Equal(t, m.timestamp, noTimestamp) } func TestNewGaugeMetric(t *testing.T) { - g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, noTimestamp) + g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}) assert.Equal(t, math.Float64frombits(g.value), float64(21)) assert.Equal(t, g.name, "test") assert.Equal(t, g.tags, []string{"tag1", "tag2"}) - assert.Equal(t, g.timestamp, noTimestamp) } func TestGaugeMetricSample(t *testing.T) { - g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, noTimestamp) + g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}) g.sample(12) assert.Equal(t, math.Float64frombits(g.value), float64(12)) assert.Equal(t, g.name, "test") assert.Equal(t, g.tags, []string{"tag1", "tag2"}) - assert.Equal(t, g.timestamp, noTimestamp) } func TestNewGaugeMetricWithTimestamp(t *testing.T) { - g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, 1658934956) + g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}) assert.Equal(t, math.Float64frombits(g.value), float64(21)) assert.Equal(t, g.name, "test") assert.Equal(t, g.tags, []string{"tag1", "tag2"}) - assert.Equal(t, g.timestamp, int64(1658934956)) } func TestFlushUnsafeGaugeMetricSample(t *testing.T) { - g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, noTimestamp) + g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}) m := g.flushUnsafe() assert.Equal(t, m.metricType, gauge) assert.Equal(t, m.fvalue, float64(21)) assert.Equal(t, m.name, "test") assert.Equal(t, m.tags, []string{"tag1", "tag2"}) - assert.Equal(t, m.timestamp, noTimestamp) g.sample(12) m = g.flushUnsafe() @@ -93,7 +84,6 @@ func TestFlushUnsafeGaugeMetricSample(t *testing.T) { assert.Equal(t, m.fvalue, float64(12)) assert.Equal(t, m.name, "test") assert.Equal(t, m.tags, []string{"tag1", "tag2"}) - assert.Equal(t, m.timestamp, noTimestamp) } func TestNewSetMetric(t *testing.T) { diff --git a/statsd/statsd.go b/statsd/statsd.go index b807ffd4..fb85a19b 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -161,12 +161,16 @@ type ClientInterface interface { Gauge(name string, value float64, tags []string, rate float64) error // Gauge measures the value of a metric at a given time. + // Even with client side aggregation enabled, there is no aggregation done on a metric + // sent using GaugeWithTimestamp: it is written as is in the serialization buffer. GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error // Count tracks how many times something happened per second. Count(name string, value int64, tags []string, rate float64) error - // Count tracks how many times something happened at the given second. + // CountWithTimestamp tracks how many times something happened at the given second. + // Even with client side aggregation enabled, there is no aggregation done on a metric + // sent using CountWithTimestamp: it is written as is in the serialization buffer. CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error // Histogram tracks the statistical distribution of a set of values on each host. @@ -560,16 +564,14 @@ func (c *Client) Gauge(name string, value float64, tags []string, rate float64) } // Gauge measures the value of a metric at a given time. +// Even with client side aggregation enabled, there is no aggregation done on a metric +// sent using GaugeWithTimestamp: it is written as is in the serialization buffer. func (c *Client) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error { if c == nil { return ErrNoClient } - ts := timestamp.Unix() atomic.AddUint64(&c.telemetry.totalMetricsGauge, 1) - if c.agg != nil { - return c.agg.gaugeWithTimestamp(name, value, tags, ts) - } - return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: ts}) + return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: timestamp.Unix()}) } // Count tracks how many times something happened per second. @@ -585,16 +587,14 @@ func (c *Client) Count(name string, value int64, tags []string, rate float64) er } // Count tracks how many times something happened at the given second. +// Even with client side aggregation enabled, there is no aggregation done on a metric +// sent using CountWithTimestamp: it is written as is in the serialization buffer. func (c *Client) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error { if c == nil { return ErrNoClient } - ts := timestamp.Unix() atomic.AddUint64(&c.telemetry.totalMetricsCount, 1) - if c.agg != nil { - return c.agg.countWithTimestamp(name, value, tags, ts) - } - return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: ts}) + return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: timestamp.Unix()}) } // Histogram tracks the statistical distribution of a set of values on each host. From 87572b0e97ddf48dc8f0bfb923b5ac08f1485096 Mon Sep 17 00:00:00 2001 From: Remy Mathieu Date: Fri, 29 Jul 2022 14:22:26 +0200 Subject: [PATCH 3/8] statsd: address review feedback. --- statsd/aggregator.go | 3 --- statsd/buffer.go | 8 ++------ statsd/format.go | 6 ++++-- statsd/statsd.go | 25 ++++++++++++++----------- 4 files changed, 20 insertions(+), 22 deletions(-) diff --git a/statsd/aggregator.go b/statsd/aggregator.go index b9b9d15e..65c050ed 100644 --- a/statsd/aggregator.go +++ b/statsd/aggregator.go @@ -14,9 +14,6 @@ type ( bufferedMetricMap map[string]*bufferedMetric ) -// noTimestamp is used as a value for metric without a given timestamp. -const noTimestamp = int64(0) - type aggregator struct { nbContextGauge uint64 nbContextCount uint64 diff --git a/statsd/buffer.go b/statsd/buffer.go index 9a01fbb3..f7bb8b0a 100644 --- a/statsd/buffer.go +++ b/statsd/buffer.go @@ -45,9 +45,7 @@ func (b *statsdBuffer) writeGauge(namespace string, globalTags []string, name st } originalBuffer := b.buffer b.buffer = appendGauge(b.buffer, namespace, globalTags, name, value, tags, rate) - if timestamp != noTimestamp { - b.buffer = appendTimestamp(b.buffer, timestamp) - } + b.buffer = appendTimestamp(b.buffer, timestamp) b.writeSeparator() return b.validateNewElement(originalBuffer) } @@ -58,9 +56,7 @@ func (b *statsdBuffer) writeCount(namespace string, globalTags []string, name st } originalBuffer := b.buffer b.buffer = appendCount(b.buffer, namespace, globalTags, name, value, tags, rate) - if timestamp != noTimestamp { - b.buffer = appendTimestamp(b.buffer, timestamp) - } + b.buffer = appendTimestamp(b.buffer, timestamp) b.writeSeparator() return b.validateNewElement(originalBuffer) } diff --git a/statsd/format.go b/statsd/format.go index 43ea2730..7513edd6 100644 --- a/statsd/format.go +++ b/statsd/format.go @@ -272,7 +272,9 @@ func appendContainerID(buffer []byte) []byte { } func appendTimestamp(buffer []byte, timestamp int64) []byte { - buffer = append(buffer, "|T"...) - buffer = strconv.AppendInt(buffer, timestamp, 10) + if timestamp != noTimestamp { + buffer = append(buffer, "|T"...) + buffer = strconv.AppendInt(buffer, timestamp, 10) + } return buffer } diff --git a/statsd/statsd.go b/statsd/statsd.go index fb85a19b..3d1d99fb 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -126,6 +126,9 @@ const ( writerWindowsPipe string = "pipe" ) +// noTimestamp is used as a value for metric without a given timestamp. +const noTimestamp = int64(0) + type metric struct { metricType metricType namespace string @@ -160,17 +163,17 @@ type ClientInterface interface { // Gauge measures the value of a metric at a particular time. Gauge(name string, value float64, tags []string, rate float64) error - // Gauge measures the value of a metric at a given time. - // Even with client side aggregation enabled, there is no aggregation done on a metric - // sent using GaugeWithTimestamp: it is written as is in the serialization buffer. + // GaugeWithTimestamp measures the value of a metric at a given time. + // The value will bypass any aggregation on the client side and agent side. + // This is useful when sending points in the past. GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error // Count tracks how many times something happened per second. Count(name string, value int64, tags []string, rate float64) error // CountWithTimestamp tracks how many times something happened at the given second. - // Even with client side aggregation enabled, there is no aggregation done on a metric - // sent using CountWithTimestamp: it is written as is in the serialization buffer. + // The value will bypass any aggregation on the client side and agent side. + // This is useful when sending points in the past. CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error // Histogram tracks the statistical distribution of a set of values on each host. @@ -563,9 +566,9 @@ func (c *Client) Gauge(name string, value float64, tags []string, rate float64) return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) } -// Gauge measures the value of a metric at a given time. -// Even with client side aggregation enabled, there is no aggregation done on a metric -// sent using GaugeWithTimestamp: it is written as is in the serialization buffer. +// GaugeWithTimestamp measures the value of a metric at a given time. +// The value will bypass any aggregation on the client side and agent side. +// This is useful when sending points in the past. func (c *Client) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error { if c == nil { return ErrNoClient @@ -586,9 +589,9 @@ func (c *Client) Count(name string, value int64, tags []string, rate float64) er return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) } -// Count tracks how many times something happened at the given second. -// Even with client side aggregation enabled, there is no aggregation done on a metric -// sent using CountWithTimestamp: it is written as is in the serialization buffer. +// CountWithTimestamp tracks how many times something happened at the given second. +// The value will bypass any aggregation on the client side and agent side. +// This is useful when sending points in the past. func (c *Client) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error { if c == nil { return ErrNoClient From 68308a5c1c6e6867acbcd9d1acd0f1826433b633 Mon Sep 17 00:00:00 2001 From: Remy Mathieu Date: Fri, 29 Jul 2022 14:43:25 +0200 Subject: [PATCH 4/8] statsd: add minimum Datadog Agent version for feature support. --- statsd/statsd.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/statsd/statsd.go b/statsd/statsd.go index 3d1d99fb..0fadd6a8 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -166,6 +166,7 @@ type ClientInterface interface { // GaugeWithTimestamp measures the value of a metric at a given time. // The value will bypass any aggregation on the client side and agent side. // This is useful when sending points in the past. + // Minimum Datadog Agent version: 7.39.0 GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error // Count tracks how many times something happened per second. @@ -174,6 +175,7 @@ type ClientInterface interface { // CountWithTimestamp tracks how many times something happened at the given second. // The value will bypass any aggregation on the client side and agent side. // This is useful when sending points in the past. + // Minimum Datadog Agent version: 7.39.0 CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error // Histogram tracks the statistical distribution of a set of values on each host. From 6e8a1efc9d663ba65aac9153fb8671c5b9300877 Mon Sep 17 00:00:00 2001 From: Remy Mathieu Date: Tue, 2 Aug 2022 15:43:35 +0200 Subject: [PATCH 5/8] statsd: return an error if an invalid timestamp (< 0) is provided. --- statsd/statsd.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/statsd/statsd.go b/statsd/statsd.go index 0fadd6a8..98e44c87 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -156,6 +156,15 @@ func (e noClientErr) Error() string { return string(e) } +type invalidTimestampErr string + +// InvalidTimestamp is returned if a provided timestamp is invalid. +const InvalidTimestamp = invalidTimestampErr("invalid timestamp") + +func (e invalidTimestampErr) Error() string { + return string(e) +} + // ClientInterface is an interface that exposes the common client functions for the // purpose of being able to provide a no-op client or even mocking. This can aid // downstream users' with their testing. @@ -575,6 +584,11 @@ func (c *Client) GaugeWithTimestamp(name string, value float64, tags []string, r if c == nil { return ErrNoClient } + + if timestamp.Unix() < 0 { + return InvalidTimestamp + } + atomic.AddUint64(&c.telemetry.totalMetricsGauge, 1) return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: timestamp.Unix()}) } @@ -598,6 +612,11 @@ func (c *Client) CountWithTimestamp(name string, value int64, tags []string, rat if c == nil { return ErrNoClient } + + if timestamp.Unix() < 0 { + return InvalidTimestamp + } + atomic.AddUint64(&c.telemetry.totalMetricsCount, 1) return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: timestamp.Unix()}) } From 0325b2a672b5b7046b41b2df3f76c89073885531 Mon Sep 17 00:00:00 2001 From: Remy Mathieu Date: Thu, 25 Aug 2022 15:02:24 +0200 Subject: [PATCH 6/8] statsd: mention a minimum datadog agent version --- statsd/statsd.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/statsd/statsd.go b/statsd/statsd.go index 98e44c87..fe3b6d83 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -580,6 +580,8 @@ func (c *Client) Gauge(name string, value float64, tags []string, rate float64) // GaugeWithTimestamp measures the value of a metric at a given time. // The value will bypass any aggregation on the client side and agent side. // This is useful when sending points in the past. +// +// Minimum Datadog Agent version: 7.40.0 func (c *Client) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error { if c == nil { return ErrNoClient @@ -608,6 +610,8 @@ func (c *Client) Count(name string, value int64, tags []string, rate float64) er // CountWithTimestamp tracks how many times something happened at the given second. // The value will bypass any aggregation on the client side and agent side. // This is useful when sending points in the past. +// +// Minimum Datadog Agent version: 7.40.0 func (c *Client) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error { if c == nil { return ErrNoClient From 4d2780780c1bfcb61701ed6173053a72c0c6bcb2 Mon Sep 17 00:00:00 2001 From: Remy Mathieu Date: Mon, 19 Sep 2022 14:41:11 +0200 Subject: [PATCH 7/8] statsd: address feedback concerning `noTimestamp` use. --- statsd/format.go | 2 +- statsd/statsd.go | 31 ++++++++++++++++++------------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/statsd/format.go b/statsd/format.go index 7513edd6..f3ab9231 100644 --- a/statsd/format.go +++ b/statsd/format.go @@ -272,7 +272,7 @@ func appendContainerID(buffer []byte) []byte { } func appendTimestamp(buffer []byte, timestamp int64) []byte { - if timestamp != noTimestamp { + if timestamp > noTimestamp { buffer = append(buffer, "|T"...) buffer = strconv.AppendInt(buffer, timestamp, 10) } diff --git a/statsd/statsd.go b/statsd/statsd.go index fe3b6d83..8ada7f3f 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -173,18 +173,21 @@ type ClientInterface interface { Gauge(name string, value float64, tags []string, rate float64) error // GaugeWithTimestamp measures the value of a metric at a given time. - // The value will bypass any aggregation on the client side and agent side. - // This is useful when sending points in the past. - // Minimum Datadog Agent version: 7.39.0 + // The value will bypass any aggregation on the client side and agent side, this is + // useful when sending points in the past. + // Please report to the Datadog documentation for the maximum age of a metric. + // + // Minimum Datadog Agent version: 7.40.0 GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error // Count tracks how many times something happened per second. Count(name string, value int64, tags []string, rate float64) error - // CountWithTimestamp tracks how many times something happened at the given second. - // The value will bypass any aggregation on the client side and agent side. - // This is useful when sending points in the past. - // Minimum Datadog Agent version: 7.39.0 + // The value will bypass any aggregation on the client side and agent side, this is + // useful when sending points in the past. + // Please report to the Datadog documentation for the maximum age of a metric. + // + // Minimum Datadog Agent version: 7.40.0 CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error // Histogram tracks the statistical distribution of a set of values on each host. @@ -578,8 +581,9 @@ func (c *Client) Gauge(name string, value float64, tags []string, rate float64) } // GaugeWithTimestamp measures the value of a metric at a given time. -// The value will bypass any aggregation on the client side and agent side. -// This is useful when sending points in the past. +// The value will bypass any aggregation on the client side and agent side, this is +// useful when sending points in the past. +// Please report to the Datadog documentation for the maximum age of a metric. // // Minimum Datadog Agent version: 7.40.0 func (c *Client) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error { @@ -587,7 +591,7 @@ func (c *Client) GaugeWithTimestamp(name string, value float64, tags []string, r return ErrNoClient } - if timestamp.Unix() < 0 { + if timestamp.IsZero() || timestamp.Unix() <= noTimestamp { return InvalidTimestamp } @@ -608,8 +612,9 @@ func (c *Client) Count(name string, value int64, tags []string, rate float64) er } // CountWithTimestamp tracks how many times something happened at the given second. -// The value will bypass any aggregation on the client side and agent side. -// This is useful when sending points in the past. +// The value will bypass any aggregation on the client side and agent side, this is +// useful when sending points in the past. +// Please report to the Datadog documentation for the maximum age of a metric. // // Minimum Datadog Agent version: 7.40.0 func (c *Client) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error { @@ -617,7 +622,7 @@ func (c *Client) CountWithTimestamp(name string, value int64, tags []string, rat return ErrNoClient } - if timestamp.Unix() < 0 { + if timestamp.IsZero() || timestamp.Unix() <= noTimestamp { return InvalidTimestamp } From da9d9c6c5f26f72a879b1797f9956975aa55326a Mon Sep 17 00:00:00 2001 From: Remy Mathieu Date: Thu, 6 Oct 2022 10:25:49 +0200 Subject: [PATCH 8/8] statsd: comments phrasing --- statsd/statsd.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/statsd/statsd.go b/statsd/statsd.go index 8ada7f3f..e6db34fa 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -173,9 +173,9 @@ type ClientInterface interface { Gauge(name string, value float64, tags []string, rate float64) error // GaugeWithTimestamp measures the value of a metric at a given time. + // BETA - Please contact our support team for more information to use this feature: https://www.datadoghq.com/support/ // The value will bypass any aggregation on the client side and agent side, this is // useful when sending points in the past. - // Please report to the Datadog documentation for the maximum age of a metric. // // Minimum Datadog Agent version: 7.40.0 GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error @@ -183,9 +183,10 @@ type ClientInterface interface { // Count tracks how many times something happened per second. Count(name string, value int64, tags []string, rate float64) error + // CountWithTimestamp tracks how many times something happened at the given second. + // BETA - Please contact our support team for more information to use this feature: https://www.datadoghq.com/support/ // The value will bypass any aggregation on the client side and agent side, this is // useful when sending points in the past. - // Please report to the Datadog documentation for the maximum age of a metric. // // Minimum Datadog Agent version: 7.40.0 CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error @@ -581,9 +582,9 @@ func (c *Client) Gauge(name string, value float64, tags []string, rate float64) } // GaugeWithTimestamp measures the value of a metric at a given time. +// BETA - Please contact our support team for more information to use this feature: https://www.datadoghq.com/support/ // The value will bypass any aggregation on the client side and agent side, this is // useful when sending points in the past. -// Please report to the Datadog documentation for the maximum age of a metric. // // Minimum Datadog Agent version: 7.40.0 func (c *Client) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error { @@ -612,9 +613,9 @@ func (c *Client) Count(name string, value int64, tags []string, rate float64) er } // CountWithTimestamp tracks how many times something happened at the given second. +// BETA - Please contact our support team for more information to use this feature: https://www.datadoghq.com/support/ // The value will bypass any aggregation on the client side and agent side, this is // useful when sending points in the past. -// Please report to the Datadog documentation for the maximum age of a metric. // // Minimum Datadog Agent version: 7.40.0 func (c *Client) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error {