Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add lossy aggregator mode to reduce contention #282

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
101 changes: 53 additions & 48 deletions statsd/aggregator.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package statsd

import (
"strings"
"sync"
"sync/atomic"
"time"
)

type (
countsMap map[string]*countMetric
gaugesMap map[string]*gaugeMetric
setsMap map[string]*setMetric
bufferedMetricMap map[string]*bufferedMetric
metricContext struct{ name, tags string }
countsMap map[metricContext]*countMetric
gaugesMap map[metricContext]*gaugeMetric
setsMap map[metricContext]*setMetric
bufferedMetricMap map[metricContext]*bufferedMetric
)

type aggregator struct {
Expand All @@ -30,6 +30,10 @@ type aggregator struct {
distributions bufferedMetricContexts
timings bufferedMetricContexts

lossyHistogramBufferPool *sync.Pool
lossyDistributionBufferPool *sync.Pool
lossyTimingBufferPool *sync.Pool

closed chan struct{}

client *Client
Expand All @@ -43,17 +47,20 @@ type aggregator struct {
wg sync.WaitGroup
}

func newAggregator(c *Client) *aggregator {
func newAggregator(c *Client, o *Options) *aggregator {
return &aggregator{
client: c,
counts: countsMap{},
gauges: gaugesMap{},
sets: setsMap{},
histograms: newBufferedContexts(newHistogramMetric),
distributions: newBufferedContexts(newDistributionMetric),
timings: newBufferedContexts(newTimingMetric),
closed: make(chan struct{}),
stopChannelMode: make(chan struct{}),
client: c,
counts: countsMap{},
gauges: gaugesMap{},
sets: setsMap{},
histograms: newBufferedContexts(newHistogramMetric),
distributions: newBufferedContexts(newDistributionMetric),
timings: newBufferedContexts(newTimingMetric),
lossyHistogramBufferPool: newLossyBufferPool(newHistogramMetric, o.flushSampleThreshold),
lossyDistributionBufferPool: newLossyBufferPool(newDistributionMetric, o.flushSampleThreshold),
lossyTimingBufferPool: newLossyBufferPool(newTimingMetric, o.flushSampleThreshold),
closed: make(chan struct{}),
stopChannelMode: make(chan struct{}),
}
}

Expand Down Expand Up @@ -127,6 +134,9 @@ func (a *aggregator) flushTelemetryMetrics(t *Telemetry) {
t.AggregationNbContextHistogram = a.histograms.getNbContext()
t.AggregationNbContextDistribution = a.distributions.getNbContext()
t.AggregationNbContextTiming = a.timings.getNbContext()
t.AggregationNbSampleHistogram = a.histograms.getNbSample()
t.AggregationNbSampleDistribution = a.distributions.getNbSample()
t.AggregationNbSampleTiming = a.timings.getNbSample()
}

func (a *aggregator) flushMetrics() []metric {
Expand Down Expand Up @@ -172,33 +182,14 @@ func (a *aggregator) flushMetrics() []metric {
return metrics
}

func getContext(name string, tags []string) string {
// getContext builds the map key identifying a metric context (name plus
// joined tag string), discarding the serialized tags themselves.
func getContext(name string, tags []string) metricContext {
	context, _ := getContextAndTags(name, tags)
	return context
}

func getContextAndTags(name string, tags []string) (string, string) {
if len(tags) == 0 {
return name + nameSeparatorSymbol, ""
}
n := len(name) + len(nameSeparatorSymbol) + len(tagSeparatorSymbol)*(len(tags)-1)
for _, s := range tags {
n += len(s)
}

var sb strings.Builder
sb.Grow(n)
sb.WriteString(name)
sb.WriteString(nameSeparatorSymbol)
sb.WriteString(tags[0])
for _, s := range tags[1:] {
sb.WriteString(tagSeparatorSymbol)
sb.WriteString(s)
}

s := sb.String()

return s, s[len(name)+len(nameSeparatorSymbol):]
// getContextAndTags returns both the metric context key and the serialized
// tag string, so callers needing the joined tags avoid a second join.
func getContextAndTags(name string, tags []string) (metricContext, string) {
	joined := joinTags(tags)
	return metricContext{name: name, tags: joined}, joined
}

func (a *aggregator) count(name string, value int64, tags []string) error {
Expand All @@ -212,7 +203,7 @@ func (a *aggregator) count(name string, value int64, tags []string) error {
a.countsM.RUnlock()

a.countsM.Lock()
// Check if another goroutines hasn't created the value betwen the RUnlock and 'Lock'
// Check if another goroutine hasn't created the value between the RUnlock and 'Lock'
if count, found := a.counts[context]; found {
count.sample(value)
a.countsM.Unlock()
Expand All @@ -237,7 +228,7 @@ func (a *aggregator) gauge(name string, value float64, tags []string) error {
gauge := newGaugeMetric(name, value, tags)

a.gaugesM.Lock()
// Check if another goroutines hasn't created the value betwen the 'RUnlock' and 'Lock'
// Check if another goroutine hasn't created the value between the 'RUnlock' and 'Lock'
if gauge, found := a.gauges[context]; found {
gauge.sample(value)
a.gaugesM.Unlock()
Expand All @@ -259,7 +250,7 @@ func (a *aggregator) set(name string, value string, tags []string) error {
a.setsM.RUnlock()

a.setsM.Lock()
// Check if another goroutines hasn't created the value betwen the 'RUnlock' and 'Lock'
// Check if another goroutine hasn't created the value between the 'RUnlock' and 'Lock'
if set, found := a.sets[context]; found {
set.sample(value)
a.setsM.Unlock()
Expand All @@ -275,16 +266,30 @@ func (a *aggregator) set(name string, value string, tags []string) error {
// sample rate will have impacts on the CPU and memory usage of the Agent.

// type alias for Client.sendToAggregator
type bufferedMetricSampleFunc func(name string, value float64, tags []string, rate float64) error
type bufferedMetricSampleFunc func(name string, value float64, tags []string, rate float64)

// histogram forwards a single histogram sample to the buffered-metric
// contexts; errors are no longer surfaced to the caller (void return).
func (a *aggregator) histogram(name string, value float64, tags []string, rate float64) {
a.histograms.sample(name, value, tags, rate)
}

// distribution forwards a single distribution sample to the buffered-metric
// contexts; errors are no longer surfaced to the caller (void return).
func (a *aggregator) distribution(name string, value float64, tags []string, rate float64) {
a.distributions.sample(name, value, tags, rate)
}

// timing forwards a single timing sample to the buffered-metric
// contexts; errors are no longer surfaced to the caller (void return).
func (a *aggregator) timing(name string, value float64, tags []string, rate float64) {
a.timings.sample(name, value, tags, rate)
}

// bufferedMetricSampleBulkFunc is the bulk counterpart of
// bufferedMetricSampleFunc: it submits a pre-aggregated map of buffered
// metrics in one call (used by the lossy aggregator mode).
type bufferedMetricSampleBulkFunc func(bulkMap bufferedMetricMap)

func (a *aggregator) histogram(name string, value float64, tags []string, rate float64) error {
return a.histograms.sample(name, value, tags, rate)
// histogramBulk merges a pre-aggregated map of histogram samples into the
// shared histogram contexts, reducing per-sample lock contention.
func (a *aggregator) histogramBulk(bulkMap bufferedMetricMap) {
a.histograms.sampleBulk(bulkMap)
}

func (a *aggregator) distribution(name string, value float64, tags []string, rate float64) error {
return a.distributions.sample(name, value, tags, rate)
// distributionBulk merges a pre-aggregated map of distribution samples into
// the shared distribution contexts, reducing per-sample lock contention.
func (a *aggregator) distributionBulk(bulkMap bufferedMetricMap) {
a.distributions.sampleBulk(bulkMap)
}

func (a *aggregator) timing(name string, value float64, tags []string, rate float64) error {
return a.timings.sample(name, value, tags, rate)
// timingBulk merges a pre-aggregated map of timing samples into the
// shared timing contexts, reducing per-sample lock contention.
func (a *aggregator) timingBulk(bulkMap bufferedMetricMap) {
a.timings.sampleBulk(bulkMap)
}
69 changes: 33 additions & 36 deletions statsd/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,43 @@ import (
)

func TestAggregatorSample(t *testing.T) {
a := newAggregator(nil)
a := newAggregator(nil, &Options{})

tags := []string{"tag1", "tag2"}

for i := 0; i < 2; i++ {
a.gauge("gaugeTest", 21, tags)
assert.Len(t, a.gauges, 1)
assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2")
assert.Contains(t, a.gauges, metricContext{"gaugeTest", "tag1,tag2"})

a.count("countTest", 21, tags)
assert.Len(t, a.counts, 1)
assert.Contains(t, a.counts, "countTest:tag1,tag2")
assert.Contains(t, a.counts, metricContext{"countTest", "tag1,tag2"})

a.set("setTest", "value1", tags)
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")
assert.Contains(t, a.sets, metricContext{"setTest", "tag1,tag2"})

a.set("setTest", "value1", tags)
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")
assert.Contains(t, a.sets, metricContext{"setTest", "tag1,tag2"})

a.histogram("histogramTest", 21, tags, 1)
assert.Len(t, a.histograms.values, 1)
assert.Contains(t, a.histograms.values, "histogramTest:tag1,tag2")
assert.Contains(t, a.histograms.values, metricContext{"histogramTest", "tag1,tag2"})

a.distribution("distributionTest", 21, tags, 1)
assert.Len(t, a.distributions.values, 1)
assert.Contains(t, a.distributions.values, "distributionTest:tag1,tag2")
assert.Contains(t, a.distributions.values, metricContext{"distributionTest", "tag1,tag2"})

a.timing("timingTest", 21, tags, 1)
assert.Len(t, a.timings.values, 1)
assert.Contains(t, a.timings.values, "timingTest:tag1,tag2")
assert.Contains(t, a.timings.values, metricContext{"timingTest", "tag1,tag2"})
}
}

func TestAggregatorFlush(t *testing.T) {
a := newAggregator(nil)
a := newAggregator(nil, &Options{})

tags := []string{"tag1", "tag2"}

Expand Down Expand Up @@ -196,7 +196,7 @@ func TestAggregatorFlush(t *testing.T) {
}

func TestAggregatorFlushConcurrency(t *testing.T) {
a := newAggregator(nil)
a := newAggregator(nil, &Options{})

var wg sync.WaitGroup
wg.Add(10)
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestAggregatorFlushConcurrency(t *testing.T) {
}

func TestAggregatorTagsCopy(t *testing.T) {
a := newAggregator(nil)
a := newAggregator(nil, &Options{})
tags := []string{"tag1", "tag2"}

a.gauge("gauge", 21, tags)
Expand All @@ -244,41 +244,38 @@ func TestAggregatorTagsCopy(t *testing.T) {
}
}

func TestGetContextAndTags(t *testing.T) {
func BenchmarkGetContextAndTags(b *testing.B) {
tests := []struct {
testName string
name string
tags []string
wantContext string
wantTags string
testName string
name string
tags []string
}{
{
testName: "no tags",
name: "name",
tags: nil,
wantContext: "name:",
wantTags: "",
testName: "no tags",
name: "name",
tags: nil,
},
{
testName: "one tag",
name: "name",
tags: []string{"tag1"},
wantContext: "name:tag1",
wantTags: "tag1",
testName: "one tag",
name: "name",
tags: []string{"tag1"},
},
{
testName: "two tags",
name: "name",
tags: []string{"tag1", "tag2"},
wantContext: "name:tag1,tag2",
wantTags: "tag1,tag2",
testName: "two tags",
name: "name",
tags: []string{"tag1", "tag2"},
},
{
testName: "many tags",
name: "name",
tags: []string{"tag1", "tag2", "tag3", "tag4", "tag5", "tag7", "tag8", "tag9", "tag10"},
},
}
for _, test := range tests {
t.Run(test.testName, func(t *testing.T) {
gotContext, gotTags := getContextAndTags(test.name, test.tags)
assert.Equal(t, test.wantContext, gotContext)
assert.Equal(t, test.wantTags, gotTags)
b.Run(test.testName, func(b *testing.B) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be curious to have results of both versions. This is a crucial part of the client performance, it might be interesting to write them in the PR description.
I don't think there will be any difference but I'm also wondering how's the hashing performing once in the map.

for i := 0; i < b.N; i++ {
_, _ = getContextAndTags(test.name, test.tags)
}
})
}
}