Skip to content

Commit

Permalink
statsd: the metrics with timestamp are directly written in the serial…
Browse files Browse the repository at this point in the history
…ization buffer.

They are not going through the client aggregator.
  • Loading branch information
remeh committed Jul 28, 2022
1 parent 97e39ef commit a64f8dc
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 130 deletions.
16 changes: 0 additions & 16 deletions statsd/aggregator.go
Expand Up @@ -226,14 +226,6 @@ func (a *aggregator) count(name string, value int64, tags []string) error {
return nil
}

func (a *aggregator) countWithTimestamp(name string, value int64, tags []string, timestamp int64) error {
context := getContext(name, tags)
a.countsM.Lock()
a.counts[context] = newCountMetric(name, value, tags, timestamp)
a.countsM.Unlock()
return nil
}

func (a *aggregator) gauge(name string, value float64, tags []string) error {
context := getContext(name, tags)
a.gaugesM.RLock()
Expand All @@ -258,14 +250,6 @@ func (a *aggregator) gauge(name string, value float64, tags []string) error {
return nil
}

func (a *aggregator) gaugeWithTimestamp(name string, value float64, tags []string, timestamp int64) error {
context := getContext(name, tags)
a.gaugesM.Lock()
a.gauges[context] = newGaugeMetric(name, value, tags, timestamp)
a.gaugesM.Unlock()
return nil
}

func (a *aggregator) set(name string, value string, tags []string) error {
context := getContext(name, tags)
a.setsM.RLock()
Expand Down
103 changes: 0 additions & 103 deletions statsd/aggregator_test.go
Expand Up @@ -5,7 +5,6 @@ import (
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -47,36 +46,6 @@ func TestAggregatorSample(t *testing.T) {
}
}

func TestAggregatorWithTimedSamples(t *testing.T) {
a := newAggregator(nil)

tags := []string{"tag1", "tag2"}

for i := 0; i < 2; i++ {
a.gauge("gaugeTest", 21, tags)
assert.Len(t, a.gauges, 1)
assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2")

for j := 0; j < 10; j++ {
// this one should only update the existing one, no new entry in a.gauges
a.gaugeWithTimestamp("gaugeTest", 21, tags, time.Now().Unix())
assert.Len(t, a.gauges, 1)
assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2")
}

a.count("countTest", 21, tags)
assert.Len(t, a.counts, 1)
assert.Contains(t, a.counts, "countTest:tag1,tag2")

for j := 0; j < 10; j++ {
// this one should only override the existing one, no new entry in a.counts
a.countWithTimestamp("countTest", 21, tags, time.Now().Unix())
assert.Len(t, a.counts, 1)
assert.Contains(t, a.counts, "countTest:tag1,tag2")
}
}
}

func TestAggregatorFlush(t *testing.T) {
a := newAggregator(nil)

Expand Down Expand Up @@ -226,78 +195,6 @@ func TestAggregatorFlush(t *testing.T) {
metrics)
}

func TestAggregatorFlushWithTimedSamplesMixed(t *testing.T) {
a := newAggregator(nil)

tags := []string{"tag1", "tag2"}

veryOld := time.Now().Add(-24 * time.Hour)
old := time.Now().Add(-6 * time.Hour)

a.gauge("gaugeTest1", 21, tags)
a.gaugeWithTimestamp("gaugeTest2", 10, tags, veryOld.Unix())
a.gaugeWithTimestamp("gaugeTest2", 15, tags, old.Unix())

a.count("countTest1", 44, tags)
a.countWithTimestamp("countTest2", 23, tags, veryOld.Unix())
a.countWithTimestamp("countTest2", 25, tags, old.Unix())

metrics := a.flushMetrics()

assert.Len(t, a.gauges, 0)
assert.Len(t, a.counts, 0)

assert.Len(t, metrics, 4)

sort.Slice(metrics, func(i, j int) bool {
if metrics[i].metricType == metrics[j].metricType {
res := strings.Compare(metrics[i].name, metrics[j].name)
// this happens fo set
if res == 0 {
return strings.Compare(metrics[i].svalue, metrics[j].svalue) != 1
}
return res != 1
}
return metrics[i].metricType < metrics[j].metricType
})

assert.Equal(t, []metric{
metric{
metricType: gauge,
name: "gaugeTest1",
tags: tags,
rate: 1,
fvalue: float64(21),
timestamp: 0,
},
metric{
metricType: gauge,
name: "gaugeTest2",
tags: tags,
rate: 1,
fvalue: float64(15),
timestamp: old.Unix(),
},
metric{
metricType: count,
name: "countTest1",
tags: tags,
rate: 1,
ivalue: 44,
timestamp: 0,
},
metric{
metricType: count,
name: "countTest2",
tags: tags,
rate: 1,
ivalue: 25,
timestamp: old.Unix(),
},
},
metrics)
}

func TestAggregatorFlushConcurrency(t *testing.T) {
a := newAggregator(nil)

Expand Down
22 changes: 11 additions & 11 deletions statsd/statsd.go
Expand Up @@ -161,12 +161,16 @@ type ClientInterface interface {
Gauge(name string, value float64, tags []string, rate float64) error

// Gauge measures the value of a metric at a given time.
// Even with client side aggregation enabled, there is no aggregation done on a metric
// sent using GaugeWithTimestamp: it is written as is in the serialization buffer.
GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error

// Count tracks how many times something happened per second.
Count(name string, value int64, tags []string, rate float64) error

// Count tracks how many times something happened at the given second.
// CountWithTimestamp tracks how many times something happened at the given second.
// Even with client side aggregation enabled, there is no aggregation done on a metric
// sent using CountWithTimestamp: it is written as is in the serialization buffer.
CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error

// Histogram tracks the statistical distribution of a set of values on each host.
Expand Down Expand Up @@ -560,16 +564,14 @@ func (c *Client) Gauge(name string, value float64, tags []string, rate float64)
}

// Gauge measures the value of a metric at a given time.
// Even with client side aggregation enabled, there is no aggregation done on a metric
// sent using GaugeWithTimestamp: it is written as is in the serialization buffer.
func (c *Client) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error {
if c == nil {
return ErrNoClient
}
ts := timestamp.Unix()
atomic.AddUint64(&c.telemetry.totalMetricsGauge, 1)
if c.agg != nil {
return c.agg.gaugeWithTimestamp(name, value, tags, ts)
}
return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: ts})
return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: timestamp.Unix()})
}

// Count tracks how many times something happened per second.
Expand All @@ -585,16 +587,14 @@ func (c *Client) Count(name string, value int64, tags []string, rate float64) er
}

// Count tracks how many times something happened at the given second.
// Even with client side aggregation enabled, there is no aggregation done on a metric
// sent using CountWithTimestamp: it is written as is in the serialization buffer.
func (c *Client) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error {
if c == nil {
return ErrNoClient
}
ts := timestamp.Unix()
atomic.AddUint64(&c.telemetry.totalMetricsCount, 1)
if c.agg != nil {
return c.agg.countWithTimestamp(name, value, tags, ts)
}
return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: ts})
return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: timestamp.Unix()})
}

// Histogram tracks the statistical distribution of a set of values on each host.
Expand Down

0 comments on commit a64f8dc

Please sign in to comment.