Skip to content

Commit

Permalink
Drop sampling rate when aggregation is enabled
Browse files Browse the repository at this point in the history
Mixing sampling rate and aggregation doesn't make sens and could be
erroneous if users sends the same metrics with different sampling rate.
  • Loading branch information
hush-hush committed Oct 15, 2020
1 parent c7df0ea commit 1e50f7d
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 61 deletions.
12 changes: 6 additions & 6 deletions statsd/aggregator.go
Expand Up @@ -127,7 +127,7 @@ func getContext(name string, tags []string) string {
return name + ":" + strings.Join(tags, ",")
}

func (a *aggregator) count(name string, value int64, tags []string, rate float64) error {
func (a *aggregator) count(name string, value int64, tags []string) error {
context := getContext(name, tags)
a.countsM.RLock()
if count, found := a.counts[context]; found {
Expand All @@ -138,12 +138,12 @@ func (a *aggregator) count(name string, value int64, tags []string, rate float64
a.countsM.RUnlock()

a.countsM.Lock()
a.counts[context] = newCountMetric(name, value, tags, rate)
a.counts[context] = newCountMetric(name, value, tags)
a.countsM.Unlock()
return nil
}

func (a *aggregator) gauge(name string, value float64, tags []string, rate float64) error {
func (a *aggregator) gauge(name string, value float64, tags []string) error {
context := getContext(name, tags)
a.gaugesM.RLock()
if gauge, found := a.gauges[context]; found {
Expand All @@ -153,15 +153,15 @@ func (a *aggregator) gauge(name string, value float64, tags []string, rate float
}
a.gaugesM.RUnlock()

gauge := newGaugeMetric(name, value, tags, rate)
gauge := newGaugeMetric(name, value, tags)

a.gaugesM.Lock()
a.gauges[context] = gauge
a.gaugesM.Unlock()
return nil
}

func (a *aggregator) set(name string, value string, tags []string, rate float64) error {
func (a *aggregator) set(name string, value string, tags []string) error {
context := getContext(name, tags)
a.setsM.RLock()
if set, found := a.sets[context]; found {
Expand All @@ -172,7 +172,7 @@ func (a *aggregator) set(name string, value string, tags []string, rate float64)
a.setsM.RUnlock()

a.setsM.Lock()
a.sets[context] = newSetMetric(name, value, tags, rate)
a.sets[context] = newSetMetric(name, value, tags)
a.setsM.Unlock()
return nil
}
38 changes: 19 additions & 19 deletions statsd/aggregator_test.go
Expand Up @@ -14,27 +14,27 @@ func TestAggregatorSample(t *testing.T) {

tags := []string{"tag1", "tag2"}

a.gauge("gaugeTest", 21, tags, 1)
a.gauge("gaugeTest", 21, tags)
assert.Len(t, a.gauges, 1)
assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2")

a.count("countTest", 21, tags, 1)
a.count("countTest", 21, tags)
assert.Len(t, a.counts, 1)
assert.Contains(t, a.counts, "countTest:tag1,tag2")

a.set("setTest", "value1", tags, 1)
a.set("setTest", "value1", tags)
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")

a.gauge("gaugeTest", 123, tags, 1)
a.gauge("gaugeTest", 123, tags)
assert.Len(t, a.gauges, 1)
assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2")

a.count("countTest", 10, tags, 1)
a.count("countTest", 10, tags)
assert.Len(t, a.counts, 1)
assert.Contains(t, a.counts, "countTest:tag1,tag2")

a.set("setTest", "value1", tags, 1)
a.set("setTest", "value1", tags)
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")
}
Expand All @@ -44,18 +44,18 @@ func TestAggregatorFlush(t *testing.T) {

tags := []string{"tag1", "tag2"}

a.gauge("gaugeTest1", 21, tags, 1)
a.gauge("gaugeTest1", 10, tags, 1)
a.gauge("gaugeTest2", 15, tags, 1)
a.gauge("gaugeTest1", 21, tags)
a.gauge("gaugeTest1", 10, tags)
a.gauge("gaugeTest2", 15, tags)

a.count("countTest1", 21, tags, 1)
a.count("countTest1", 10, tags, 1)
a.count("countTest2", 1, tags, 1)
a.count("countTest1", 21, tags)
a.count("countTest1", 10, tags)
a.count("countTest2", 1, tags)

a.set("setTest1", "value1", tags, 1)
a.set("setTest1", "value1", tags, 1)
a.set("setTest1", "value2", tags, 1)
a.set("setTest2", "value1", tags, 1)
a.set("setTest1", "value1", tags)
a.set("setTest1", "value1", tags)
a.set("setTest1", "value2", tags)
a.set("setTest2", "value1", tags)

metrics := a.flushMetrics()

Expand Down Expand Up @@ -143,9 +143,9 @@ func TestAggregatorFlushConcurrency(t *testing.T) {
go func() {
defer wg.Done()

a.gauge("gaugeTest1", 21, tags, 1)
a.count("countTest1", 21, tags, 1)
a.set("setTest1", "value1", tags, 1)
a.gauge("gaugeTest1", 21, tags)
a.count("countTest1", 21, tags)
a.set("setTest1", "value1", tags)
}()
}

Expand Down
16 changes: 5 additions & 11 deletions statsd/metrics.go
Expand Up @@ -17,15 +17,13 @@ type countMetric struct {
value int64
name string
tags []string
rate float64
}

func newCountMetric(name string, value int64, tags []string, rate float64) *countMetric {
func newCountMetric(name string, value int64, tags []string) *countMetric {
return &countMetric{
value: value,
name: name,
tags: tags,
rate: rate,
}
}

Expand All @@ -38,7 +36,7 @@ func (c *countMetric) flushUnsafe() metric {
metricType: count,
name: c.name,
tags: c.tags,
rate: c.rate,
rate: 1,
ivalue: c.value,
}
}
Expand All @@ -49,15 +47,13 @@ type gaugeMetric struct {
value uint64
name string
tags []string
rate float64
}

func newGaugeMetric(name string, value float64, tags []string, rate float64) *gaugeMetric {
func newGaugeMetric(name string, value float64, tags []string) *gaugeMetric {
return &gaugeMetric{
value: math.Float64bits(value),
name: name,
tags: tags,
rate: rate,
}
}

Expand All @@ -70,7 +66,7 @@ func (g *gaugeMetric) flushUnsafe() metric {
metricType: gauge,
name: g.name,
tags: g.tags,
rate: g.rate,
rate: 1,
fvalue: math.Float64frombits(g.value),
}
}
Expand All @@ -81,16 +77,14 @@ type setMetric struct {
data map[string]struct{}
name string
tags []string
rate float64
sync.Mutex
}

func newSetMetric(name string, value string, tags []string, rate float64) *setMetric {
func newSetMetric(name string, value string, tags []string) *setMetric {
set := &setMetric{
data: map[string]struct{}{},
name: name,
tags: tags,
rate: rate,
}
set.data[value] = struct{}{}
return set
Expand Down
31 changes: 9 additions & 22 deletions statsd/metrics_test.go
Expand Up @@ -11,94 +11,84 @@ import (
)

func TestNewCountMetric(t *testing.T) {
c := newCountMetric("test", 21, []string{"tag1", "tag2"}, 1)
c := newCountMetric("test", 21, []string{"tag1", "tag2"})
assert.Equal(t, c.value, int64(21))
assert.Equal(t, c.name, "test")
assert.Equal(t, c.tags, []string{"tag1", "tag2"})
assert.Equal(t, c.rate, 1.0)
}

func TestCountMetricSample(t *testing.T) {
c := newCountMetric("test", 21, []string{"tag1", "tag2"}, 1)
c := newCountMetric("test", 21, []string{"tag1", "tag2"})
c.sample(12)
assert.Equal(t, c.value, int64(33))
assert.Equal(t, c.name, "test")
assert.Equal(t, c.tags, []string{"tag1", "tag2"})
assert.Equal(t, c.rate, 1.0)
}

func TestFlushUnsafeCountMetricSample(t *testing.T) {
c := newCountMetric("test", 21, []string{"tag1", "tag2"}, 1)
c := newCountMetric("test", 21, []string{"tag1", "tag2"})
m := c.flushUnsafe()
assert.Equal(t, m.metricType, count)
assert.Equal(t, m.ivalue, int64(21))
assert.Equal(t, m.name, "test")
assert.Equal(t, m.tags, []string{"tag1", "tag2"})
assert.Equal(t, m.rate, 1.0)

c.sample(12)
m = c.flushUnsafe()
assert.Equal(t, m.metricType, count)
assert.Equal(t, m.ivalue, int64(33))
assert.Equal(t, m.name, "test")
assert.Equal(t, m.tags, []string{"tag1", "tag2"})
assert.Equal(t, m.rate, 1.0)
}

func TestNewGaugeMetric(t *testing.T) {
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, 1)
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"})
assert.Equal(t, math.Float64frombits(g.value), float64(21))
assert.Equal(t, g.name, "test")
assert.Equal(t, g.tags, []string{"tag1", "tag2"})
assert.Equal(t, g.rate, 1.0)
}

func TestGaugeMetricSample(t *testing.T) {
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, 1)
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"})
g.sample(12)
assert.Equal(t, math.Float64frombits(g.value), float64(12))
assert.Equal(t, g.name, "test")
assert.Equal(t, g.tags, []string{"tag1", "tag2"})
assert.Equal(t, g.rate, 1.0)
}

func TestFlushUnsafeGaugeMetricSample(t *testing.T) {
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, 1)
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"})
m := g.flushUnsafe()
assert.Equal(t, m.metricType, gauge)
assert.Equal(t, m.fvalue, float64(21))
assert.Equal(t, m.name, "test")
assert.Equal(t, m.tags, []string{"tag1", "tag2"})
assert.Equal(t, m.rate, 1.0)

g.sample(12)
m = g.flushUnsafe()
assert.Equal(t, m.metricType, gauge)
assert.Equal(t, m.fvalue, float64(12))
assert.Equal(t, m.name, "test")
assert.Equal(t, m.tags, []string{"tag1", "tag2"})
assert.Equal(t, m.rate, 1.0)
}

func TestNewSetMetric(t *testing.T) {
s := newSetMetric("test", "value1", []string{"tag1", "tag2"}, 1)
s := newSetMetric("test", "value1", []string{"tag1", "tag2"})
assert.Equal(t, s.data, map[string]struct{}{"value1": struct{}{}})
assert.Equal(t, s.name, "test")
assert.Equal(t, s.tags, []string{"tag1", "tag2"})
assert.Equal(t, s.rate, 1.0)
}

func TestSetMetricSample(t *testing.T) {
s := newSetMetric("test", "value1", []string{"tag1", "tag2"}, 1)
s := newSetMetric("test", "value1", []string{"tag1", "tag2"})
s.sample("value2")
assert.Equal(t, s.data, map[string]struct{}{"value1": struct{}{}, "value2": struct{}{}})
assert.Equal(t, s.name, "test")
assert.Equal(t, s.tags, []string{"tag1", "tag2"})
assert.Equal(t, s.rate, 1.0)
}

func TestFlushUnsafeSetMetricSample(t *testing.T) {
s := newSetMetric("test", "value1", []string{"tag1", "tag2"}, 1)
s := newSetMetric("test", "value1", []string{"tag1", "tag2"})
m := s.flushUnsafe()

require.Len(t, m, 1)
Expand All @@ -107,7 +97,6 @@ func TestFlushUnsafeSetMetricSample(t *testing.T) {
assert.Equal(t, m[0].svalue, "value1")
assert.Equal(t, m[0].name, "test")
assert.Equal(t, m[0].tags, []string{"tag1", "tag2"})
assert.Equal(t, m[0].rate, 1.0)

s.sample("value1")
s.sample("value2")
Expand All @@ -122,10 +111,8 @@ func TestFlushUnsafeSetMetricSample(t *testing.T) {
assert.Equal(t, m[0].svalue, "value1")
assert.Equal(t, m[0].name, "test")
assert.Equal(t, m[0].tags, []string{"tag1", "tag2"})
assert.Equal(t, m[0].rate, 1.0)
assert.Equal(t, m[1].metricType, set)
assert.Equal(t, m[1].svalue, "value2")
assert.Equal(t, m[1].name, "test")
assert.Equal(t, m[1].tags, []string{"tag1", "tag2"})
assert.Equal(t, m[1].rate, 1.0)
}
6 changes: 3 additions & 3 deletions statsd/statsd.go
Expand Up @@ -433,7 +433,7 @@ func (c *Client) Gauge(name string, value float64, tags []string, rate float64)
}
atomic.AddUint64(&c.metrics.TotalMetrics, 1)
if c.agg != nil {
return c.agg.gauge(name, value, tags, rate)
return c.agg.gauge(name, value, tags)
}
return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate})
}
Expand All @@ -445,7 +445,7 @@ func (c *Client) Count(name string, value int64, tags []string, rate float64) er
}
atomic.AddUint64(&c.metrics.TotalMetrics, 1)
if c.agg != nil {
return c.agg.count(name, value, tags, rate)
return c.agg.count(name, value, tags)
}
return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate})
}
Expand Down Expand Up @@ -485,7 +485,7 @@ func (c *Client) Set(name string, value string, tags []string, rate float64) err
}
atomic.AddUint64(&c.metrics.TotalMetrics, 1)
if c.agg != nil {
return c.agg.set(name, value, tags, rate)
return c.agg.set(name, value, tags)
}
return c.send(metric{metricType: set, name: name, svalue: value, tags: tags, rate: rate})
}
Expand Down

0 comments on commit 1e50f7d

Please sign in to comment.