From d052db788050e039d17e15cdff944290b4950878 Mon Sep 17 00:00:00 2001 From: maxime mouial Date: Thu, 15 Apr 2021 14:37:49 +0200 Subject: [PATCH] Implement ChannelMode and sampling rate for extended aggregation (#187) When using extended aggregation we still want to respect sampling rate for histograms, distribution and timing as it will have direct consequences on the Agent (number of point to aggregate). For apps sending a high number of metrics the aggregator implements ChannelMode to avoid lock contention when generating random numbers. --- statsd/aggregator.go | 139 +++++++++---------- statsd/aggregator_test.go | 30 ++--- statsd/buffered_metric_context.go | 75 +++++++++++ statsd/options.go | 4 + statsd/statsd.go | 87 ++++++++---- statsd/statsd_test.go | 216 ++++++++++++++++++++++-------- statsd/telemetry_test.go | 2 +- statsd/utils.go | 23 ++++ statsd/worker.go | 15 +-- statsd/worker_test.go | 4 +- 10 files changed, 404 insertions(+), 191 deletions(-) create mode 100644 statsd/buffered_metric_context.go create mode 100644 statsd/utils.go diff --git a/statsd/aggregator.go b/statsd/aggregator.go index 7d1d44f5..2adb5fa2 100644 --- a/statsd/aggregator.go +++ b/statsd/aggregator.go @@ -11,59 +11,9 @@ type ( countsMap map[string]*countMetric gaugesMap map[string]*gaugeMetric setsMap map[string]*setMetric - bufferedMetricMap map[string]*histogramMetric + bufferedMetricMap map[string]*bufferedMetric ) -// bufferedMetricContexts represent the contexts for Histograms, Distributions -// and Timing. Since those 3 metric types behave the same way and are sampled -// with the same type they're represented by the same class. -type bufferedMetricContexts struct { - nbContext int32 - mutex sync.RWMutex - values bufferedMetricMap - newMetric func(string, float64, string) *bufferedMetric -} - -func newBufferedContexts(newMetric func(string, float64, string) *bufferedMetric) bufferedMetricContexts { - return bufferedMetricContexts{ - values: bufferedMetricMap{}, - newMetric: newMetric, - } -} - -func (bc *bufferedMetricContexts) flush(metrics []metric) []metric { - bc.mutex.Lock() - values := bc.values - bc.values = bufferedMetricMap{} - bc.mutex.Unlock() - - for _, d := range values { - metrics = append(metrics, d.flushUnsafe()) - } - atomic.AddInt32(&bc.nbContext, int32(len(values))) - return metrics -} - -func (bc *bufferedMetricContexts) sample(name string, value float64, tags []string) error { - context, stringTags := getContextAndTags(name, tags) - bc.mutex.RLock() - if v, found := bc.values[context]; found { - v.sample(value) - bc.mutex.RUnlock() - return nil - } - bc.mutex.RUnlock() - - bc.mutex.Lock() - bc.values[context] = bc.newMetric(name, value, stringTags) - bc.mutex.Unlock() - return nil -} - -func (bc *bufferedMetricContexts) resetAndGetNbContext() int32 { - return atomic.SwapInt32(&bc.nbContext, 0) -} - type aggregator struct { nbContextGauge int32 nbContextCount int32 @@ -81,9 +31,15 @@ type aggregator struct { timings bufferedMetricContexts closed chan struct{} - exited chan struct{} client *Client + + // aggregator implements ChannelMode mechanism to receive histograms, + // distributions and timings. Since they need sampling they need to + // lock for random. When using both ChannelMode and ExtendedAggregation + // we don't want goroutine to fight over the lock. + inputMetrics chan metric + stopChannelMode chan struct{} } type aggregatorMetrics struct { @@ -98,15 +54,15 @@ type aggregatorMetrics struct { func newAggregator(c *Client) *aggregator { return &aggregator{ - client: c, - counts: countsMap{}, - gauges: gaugesMap{}, - sets: setsMap{}, - histograms: newBufferedContexts(newHistogramMetric), - distributions: newBufferedContexts(newDistributionMetric), - timings: newBufferedContexts(newTimingMetric), - closed: make(chan struct{}), - exited: make(chan struct{}), + client: c, + counts: countsMap{}, + gauges: gaugesMap{}, + sets: setsMap{}, + histograms: newBufferedContexts(newHistogramMetric), + distributions: newBufferedContexts(newDistributionMetric), + timings: newBufferedContexts(newTimingMetric), + closed: make(chan struct{}), + stopChannelMode: make(chan struct{}), } } @@ -117,25 +73,49 @@ func (a *aggregator) start(flushInterval time.Duration) { for { select { case <-ticker.C: - a.sendMetrics() + a.flush() case <-a.closed: - close(a.exited) return } } }() } -func (a *aggregator) sendMetrics() { - for _, m := range a.flushMetrics() { - a.client.send(m) - } +func (a *aggregator) startReceivingMetric(bufferSize int) { + a.inputMetrics = make(chan metric, bufferSize) + go a.pullMetric() +} + +func (a *aggregator) stopReceivingMetric() { + a.stopChannelMode <- struct{}{} } func (a *aggregator) stop() { - close(a.closed) - <-a.exited - a.sendMetrics() + a.closed <- struct{}{} +} + +func (a *aggregator) pullMetric() { + for { + select { + case m := <-a.inputMetrics: + switch m.metricType { + case histogram: + a.histogram(m.name, m.fvalue, m.tags, m.rate) + case distribution: + a.distribution(m.name, m.fvalue, m.tags, m.rate) + case timing: + a.timing(m.name, m.fvalue, m.tags, m.rate) + } + case <-a.stopChannelMode: + return + } + } +} + +func (a *aggregator) flush() { + for _, m := range a.flushMetrics() { + a.client.sendBlocking(m) + } } func (a *aggregator) flushTelemetryMetrics() *aggregatorMetrics { @@ -258,14 +238,21 @@ func (a *aggregator) set(name string, value string, tags []string) error { return nil } -func (a *aggregator) histogram(name string, value float64, tags []string) error { - return a.histograms.sample(name, value, tags) +// Only histograms, distributions and timings are sampled with a rate since we +// only pack them in on message instead of aggregating them. Discarding the +// sample rate will have impacts on the CPU and memory usage of the Agent. + +// type alias for Client.sendToAggregator +type bufferedMetricSampleFunc func(name string, value float64, tags []string, rate float64) error + +func (a *aggregator) histogram(name string, value float64, tags []string, rate float64) error { + return a.histograms.sample(name, value, tags, rate) } -func (a *aggregator) distribution(name string, value float64, tags []string) error { - return a.distributions.sample(name, value, tags) +func (a *aggregator) distribution(name string, value float64, tags []string, rate float64) error { + return a.distributions.sample(name, value, tags, rate) } -func (a *aggregator) timing(name string, value float64, tags []string) error { - return a.timings.sample(name, value, tags) +func (a *aggregator) timing(name string, value float64, tags []string, rate float64) error { + return a.timings.sample(name, value, tags, rate) } diff --git a/statsd/aggregator_test.go b/statsd/aggregator_test.go index 4eb50c98..99290115 100644 --- a/statsd/aggregator_test.go +++ b/statsd/aggregator_test.go @@ -31,15 +31,15 @@ func TestAggregatorSample(t *testing.T) { assert.Len(t, a.sets, 1) assert.Contains(t, a.sets, "setTest:tag1,tag2") - a.histogram("histogramTest", 21, tags) + a.histogram("histogramTest", 21, tags, 1) assert.Len(t, a.histograms.values, 1) assert.Contains(t, a.histograms.values, "histogramTest:tag1,tag2") - a.distribution("distributionTest", 21, tags) + a.distribution("distributionTest", 21, tags, 1) assert.Len(t, a.distributions.values, 1) assert.Contains(t, a.distributions.values, "distributionTest:tag1,tag2") - a.timing("timingTest", 21, tags) + a.timing("timingTest", 21, tags, 1) assert.Len(t, a.timings.values, 1) assert.Contains(t, a.timings.values, "timingTest:tag1,tag2") } @@ -63,17 +63,17 @@ func TestAggregatorFlush(t *testing.T) { a.set("setTest1", "value2", tags) a.set("setTest2", "value1", tags) - a.histogram("histogramTest1", 21, tags) - a.histogram("histogramTest1", 22, tags) - a.histogram("histogramTest2", 23, tags) + a.histogram("histogramTest1", 21, tags, 1) + a.histogram("histogramTest1", 22, tags, 1) + a.histogram("histogramTest2", 23, tags, 1) - a.distribution("distributionTest1", 21, tags) - a.distribution("distributionTest1", 22, tags) - a.distribution("distributionTest2", 23, tags) + a.distribution("distributionTest1", 21, tags, 1) + a.distribution("distributionTest1", 22, tags, 1) + a.distribution("distributionTest2", 23, tags, 1) - a.timing("timingTest1", 21, tags) - a.timing("timingTest1", 22, tags) - a.timing("timingTest2", 23, tags) + a.timing("timingTest1", 21, tags, 1) + a.timing("timingTest1", 22, tags, 1) + a.timing("timingTest2", 23, tags, 1) metrics := a.flushMetrics() @@ -210,9 +210,9 @@ func TestAggregatorFlushConcurrency(t *testing.T) { a.gauge("gaugeTest1", 21, tags) a.count("countTest1", 21, tags) a.set("setTest1", "value1", tags) - a.histogram("histogramTest1", 21, tags) - a.distribution("distributionTest1", 21, tags) - a.timing("timingTest1", 21, tags) + a.histogram("histogramTest1", 21, tags, 1) + a.distribution("distributionTest1", 21, tags, 1) + a.timing("timingTest1", 21, tags, 1) }() } diff --git a/statsd/buffered_metric_context.go b/statsd/buffered_metric_context.go new file mode 100644 index 00000000..ea099f27 --- /dev/null +++ b/statsd/buffered_metric_context.go @@ -0,0 +1,75 @@ +package statsd + +import ( + "math/rand" + "sync" + "sync/atomic" + "time" +) + +// bufferedMetricContexts represent the contexts for Histograms, Distributions +// and Timing. Since those 3 metric types behave the same way and are sampled +// with the same type they're represented by the same class. +type bufferedMetricContexts struct { + nbContext int32 + mutex sync.RWMutex + values bufferedMetricMap + newMetric func(string, float64, string) *bufferedMetric + + // Each bufferedMetricContexts uses its own random source and random + // lock to prevent goroutines from contending for the lock on the + // "math/rand" package-global random source (e.g. calls like + // "rand.Float64()" must acquire a shared lock to get the next + // pseudorandom number). + random *rand.Rand + randomLock sync.Mutex +} + +func newBufferedContexts(newMetric func(string, float64, string) *bufferedMetric) bufferedMetricContexts { + return bufferedMetricContexts{ + values: bufferedMetricMap{}, + newMetric: newMetric, + // Note that calling "time.Now().UnixNano()" repeatedly quickly may return + // very similar values. That's fine for seeding the worker-specific random + // source because we just need an evenly distributed stream of float values. + // Do not use this random source for cryptographic randomness. + random: rand.New(rand.NewSource(time.Now().UnixNano())), + } +} + +func (bc *bufferedMetricContexts) flush(metrics []metric) []metric { + bc.mutex.Lock() + values := bc.values + bc.values = bufferedMetricMap{} + bc.mutex.Unlock() + + for _, d := range values { + metrics = append(metrics, d.flushUnsafe()) + } + atomic.AddInt32(&bc.nbContext, int32(len(values))) + return metrics +} + +func (bc *bufferedMetricContexts) sample(name string, value float64, tags []string, rate float64) error { + if !shouldSample(rate, bc.random, &bc.randomLock) { + return nil + } + + context, stringTags := getContextAndTags(name, tags) + bc.mutex.RLock() + if v, found := bc.values[context]; found { + v.sample(value) + bc.mutex.RUnlock() + return nil + } + bc.mutex.RUnlock() + + bc.mutex.Lock() + bc.values[context] = bc.newMetric(name, value, stringTags) + bc.mutex.Unlock() + return nil +} + +func (bc *bufferedMetricContexts) resetAndGetNbContext() int32 { + return atomic.SwapInt32(&bc.nbContext, 0) +} diff --git a/statsd/options.go b/statsd/options.go index 886a8238..b8dd7f5c 100644 --- a/statsd/options.go +++ b/statsd/options.go @@ -1,6 +1,7 @@ package statsd import ( + "fmt" "math" "strings" "time" @@ -199,6 +200,9 @@ func WithBufferFlushInterval(bufferFlushInterval time.Duration) Option { // WithBufferShardCount sets the BufferShardCount option. func WithBufferShardCount(bufferShardCount int) Option { return func(o *Options) error { + if bufferShardCount < 1 { + return fmt.Errorf("BufferShardCount must be a positive integer") + } o.BufferShardCount = bufferShardCount return nil } diff --git a/statsd/statsd.go b/statsd/statsd.go index 37dd31e3..55fa0e79 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -212,7 +212,7 @@ type Client struct { closerLock sync.Mutex receiveMode ReceivingMode agg *aggregator - aggHistDist *aggregator + aggExtended *aggregator options []Option addrOption string } @@ -330,15 +330,6 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro Tags: o.Tags, metrics: &ClientMetrics{}, } - if o.Aggregation || o.ExtendedAggregation { - c.agg = newAggregator(&c) - c.agg.start(o.AggregationFlushInterval) - - if o.ExtendedAggregation { - c.aggHistDist = c.agg - } - } - // Inject values of DD_* environment variables as global tags. for envName, tagName := range ddEnvTagsMapping { if value := os.Getenv(envName); value != "" { @@ -371,10 +362,29 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro bufferPool := newBufferPool(o.BufferPoolSize, o.MaxBytesPerPayload, o.MaxMessagesPerPayload) c.sender = newSender(w, o.SenderQueueSize, bufferPool) c.receiveMode = o.ReceiveMode + + if o.Aggregation || o.ExtendedAggregation { + c.agg = newAggregator(&c) + c.agg.start(o.AggregationFlushInterval) + + if o.ExtendedAggregation { + c.aggExtended = c.agg + + if c.receiveMode == ChannelMode { + c.agg.startReceivingMetric(o.ChannelModeBufferSize) + } + } + } + for i := 0; i < o.BufferShardCount; i++ { w := newWorker(bufferPool, c.sender) c.workers = append(c.workers, w) - if c.receiveMode == ChannelMode { + + // ChannelMode mode at the worker level is not enabled when + // ExtendedAggregation is since the user app will not directly + // use the worker (the aggregator sit between the app and the + // workers). + if c.receiveMode == ChannelMode && !o.ExtendedAggregation { w.startReceivingMetric(o.ChannelModeBufferSize) } } @@ -447,7 +457,7 @@ func (c *Client) Flush() error { return ErrNoClient } if c.agg != nil { - c.agg.sendMetrics() + c.agg.flush() } for _, w := range c.workers { w.pause() @@ -481,10 +491,6 @@ func (c *Client) FlushTelemetryMetrics() ClientMetrics { } func (c *Client) send(m metric) error { - if c == nil { - return ErrNoClient - } - m.globalTags = c.Tags m.namespace = c.Namespace @@ -502,6 +508,28 @@ func (c *Client) send(m metric) error { return worker.processMetric(m) } +// sendBlocking is used by the aggregator to inject aggregated metrics. +func (c *Client) sendBlocking(m metric) error { + m.globalTags = c.Tags + m.namespace = c.Namespace + + h := hashString32(m.name) + worker := c.workers[h%uint32(len(c.workers))] + return worker.processMetric(m) +} + +func (c *Client) sendToAggregator(mType metricType, name string, value float64, tags []string, rate float64, f bufferedMetricSampleFunc) error { + if c.receiveMode == ChannelMode { + select { + case c.aggExtended.inputMetrics <- metric{metricType: mType, name: name, fvalue: value, tags: tags, rate: rate}: + default: + atomic.AddUint64(&c.metrics.TotalDroppedOnReceive, 1) + } + return nil + } + return f(name, value, tags, rate) +} + // Gauge measures the value of a metric at a particular time. func (c *Client) Gauge(name string, value float64, tags []string, rate float64) error { if c == nil { @@ -532,8 +560,8 @@ func (c *Client) Histogram(name string, value float64, tags []string, rate float return ErrNoClient } atomic.AddUint64(&c.metrics.TotalMetricsHistogram, 1) - if c.aggHistDist != nil { - return c.agg.histogram(name, value, tags) + if c.aggExtended != nil { + return c.sendToAggregator(histogram, name, value, tags, rate, c.aggExtended.histogram) } return c.send(metric{metricType: histogram, name: name, fvalue: value, tags: tags, rate: rate}) } @@ -544,8 +572,8 @@ func (c *Client) Distribution(name string, value float64, tags []string, rate fl return ErrNoClient } atomic.AddUint64(&c.metrics.TotalMetricsDistribution, 1) - if c.aggHistDist != nil { - return c.agg.distribution(name, value, tags) + if c.aggExtended != nil { + return c.sendToAggregator(distribution, name, value, tags, rate, c.aggExtended.distribution) } return c.send(metric{metricType: distribution, name: name, fvalue: value, tags: tags, rate: rate}) } @@ -584,8 +612,8 @@ func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, r return ErrNoClient } atomic.AddUint64(&c.metrics.TotalMetricsTiming, 1) - if c.aggHistDist != nil { - return c.agg.timing(name, value, tags) + if c.aggExtended != nil { + return c.sendToAggregator(timing, name, value, tags, rate, c.aggExtended.timing) } return c.send(metric{metricType: timing, name: name, fvalue: value, tags: tags, rate: rate}) } @@ -638,20 +666,23 @@ func (c *Client) Close() error { } close(c.stop) - if c.receiveMode == ChannelMode { + if c.receiveMode == ChannelMode && c.aggExtended == nil { for _, w := range c.workers { w.stopReceivingMetric() } } - // Wait for the threads to stop - c.wg.Wait() - - // Finally flush any remaining metrics that may have come in at the last moment + // flush the aggregator first if c.agg != nil { + if c.aggExtended != nil && c.receiveMode == ChannelMode { + c.agg.stopReceivingMetric() + } c.agg.stop() } - c.Flush() + // Wait for the threads to stop + c.wg.Wait() + + c.Flush() return c.sender.close() } diff --git a/statsd/statsd_test.go b/statsd/statsd_test.go index ada16472..be106d98 100644 --- a/statsd/statsd_test.go +++ b/statsd/statsd_test.go @@ -13,6 +13,10 @@ import ( "github.com/stretchr/testify/require" ) +var ( + defaultAddr = "localhost:1201" +) + type statsdWriterWrapper struct{} func (statsdWriterWrapper) SetWriteTimeout(time.Duration) error { @@ -102,14 +106,90 @@ func getTestServer(t *testing.T, addr string) *net.UDPConn { return server } -func testStatsdPipeline(t *testing.T, client *Client, addr string) { - server := getTestServer(t, addr) - defer server.Close() +func TestCloneWithExtraOptions(t *testing.T) { + client, err := New(defaultAddr, WithTags([]string{"tag1", "tag2"})) + require.Nil(t, err, fmt.Sprintf("failed to create client: %s", err)) + + assert.Equal(t, client.Tags, []string{"tag1", "tag2"}) + assert.Equal(t, client.Namespace, "") + assert.Equal(t, client.receiveMode, MutexMode) + assert.Equal(t, client.addrOption, defaultAddr) + assert.Len(t, client.options, 1) + cloneClient, err := CloneWithExtraOptions(client, WithNamespace("test"), WithChannelMode()) + require.Nil(t, err, fmt.Sprintf("failed to clone client: %s", err)) + + assert.Equal(t, cloneClient.Tags, []string{"tag1", "tag2"}) + assert.Equal(t, cloneClient.Namespace, "test.") + assert.Equal(t, cloneClient.receiveMode, ChannelMode) + assert.Equal(t, cloneClient.addrOption, defaultAddr) + assert.Len(t, cloneClient.options, 3) +} + +func sendOneMetrics(client *Client) string { client.Count("name", 1, []string{"tag"}, 1) + return "name:1|c|#tag" +} - err := client.Close() - require.Nil(t, err, fmt.Sprintf("failed to close client: %s", err)) +func sendBasicMetrics(client *Client) string { + client.Gauge("gauge", 1, []string{"tag"}, 1) + client.Gauge("gauge", 21, []string{"tag"}, 1) + client.Count("count", 1, []string{"tag"}, 1) + client.Count("count", 3, []string{"tag"}, 1) + client.Set("set", "my_id", []string{"tag"}, 1) + client.Set("set", "my_id", []string{"tag"}, 1) + + return "set:my_id|s|#tag\ngauge:21|g|#tag\ncount:4|c|#tag" +} + +func sendAllMetrics(client *Client) string { + client.Gauge("gauge", 1, []string{"tag"}, 1) + client.Count("count", 2, []string{"tag"}, 1) + client.Set("set", "3_id", []string{"tag"}, 1) + client.Histogram("histo", 4, []string{"tag"}, 1) + client.Distribution("distro", 5, []string{"tag"}, 1) + client.Timing("timing", 6*time.Second, []string{"tag"}, 1) + + return "gauge:1|g|#tag\ncount:2|c|#tag\nset:3_id|s|#tag\nhisto:4|h|#tag\ndistro:5|d|#tag\ntiming:6000.000000|ms|#tag" +} + +func sendAllMetricsWithBasicAggregation(client *Client) string { + client.Gauge("gauge", 1, []string{"tag"}, 1) + client.Gauge("gauge", 21, []string{"tag"}, 1) + client.Count("count", 1, []string{"tag"}, 1) + client.Count("count", 3, []string{"tag"}, 1) + client.Set("set", "my_id", []string{"tag"}, 1) + client.Set("set", "my_id", []string{"tag"}, 1) + client.Histogram("histo", 3, []string{"tag"}, 1) + client.Histogram("histo", 31, []string{"tag"}, 1) + client.Distribution("distro", 3, []string{"tag"}, 1) + client.Distribution("distro", 22, []string{"tag"}, 1) + client.Timing("timing", 3*time.Second, []string{"tag"}, 1) + client.Timing("timing", 12*time.Second, []string{"tag"}, 1) + + return "histo:3|h|#tag\nhisto:31|h|#tag\ndistro:3|d|#tag\ndistro:22|d|#tag\ntiming:3000.000000|ms|#tag\ntiming:12000.000000|ms|#tag\nset:my_id|s|#tag\ngauge:21|g|#tag\ncount:4|c|#tag" +} + +func sendExtendedMetricsWithExtentedAggregation(client *Client) string { + client.Gauge("gauge", 1, []string{"tag"}, 1) + client.Gauge("gauge", 21, []string{"tag"}, 1) + client.Count("count", 1, []string{"tag"}, 1) + client.Count("count", 3, []string{"tag"}, 1) + client.Set("set", "my_id", []string{"tag"}, 1) + client.Set("set", "my_id", []string{"tag"}, 1) + client.Histogram("histo", 3, []string{"tag"}, 1) + client.Histogram("histo", 31, []string{"tag"}, 1) + client.Distribution("distro", 3, []string{"tag"}, 1) + client.Distribution("distro", 22, []string{"tag"}, 1) + client.Timing("timing", 3*time.Second, []string{"tag"}, 1) + client.Timing("timing", 12*time.Second, []string{"tag"}, 1) + + return "set:my_id|s|#tag\ngauge:21|g|#tag\ncount:4|c|#tag\nhisto:3:31|h|#tag\ndistro:3:22|d|#tag\ntiming:3000:12000|ms|#tag" +} + +func testStatsdPipeline(t *testing.T, client *Client, genMetric func(*Client) string, flush func(*Client)) { + server := getTestServer(t, defaultAddr) + defer server.Close() readDone := make(chan struct{}) buffer := make([]byte, 4096) @@ -119,66 +199,92 @@ func testStatsdPipeline(t *testing.T, client *Client, addr string) { close(readDone) }() + expectedResults := genMetric(client) + + flush(client) + select { case <-readDone: case <-time.After(2 * time.Second): require.Fail(t, "No data was flush on Close") } - result := string(buffer[:n]) - assert.Equal(t, "name:1|c|#tag", result) + assert.Equal(t, expectedResults, string(buffer[:n])) } -func TestChannelMode(t *testing.T) { - addr := "localhost:1201" - - client, err := New(addr, WithChannelMode()) - require.Nil(t, err, fmt.Sprintf("failed to create client: %s", err)) - assert.False(t, client.telemetry.devMode) - - testStatsdPipeline(t, client, addr) -} - -func TestMutexMode(t *testing.T) { - addr := "localhost:1201" - - client, err := New(addr) - require.Nil(t, err, fmt.Sprintf("failed to create client: %s", err)) - assert.False(t, client.telemetry.devMode) - - testStatsdPipeline(t, client, addr) -} - -func TestDevMode(t *testing.T) { - addr := "localhost:1201" - - client, err := New(addr, WithDevMode()) - require.Nil(t, err, fmt.Sprintf("failed to create client: %s", err)) - assert.True(t, client.telemetry.devMode) - - testStatsdPipeline(t, client, addr) -} - -func TestCloneWithExtraOptions(t *testing.T) { - addr := "localhost:1201" - - client, err := New(addr, WithTags([]string{"tag1", "tag2"})) - require.Nil(t, err, fmt.Sprintf("failed to create client: %s", err)) - - assert.Equal(t, client.Tags, []string{"tag1", "tag2"}) - assert.Equal(t, client.Namespace, "") - assert.Equal(t, client.receiveMode, MutexMode) - assert.Equal(t, client.addrOption, addr) - assert.Len(t, client.options, 1) +func TestGroupClient(t *testing.T) { + type testCase struct { + opt []Option + genMetric func(*Client) string + flushFunc func(*Client) + } - cloneClient, err := CloneWithExtraOptions(client, WithNamespace("test"), WithChannelMode()) - require.Nil(t, err, fmt.Sprintf("failed to clone client: %s", err)) + testMap := map[string]testCase{ + "MutexMode": testCase{ + []Option{WithBufferShardCount(1)}, + sendAllMetrics, + func(*Client) {}, + }, + "ChannelMode": testCase{ + []Option{WithChannelMode(), WithBufferShardCount(1)}, + sendAllMetrics, + func(*Client) {}, + }, + "DevMode": testCase{ + []Option{WithDevMode()}, + sendOneMetrics, + func(*Client) {}, + }, + "BasicAggregation + Close": testCase{ + []Option{WithClientSideAggregation(), WithBufferShardCount(1)}, + sendBasicMetrics, + func(c *Client) { c.Close() }, + }, + "BasicAggregation all metric + Close": testCase{ + []Option{WithClientSideAggregation(), WithBufferShardCount(1)}, + sendAllMetricsWithBasicAggregation, + func(c *Client) { c.Close() }, + }, + "BasicAggregation + Flush": testCase{ + []Option{WithClientSideAggregation(), WithBufferShardCount(1)}, + sendBasicMetrics, + func(c *Client) { c.Flush() }, + }, + "BasicAggregationChannelMode + Close": testCase{ + []Option{WithClientSideAggregation(), WithBufferShardCount(1), WithChannelMode()}, + sendBasicMetrics, + func(c *Client) { c.Close() }, + }, + "BasicAggregationChannelMode + Flush": testCase{ + []Option{WithClientSideAggregation(), WithBufferShardCount(1), WithChannelMode()}, + sendBasicMetrics, + func(c *Client) { c.Flush() }, + }, + "ExtendedAggregation + Close": testCase{ + []Option{WithExtendedClientSideAggregation(), WithBufferShardCount(1)}, + sendExtendedMetricsWithExtentedAggregation, + func(c *Client) { c.Close() }, + }, + "ExtendedAggregation + Close + ChannelMode": testCase{ + []Option{WithExtendedClientSideAggregation(), WithBufferShardCount(1), WithChannelMode()}, + sendExtendedMetricsWithExtentedAggregation, + func(c *Client) { + // since we're using ChannelMode we give a second to the worker to + // empty the channel. A second should be more than enough to pull 6 + // items from a channel. + time.Sleep(1 * time.Second) + c.Close() + }, + }, + } - assert.Equal(t, cloneClient.Tags, []string{"tag1", "tag2"}) - assert.Equal(t, cloneClient.Namespace, "test.") - assert.Equal(t, cloneClient.receiveMode, ChannelMode) - assert.Equal(t, cloneClient.addrOption, addr) - assert.Len(t, cloneClient.options, 3) + for testName, c := range testMap { + t.Run(testName, func(t *testing.T) { + client, err := New(defaultAddr, c.opt...) + require.Nil(t, err, fmt.Sprintf("failed to create client: %s", err)) + testStatsdPipeline(t, client, c.genMetric, c.flushFunc) + }) + } } func TestResolveAddressFromEnvironment(t *testing.T) { diff --git a/statsd/telemetry_test.go b/statsd/telemetry_test.go index 9dd18389..db3d9e0d 100644 --- a/statsd/telemetry_test.go +++ b/statsd/telemetry_test.go @@ -169,7 +169,7 @@ func testTelemetry(t *testing.T, telemetry *telemetryClient, expectedMetrics []m submitTestMetrics(telemetry.c) if telemetry.c.agg != nil { - telemetry.c.agg.sendMetrics() + telemetry.c.agg.flush() } metrics := telemetry.flush() diff --git a/statsd/utils.go b/statsd/utils.go new file mode 100644 index 00000000..a2829d94 --- /dev/null +++ b/statsd/utils.go @@ -0,0 +1,23 @@ +package statsd + +import ( + "math/rand" + "sync" +) + +func shouldSample(rate float64, r *rand.Rand, lock *sync.Mutex) bool { + if rate >= 1 { + return true + } + // sources created by rand.NewSource() (ie. w.random) are not thread safe. + // TODO: use defer once the lowest Go version we support is 1.14 (defer + // has an overhead before that). + lock.Lock() + if r.Float64() > rate { + lock.Unlock() + return false + } + lock.Unlock() + return true + +} diff --git a/statsd/worker.go b/statsd/worker.go index 2ab0c75f..4f6369a0 100644 --- a/statsd/worker.go +++ b/statsd/worker.go @@ -59,7 +59,7 @@ func (w *worker) pullMetric() { } func (w *worker) processMetric(m metric) error { - if !w.shouldSample(m.rate) { + if !shouldSample(m.rate, w.random, &w.randomLock) { return nil } w.Lock() @@ -72,19 +72,6 @@ func (w *worker) processMetric(m metric) error { return err } -func (w *worker) shouldSample(rate float64) bool { - // sources created by rand.NewSource() (ie. w.random) are not thread safe. - // TODO: use defer once the lowest Go version we support is 1.14 (defer - // has an overhead before that). - w.randomLock.Lock() - if rate < 1 && w.random.Float64() > rate { - w.randomLock.Unlock() - return false - } - w.randomLock.Unlock() - return true -} - func (w *worker) writeAggregatedMetricUnsafe(m metric, metricSymbol []byte) error { globalPos := 0 diff --git a/statsd/worker_test.go b/statsd/worker_test.go index 2f9f5c74..cf2c19f0 100644 --- a/statsd/worker_test.go +++ b/statsd/worker_test.go @@ -19,7 +19,7 @@ func TestShouldSample(t *testing.T) { worker := newWorker(newBufferPool(1, 1, 1), nil) count := 0 for i := 0; i < iterations; i++ { - if worker.shouldSample(rate) { + if shouldSample(rate, worker.random, &worker.randomLock) { count++ } } @@ -32,7 +32,7 @@ func BenchmarkShouldSample(b *testing.B) { b.RunParallel(func(pb *testing.PB) { worker := newWorker(newBufferPool(1, 1, 1), nil) for pb.Next() { - worker.shouldSample(0.1) + shouldSample(0.1, worker.random, &worker.randomLock) } }) }