Implement ChannelMode and sampling rate for extended aggregation
When using extended aggregation we still want to respect the sampling rate
for histograms, distributions and timings, as it has a direct
impact on the Agent (the number of points to aggregate).

For apps sending a high number of metrics, the aggregator implements
ChannelMode to avoid lock contention when generating random numbers.
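
A minimal usage sketch of a client enabling both features together. The option names WithExtendedClientSideAggregation and WithChannelMode, and the import path, are assumed from the library's public API and may not match this exact release.

package main

import (
    "log"

    "github.com/DataDog/datadog-go/statsd"
)

func main() {
    // Assumed options: extended client-side aggregation for histograms,
    // distributions and timings, plus ChannelMode so metrics are handed to
    // the aggregator through a channel instead of under a shared lock.
    client, err := statsd.New("127.0.0.1:8125",
        statsd.WithExtendedClientSideAggregation(),
        statsd.WithChannelMode(),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // The sample rate is still honored: roughly half of these calls are kept.
    client.Histogram("request.duration", 42.0, []string{"env:dev"}, 0.5)
}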
hush-hush committed Mar 16, 2021
1 parent fe76948 commit f2fb331
Showing 11 changed files with 406 additions and 191 deletions.
2 changes: 2 additions & 0 deletions go.sum
@@ -1,6 +1,7 @@
github.com/Microsoft/go-winio v0.4.16/go.mod h1:XB6nPKklQyQ7GC9LdcBEcBl8PF76WugXOPRXwdLnMv0=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -9,6 +10,7 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
139 changes: 63 additions & 76 deletions statsd/aggregator.go
@@ -11,59 +11,9 @@ type (
countsMap map[string]*countMetric
gaugesMap map[string]*gaugeMetric
setsMap map[string]*setMetric
bufferedMetricMap map[string]*histogramMetric
bufferedMetricMap map[string]*bufferedMetric
)

// bufferedMetricContexts represents the contexts for Histograms, Distributions
// and Timings. Since those 3 metric types behave the same way and are sampled
// with the same logic, they're represented by the same type.
type bufferedMetricContexts struct {
nbContext int32
mutex sync.RWMutex
values bufferedMetricMap
newMetric func(string, float64, string) *bufferedMetric
}

func newBufferedContexts(newMetric func(string, float64, string) *bufferedMetric) bufferedMetricContexts {
return bufferedMetricContexts{
values: bufferedMetricMap{},
newMetric: newMetric,
}
}

func (bc *bufferedMetricContexts) flush(metrics []metric) []metric {
bc.mutex.Lock()
values := bc.values
bc.values = bufferedMetricMap{}
bc.mutex.Unlock()

for _, d := range values {
metrics = append(metrics, d.flushUnsafe())
}
atomic.AddInt32(&bc.nbContext, int32(len(values)))
return metrics
}

func (bc *bufferedMetricContexts) sample(name string, value float64, tags []string) error {
context, stringTags := getContextAndTags(name, tags)
bc.mutex.RLock()
if v, found := bc.values[context]; found {
v.sample(value)
bc.mutex.RUnlock()
return nil
}
bc.mutex.RUnlock()

bc.mutex.Lock()
bc.values[context] = bc.newMetric(name, value, stringTags)
bc.mutex.Unlock()
return nil
}

func (bc *bufferedMetricContexts) resetAndGetNbContext() int32 {
return atomic.SwapInt32(&bc.nbContext, 0)
}

type aggregator struct {
nbContextGauge int32
nbContextCount int32
@@ -81,9 +31,15 @@ type aggregator struct {
timings bufferedMetricContexts

closed chan struct{}
exited chan struct{}

client *Client

// aggregator implements the ChannelMode mechanism to receive histograms,
// distributions and timings. Since they need sampling, they have to
// lock around the random source. When using both ChannelMode and
// ExtendedAggregation we don't want goroutines to fight over that lock.
inputMetrics chan metric
stopChannelMode chan struct{}
}

type aggregatorMetrics struct {
@@ -98,15 +54,15 @@ type aggregatorMetrics struct {

func newAggregator(c *Client) *aggregator {
return &aggregator{
client: c,
counts: countsMap{},
gauges: gaugesMap{},
sets: setsMap{},
histograms: newBufferedContexts(newHistogramMetric),
distributions: newBufferedContexts(newDistributionMetric),
timings: newBufferedContexts(newTimingMetric),
closed: make(chan struct{}),
exited: make(chan struct{}),
client: c,
counts: countsMap{},
gauges: gaugesMap{},
sets: setsMap{},
histograms: newBufferedContexts(newHistogramMetric),
distributions: newBufferedContexts(newDistributionMetric),
timings: newBufferedContexts(newTimingMetric),
closed: make(chan struct{}),
stopChannelMode: make(chan struct{}),
}
}

@@ -117,25 +73,49 @@ func (a *aggregator) start(flushInterval time.Duration) {
for {
select {
case <-ticker.C:
a.sendMetrics()
a.flush()
case <-a.closed:
close(a.exited)
return
}
}
}()
}

func (a *aggregator) sendMetrics() {
for _, m := range a.flushMetrics() {
a.client.send(m)
}
func (a *aggregator) startReceivingMetric(bufferSize int) {
a.inputMetrics = make(chan metric, bufferSize)
go a.pullMetric()
}

func (a *aggregator) stopReceivingMetric() {
a.stopChannelMode <- struct{}{}
}

func (a *aggregator) stop() {
close(a.closed)
<-a.exited
a.sendMetrics()
a.closed <- struct{}{}
}

func (a *aggregator) pullMetric() {
for {
select {
case m := <-a.inputMetrics:
switch m.metricType {
case histogram:
a.histogram(m.name, m.fvalue, m.tags, m.rate)
case distribution:
a.distribution(m.name, m.fvalue, m.tags, m.rate)
case timing:
a.timing(m.name, m.fvalue, m.tags, m.rate)
}
case <-a.stopChannelMode:
return
}
}
}

func (a *aggregator) flush() {
for _, m := range a.flushMetrics() {
a.client.sendBlocking(m)
}
}

func (a *aggregator) flushTelemetryMetrics() *aggregatorMetrics {
@@ -258,14 +238,21 @@ func (a *aggregator) set(name string, value string, tags []string) error {
return nil
}

func (a *aggregator) histogram(name string, value float64, tags []string) error {
return a.histograms.sample(name, value, tags)
// Only histograms, distributions and timings are sampled with a rate since we
// only pack them in one message instead of aggregating them. Discarding the
// sample rate would impact the CPU and memory usage of the Agent.

// function type used by Client.sendToAggregator
type bufferedMetricSampleFunc func(name string, value float64, tags []string, rate float64) error

func (a *aggregator) histogram(name string, value float64, tags []string, rate float64) error {
return a.histograms.sample(name, value, tags, rate)
}

func (a *aggregator) distribution(name string, value float64, tags []string) error {
return a.distributions.sample(name, value, tags)
func (a *aggregator) distribution(name string, value float64, tags []string, rate float64) error {
return a.distributions.sample(name, value, tags, rate)
}

func (a *aggregator) timing(name string, value float64, tags []string) error {
return a.timings.sample(name, value, tags)
func (a *aggregator) timing(name string, value float64, tags []string, rate float64) error {
return a.timings.sample(name, value, tags, rate)
}
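
The Client side of ChannelMode lives in statsd.go, which is not part of this excerpt. A rough sketch of the hand-off it enables, reusing the metric fields consumed by pullMetric above; the method shape, the c.agg field and the drop-when-full behavior are assumptions, not the committed implementation.

// Sketch only: route a buffered metric either through the aggregator's input
// channel (ChannelMode) or straight to the matching aggregator method.
func (c *Client) sendToAggregator(mType metricType, name string, value float64, tags []string, rate float64, f bufferedMetricSampleFunc) error {
    if c.agg.inputMetrics != nil {
        // ChannelMode: the aggregator goroutine does the sampling, so callers
        // never touch the random source or its lock.
        select {
        case c.agg.inputMetrics <- metric{metricType: mType, name: name, fvalue: value, tags: tags, rate: rate}:
        default:
            // Channel full: drop rather than block the caller.
        }
        return nil
    }
    // Without ChannelMode, call the aggregator method directly (e.g. a.histogram).
    return f(name, value, tags, rate)
}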
30 changes: 15 additions & 15 deletions statsd/aggregator_test.go
@@ -31,15 +31,15 @@ func TestAggregatorSample(t *testing.T) {
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")

a.histogram("histogramTest", 21, tags)
a.histogram("histogramTest", 21, tags, 1)
assert.Len(t, a.histograms.values, 1)
assert.Contains(t, a.histograms.values, "histogramTest:tag1,tag2")

a.distribution("distributionTest", 21, tags)
a.distribution("distributionTest", 21, tags, 1)
assert.Len(t, a.distributions.values, 1)
assert.Contains(t, a.distributions.values, "distributionTest:tag1,tag2")

a.timing("timingTest", 21, tags)
a.timing("timingTest", 21, tags, 1)
assert.Len(t, a.timings.values, 1)
assert.Contains(t, a.timings.values, "timingTest:tag1,tag2")
}
@@ -63,17 +63,17 @@ func TestAggregatorFlush(t *testing.T) {
a.set("setTest1", "value2", tags)
a.set("setTest2", "value1", tags)

a.histogram("histogramTest1", 21, tags)
a.histogram("histogramTest1", 22, tags)
a.histogram("histogramTest2", 23, tags)
a.histogram("histogramTest1", 21, tags, 1)
a.histogram("histogramTest1", 22, tags, 1)
a.histogram("histogramTest2", 23, tags, 1)

a.distribution("distributionTest1", 21, tags)
a.distribution("distributionTest1", 22, tags)
a.distribution("distributionTest2", 23, tags)
a.distribution("distributionTest1", 21, tags, 1)
a.distribution("distributionTest1", 22, tags, 1)
a.distribution("distributionTest2", 23, tags, 1)

a.timing("timingTest1", 21, tags)
a.timing("timingTest1", 22, tags)
a.timing("timingTest2", 23, tags)
a.timing("timingTest1", 21, tags, 1)
a.timing("timingTest1", 22, tags, 1)
a.timing("timingTest2", 23, tags, 1)

metrics := a.flushMetrics()

@@ -210,9 +210,9 @@ func TestAggregatorFlushConcurrency(t *testing.T) {
a.gauge("gaugeTest1", 21, tags)
a.count("countTest1", 21, tags)
a.set("setTest1", "value1", tags)
a.histogram("histogramTest1", 21, tags)
a.distribution("distributionTest1", 21, tags)
a.timing("timingTest1", 21, tags)
a.histogram("histogramTest1", 21, tags, 1)
a.distribution("distributionTest1", 21, tags, 1)
a.timing("timingTest1", 21, tags, 1)
}()
}

75 changes: 75 additions & 0 deletions statsd/buffered_metric_context.go
@@ -0,0 +1,75 @@
package statsd

import (
"math/rand"
"sync"
"sync/atomic"
"time"
)

// bufferedMetricContexts represents the contexts for Histograms, Distributions
// and Timings. Since those 3 metric types behave the same way and are sampled
// with the same logic, they're represented by the same type.
type bufferedMetricContexts struct {
nbContext int32
mutex sync.RWMutex
values bufferedMetricMap
newMetric func(string, float64, string) *bufferedMetric

// Each bufferedMetricContexts uses its own random source and random
// lock to prevent goroutines from contending for the lock on the
// "math/rand" package-global random source (e.g. calls like
// "rand.Float64()" must acquire a shared lock to get the next
// pseudorandom number).
random *rand.Rand
randomLock sync.Mutex
}

func newBufferedContexts(newMetric func(string, float64, string) *bufferedMetric) bufferedMetricContexts {
return bufferedMetricContexts{
values: bufferedMetricMap{},
newMetric: newMetric,
// Note that calling "time.Now().UnixNano()" in quick succession may return
// very similar values. That's fine for seeding the worker-specific random
// source because we just need an evenly distributed stream of float values.
// Do not use this random source for cryptographic randomness.
random: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}

func (bc *bufferedMetricContexts) flush(metrics []metric) []metric {
bc.mutex.Lock()
values := bc.values
bc.values = bufferedMetricMap{}
bc.mutex.Unlock()

for _, d := range values {
metrics = append(metrics, d.flushUnsafe())
}
atomic.AddInt32(&bc.nbContext, int32(len(values)))
return metrics
}

func (bc *bufferedMetricContexts) sample(name string, value float64, tags []string, rate float64) error {
if !shouldSample(rate, bc.random, &bc.randomLock) {
return nil
}

context, stringTags := getContextAndTags(name, tags)
bc.mutex.RLock()
if v, found := bc.values[context]; found {
v.sample(value)
bc.mutex.RUnlock()
return nil
}
bc.mutex.RUnlock()

bc.mutex.Lock()
bc.values[context] = bc.newMetric(name, value, stringTags)
bc.mutex.Unlock()
return nil
}

func (bc *bufferedMetricContexts) resetAndGetNbContext() int32 {
return atomic.SwapInt32(&bc.nbContext, 0)
}
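
The shouldSample helper called above is not part of this diff. A minimal version consistent with that call site, using the per-context random source and lock described in the struct comment, could look like this; the body is an assumption, not the committed implementation.

package statsd

import (
    "math/rand"
    "sync"
)

// shouldSample keeps a value with probability "rate". The caller passes the
// per-context *rand.Rand and the mutex guarding it, so samplers for different
// metric types never contend on the package-global math/rand lock.
func shouldSample(rate float64, r *rand.Rand, lock *sync.Mutex) bool {
    if rate >= 1 {
        // A rate of 1 means "always keep"; no need to draw a random number.
        return true
    }
    lock.Lock()
    keep := r.Float64() < rate
    lock.Unlock()
    return keep
}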
4 changes: 4 additions & 0 deletions statsd/options.go
@@ -1,6 +1,7 @@
package statsd

import (
"fmt"
"math"
"strings"
"time"
@@ -199,6 +200,9 @@ func WithBufferFlushInterval(bufferFlushInterval time.Duration) Option {
// WithBufferShardCount sets the BufferShardCount option.
func WithBufferShardCount(bufferShardCount int) Option {
return func(o *Options) error {
if bufferShardCount < 1 {
return fmt.Errorf("BufferShardCount must be a positive integer")
}
o.BufferShardCount = bufferShardCount
return nil
}
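
Assumed usage of the new validation; statsd.New surfaces the option error, so a non-positive shard count is rejected at client construction.

package main

import (
    "fmt"

    "github.com/DataDog/datadog-go/statsd"
)

func main() {
    // The new check rejects a non-positive shard count when options are resolved.
    _, err := statsd.New("127.0.0.1:8125", statsd.WithBufferShardCount(0))
    fmt.Println(err) // BufferShardCount must be a positive integer
}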
