Skip to content

Commit

Permalink
statsd: support sending metrics with timestamp.
Browse files Browse the repository at this point in the history
Adds two methods in the `ClientInterface` to support sending gauge and count with
timestamp `GaugeWithTimestamp` and `CountWithTimestamp`, for users aggregating metrics
on their side and who can provide a timestamp for a given metric.
  • Loading branch information
remeh committed Jul 28, 2022
1 parent 553de96 commit 597553e
Show file tree
Hide file tree
Showing 12 changed files with 260 additions and 38 deletions.
23 changes: 21 additions & 2 deletions statsd/aggregator.go
Expand Up @@ -14,6 +14,9 @@ type (
bufferedMetricMap map[string]*bufferedMetric
)

// noTimestamp is used as a value for metric without a given timestamp.
const noTimestamp = int64(0)

type aggregator struct {
nbContextGauge uint64
nbContextCount uint64
Expand Down Expand Up @@ -218,7 +221,15 @@ func (a *aggregator) count(name string, value int64, tags []string) error {
return nil
}

a.counts[context] = newCountMetric(name, value, tags)
a.counts[context] = newCountMetric(name, value, tags, noTimestamp)
a.countsM.Unlock()
return nil
}

func (a *aggregator) countWithTimestamp(name string, value int64, tags []string, timestamp int64) error {
context := getContext(name, tags)
a.countsM.Lock()
a.counts[context] = newCountMetric(name, value, tags, timestamp)
a.countsM.Unlock()
return nil
}
Expand All @@ -233,7 +244,7 @@ func (a *aggregator) gauge(name string, value float64, tags []string) error {
}
a.gaugesM.RUnlock()

gauge := newGaugeMetric(name, value, tags)
gauge := newGaugeMetric(name, value, tags, noTimestamp)

a.gaugesM.Lock()
// Check if another goroutines hasn't created the value betwen the 'RUnlock' and 'Lock'
Expand All @@ -247,6 +258,14 @@ func (a *aggregator) gauge(name string, value float64, tags []string) error {
return nil
}

func (a *aggregator) gaugeWithTimestamp(name string, value float64, tags []string, timestamp int64) error {
context := getContext(name, tags)
a.gaugesM.Lock()
a.gauges[context] = newGaugeMetric(name, value, tags, timestamp)
a.gaugesM.Unlock()
return nil
}

func (a *aggregator) set(name string, value string, tags []string) error {
context := getContext(name, tags)
a.setsM.RLock()
Expand Down
103 changes: 103 additions & 0 deletions statsd/aggregator_test.go
Expand Up @@ -5,6 +5,7 @@ import (
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -46,6 +47,36 @@ func TestAggregatorSample(t *testing.T) {
}
}

func TestAggregatorWithTimedSamples(t *testing.T) {
a := newAggregator(nil)

tags := []string{"tag1", "tag2"}

for i := 0; i < 2; i++ {
a.gauge("gaugeTest", 21, tags)
assert.Len(t, a.gauges, 1)
assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2")

for j := 0; j < 10; j++ {
// this one should only update the existing one, no new entry in a.gauges
a.gaugeWithTimestamp("gaugeTest", 21, tags, time.Now().Unix())
assert.Len(t, a.gauges, 1)
assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2")
}

a.count("countTest", 21, tags)
assert.Len(t, a.counts, 1)
assert.Contains(t, a.counts, "countTest:tag1,tag2")

for j := 0; j < 10; j++ {
// this one should only override the existing one, no new entry in a.counts
a.countWithTimestamp("countTest", 21, tags, time.Now().Unix())
assert.Len(t, a.counts, 1)
assert.Contains(t, a.counts, "countTest:tag1,tag2")
}
}
}

func TestAggregatorFlush(t *testing.T) {
a := newAggregator(nil)

Expand Down Expand Up @@ -195,6 +226,78 @@ func TestAggregatorFlush(t *testing.T) {
metrics)
}

func TestAggregatorFlushWithTimedSamplesMixed(t *testing.T) {
a := newAggregator(nil)

tags := []string{"tag1", "tag2"}

veryOld := time.Now().Add(-24 * time.Hour)
old := time.Now().Add(-6 * time.Hour)

a.gauge("gaugeTest1", 21, tags)
a.gaugeWithTimestamp("gaugeTest2", 10, tags, veryOld.Unix())
a.gaugeWithTimestamp("gaugeTest2", 15, tags, old.Unix())

a.count("countTest1", 44, tags)
a.countWithTimestamp("countTest2", 23, tags, veryOld.Unix())
a.countWithTimestamp("countTest2", 25, tags, old.Unix())

metrics := a.flushMetrics()

assert.Len(t, a.gauges, 0)
assert.Len(t, a.counts, 0)

assert.Len(t, metrics, 4)

sort.Slice(metrics, func(i, j int) bool {
if metrics[i].metricType == metrics[j].metricType {
res := strings.Compare(metrics[i].name, metrics[j].name)
// this happens fo set
if res == 0 {
return strings.Compare(metrics[i].svalue, metrics[j].svalue) != 1
}
return res != 1
}
return metrics[i].metricType < metrics[j].metricType
})

assert.Equal(t, []metric{
metric{
metricType: gauge,
name: "gaugeTest1",
tags: tags,
rate: 1,
fvalue: float64(21),
timestamp: 0,
},
metric{
metricType: gauge,
name: "gaugeTest2",
tags: tags,
rate: 1,
fvalue: float64(15),
timestamp: old.Unix(),
},
metric{
metricType: count,
name: "countTest1",
tags: tags,
rate: 1,
ivalue: 44,
timestamp: 0,
},
metric{
metricType: count,
name: "countTest2",
tags: tags,
rate: 1,
ivalue: 25,
timestamp: old.Unix(),
},
},
metrics)
}

func TestAggregatorFlushConcurrency(t *testing.T) {
a := newAggregator(nil)

Expand Down
10 changes: 8 additions & 2 deletions statsd/buffer.go
Expand Up @@ -39,22 +39,28 @@ func newStatsdBuffer(maxSize, maxElements int) *statsdBuffer {
}
}

func (b *statsdBuffer) writeGauge(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error {
func (b *statsdBuffer) writeGauge(namespace string, globalTags []string, name string, value float64, tags []string, rate float64, timestamp int64) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendGauge(b.buffer, namespace, globalTags, name, value, tags, rate)
if timestamp != noTimestamp {
b.buffer = appendTimestamp(b.buffer, timestamp)
}
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}

func (b *statsdBuffer) writeCount(namespace string, globalTags []string, name string, value int64, tags []string, rate float64) error {
func (b *statsdBuffer) writeCount(namespace string, globalTags []string, name string, value int64, tags []string, rate float64, timestamp int64) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendCount(b.buffer, namespace, globalTags, name, value, tags, rate)
if timestamp != noTimestamp {
b.buffer = appendTimestamp(b.buffer, timestamp)
}
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}
Expand Down
2 changes: 1 addition & 1 deletion statsd/buffer_pool_test.go
Expand Up @@ -33,7 +33,7 @@ func TestBufferPoolEmpty(t *testing.T) {
func TestBufferReturn(t *testing.T) {
bufferPool := newBufferPool(1, 1024, 20)
buffer := bufferPool.borrowBuffer()
buffer.writeCount("", nil, "", 1, nil, 1)
buffer.writeCount("", nil, "", 1, nil, 1, noTimestamp)

assert.Equal(t, 0, len(bufferPool.pool))
bufferPool.returnBuffer(buffer)
Expand Down
35 changes: 24 additions & 11 deletions statsd/buffer_test.go
Expand Up @@ -8,7 +8,7 @@ import (

func TestBufferGauge(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|g|#tag:tag\n", string(buffer.bytes()))

Expand All @@ -17,14 +17,21 @@ func TestBufferGauge(t *testing.T) {
defer resetContainerID()

buffer = newStatsdBuffer(1024, 1)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|g|#tag:tag|c:container-id\n", string(buffer.bytes()))

// with a timestamp
buffer = newStatsdBuffer(1024, 1)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, 1658934092)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|g|#tag:tag|c:container-id|T1658934092\n", string(buffer.bytes()))

}

func TestBufferCount(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)
err := buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|c|#tag:tag\n", string(buffer.bytes()))

Expand All @@ -33,9 +40,15 @@ func TestBufferCount(t *testing.T) {
defer resetContainerID()

buffer = newStatsdBuffer(1024, 1)
err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|c|#tag:tag|c:container-id\n", string(buffer.bytes()))

// with a timestamp
buffer = newStatsdBuffer(1024, 1)
err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, 1658934092)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|c|#tag:tag|c:container-id|T1658934092\n", string(buffer.bytes()))
}

func TestBufferHistogram(t *testing.T) {
Expand Down Expand Up @@ -135,18 +148,18 @@ func TestBufferServiceCheck(t *testing.T) {

func TestBufferFullSize(t *testing.T) {
buffer := newStatsdBuffer(30, 10)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
assert.Len(t, buffer.bytes(), 30)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Equal(t, errBufferFull, err)
}

func TestBufferSeparator(t *testing.T) {
buffer := newStatsdBuffer(1024, 10)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|g|#tag:tag\nnamespace.metric:1|g|#tag:tag\n", string(buffer.bytes()))
}
Expand Down Expand Up @@ -223,13 +236,13 @@ func TestBufferAggregated(t *testing.T) {
func TestBufferMaxElement(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)

err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Nil(t, err)

err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Equal(t, errBufferFull, err)

err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1, noTimestamp)
assert.Equal(t, errBufferFull, err)

err = buffer.writeHistogram("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
Expand Down
6 changes: 6 additions & 0 deletions statsd/format.go
Expand Up @@ -270,3 +270,9 @@ func appendContainerID(buffer []byte) []byte {
}
return buffer
}

func appendTimestamp(buffer []byte, timestamp int64) []byte {
buffer = append(buffer, "|T"...)
buffer = strconv.AppendInt(buffer, timestamp, 10)
return buffer
}
34 changes: 20 additions & 14 deletions statsd/metrics.go
Expand Up @@ -14,16 +14,18 @@ Those are metrics type that can be aggregated on the client side:
*/

type countMetric struct {
value int64
name string
tags []string
value int64
timestamp int64
name string
tags []string
}

func newCountMetric(name string, value int64, tags []string) *countMetric {
func newCountMetric(name string, value int64, tags []string, timestamp int64) *countMetric {
return &countMetric{
value: value,
name: name,
tags: copySlice(tags),
value: value,
timestamp: timestamp,
name: name,
tags: copySlice(tags),
}
}

Expand All @@ -38,22 +40,25 @@ func (c *countMetric) flushUnsafe() metric {
tags: c.tags,
rate: 1,
ivalue: c.value,
timestamp: c.timestamp,
}
}

// Gauge

type gaugeMetric struct {
value uint64
name string
tags []string
value uint64
timestamp int64
name string
tags []string
}

func newGaugeMetric(name string, value float64, tags []string) *gaugeMetric {
func newGaugeMetric(name string, value float64, tags []string, timestamp int64) *gaugeMetric {
return &gaugeMetric{
value: math.Float64bits(value),
name: name,
tags: copySlice(tags),
value: math.Float64bits(value),
timestamp: timestamp,
name: name,
tags: copySlice(tags),
}
}

Expand All @@ -68,6 +73,7 @@ func (g *gaugeMetric) flushUnsafe() metric {
tags: g.tags,
rate: 1,
fvalue: math.Float64frombits(g.value),
timestamp: g.timestamp,
}
}

Expand Down

0 comments on commit 597553e

Please sign in to comment.