Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BETA] Full client-side aggregate with histogram, distribution and timing #176

Merged
merged 1 commit into from Jan 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
126 changes: 105 additions & 21 deletions statsd/aggregator.go
Expand Up @@ -8,11 +8,62 @@ import (
)

type (
countsMap map[string]*countMetric
gaugesMap map[string]*gaugeMetric
setsMap map[string]*setMetric
countsMap map[string]*countMetric
gaugesMap map[string]*gaugeMetric
setsMap map[string]*setMetric
bufferedMetricMap map[string]*histogramMetric
)

// bufferedMetricContexts represent the contexts for Histograms, Distributions
// and Timing. Since those 3 metric types behave the same way and are sampled
// with the same type they're represented by the same class.
type bufferedMetricContexts struct {
nbContext int32
mutex sync.RWMutex
values bufferedMetricMap
newMetric func(string, float64, string) *bufferedMetric
}

func newBufferedContexts(newMetric func(string, float64, string) *bufferedMetric) bufferedMetricContexts {
return bufferedMetricContexts{
values: bufferedMetricMap{},
newMetric: newMetric,
}
}

func (bc *bufferedMetricContexts) flush(metrics []metric) []metric {
bc.mutex.Lock()
values := bc.values
bc.values = bufferedMetricMap{}
bc.mutex.Unlock()

for _, d := range values {
metrics = append(metrics, d.flushUnsafe())
}
atomic.AddInt32(&bc.nbContext, int32(len(values)))
return metrics
}

func (bc *bufferedMetricContexts) sample(name string, value float64, tags []string) error {
context, stringTags := getContextAndTags(name, tags)
bc.mutex.RLock()
if v, found := bc.values[context]; found {
v.sample(value)
bc.mutex.RUnlock()
return nil
}
bc.mutex.RUnlock()

bc.mutex.Lock()
bc.values[context] = bc.newMetric(name, value, stringTags)
bc.mutex.Unlock()
return nil
}

func (bc *bufferedMetricContexts) resetAndGetNbContext() int32 {
return atomic.SwapInt32(&bc.nbContext, 0)
}

type aggregator struct {
nbContextGauge int32
nbContextCount int32
Expand All @@ -22,9 +73,12 @@ type aggregator struct {
gaugesM sync.RWMutex
setsM sync.RWMutex

gauges gaugesMap
counts countsMap
sets setsMap
gauges gaugesMap
counts countsMap
sets setsMap
histograms bufferedMetricContexts
distributions bufferedMetricContexts
timings bufferedMetricContexts

closed chan struct{}
exited chan struct{}
Expand All @@ -33,20 +87,26 @@ type aggregator struct {
}

type aggregatorMetrics struct {
nbContext int32
nbContextGauge int32
nbContextCount int32
nbContextSet int32
nbContext int32
nbContextGauge int32
nbContextCount int32
nbContextSet int32
nbContextHistogram int32
nbContextDistribution int32
nbContextTiming int32
}

func newAggregator(c *Client) *aggregator {
return &aggregator{
client: c,
counts: countsMap{},
gauges: gaugesMap{},
sets: setsMap{},
closed: make(chan struct{}),
exited: make(chan struct{}),
client: c,
counts: countsMap{},
gauges: gaugesMap{},
sets: setsMap{},
histograms: newBufferedContexts(newHistogramMetric),
distributions: newBufferedContexts(newDistributionMetric),
timings: newBufferedContexts(newTimingMetric),
closed: make(chan struct{}),
exited: make(chan struct{}),
}
}

Expand Down Expand Up @@ -84,12 +144,15 @@ func (a *aggregator) flushTelemetryMetrics() *aggregatorMetrics {
}

am := &aggregatorMetrics{
nbContextGauge: atomic.SwapInt32(&a.nbContextGauge, 0),
nbContextCount: atomic.SwapInt32(&a.nbContextCount, 0),
nbContextSet: atomic.SwapInt32(&a.nbContextSet, 0),
nbContextGauge: atomic.SwapInt32(&a.nbContextGauge, 0),
nbContextCount: atomic.SwapInt32(&a.nbContextCount, 0),
nbContextSet: atomic.SwapInt32(&a.nbContextSet, 0),
nbContextHistogram: a.histograms.resetAndGetNbContext(),
nbContextDistribution: a.distributions.resetAndGetNbContext(),
nbContextTiming: a.timings.resetAndGetNbContext(),
}

am.nbContext = am.nbContextGauge + am.nbContextCount + am.nbContextSet
am.nbContext = am.nbContextGauge + am.nbContextCount + am.nbContextSet + am.nbContextHistogram + am.nbContextDistribution + am.nbContextTiming
return am
}

Expand Down Expand Up @@ -126,14 +189,23 @@ func (a *aggregator) flushMetrics() []metric {
metrics = append(metrics, c.flushUnsafe())
}

metrics = a.histograms.flush(metrics)
metrics = a.distributions.flush(metrics)
metrics = a.timings.flush(metrics)

atomic.AddInt32(&a.nbContextCount, int32(len(counts)))
atomic.AddInt32(&a.nbContextGauge, int32(len(gauges)))
atomic.AddInt32(&a.nbContextSet, int32(len(sets)))
return metrics
}

func getContext(name string, tags []string) string {
return name + ":" + strings.Join(tags, ",")
return name + ":" + strings.Join(tags, tagSeparatorSymbol)
}

func getContextAndTags(name string, tags []string) (string, string) {
stringTags := strings.Join(tags, tagSeparatorSymbol)
return name + ":" + stringTags, stringTags
}

func (a *aggregator) count(name string, value int64, tags []string) error {
Expand Down Expand Up @@ -185,3 +257,15 @@ func (a *aggregator) set(name string, value string, tags []string) error {
a.setsM.Unlock()
return nil
}

func (a *aggregator) histogram(name string, value float64, tags []string) error {
return a.histograms.sample(name, value, tags)
}

func (a *aggregator) distribution(name string, value float64, tags []string) error {
return a.distributions.sample(name, value, tags)
}

func (a *aggregator) timing(name string, value float64, tags []string) error {
return a.timings.sample(name, value, tags)
}
119 changes: 93 additions & 26 deletions statsd/aggregator_test.go
Expand Up @@ -14,29 +14,35 @@ func TestAggregatorSample(t *testing.T) {

tags := []string{"tag1", "tag2"}

a.gauge("gaugeTest", 21, tags)
assert.Len(t, a.gauges, 1)
assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2")

a.count("countTest", 21, tags)
assert.Len(t, a.counts, 1)
assert.Contains(t, a.counts, "countTest:tag1,tag2")

a.set("setTest", "value1", tags)
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")

a.gauge("gaugeTest", 123, tags)
assert.Len(t, a.gauges, 1)
assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2")

a.count("countTest", 10, tags)
assert.Len(t, a.counts, 1)
assert.Contains(t, a.counts, "countTest:tag1,tag2")

a.set("setTest", "value1", tags)
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")
for i := 0; i < 2; i++ {
a.gauge("gaugeTest", 21, tags)
assert.Len(t, a.gauges, 1)
assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2")

a.count("countTest", 21, tags)
assert.Len(t, a.counts, 1)
assert.Contains(t, a.counts, "countTest:tag1,tag2")

a.set("setTest", "value1", tags)
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")

a.set("setTest", "value1", tags)
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")

a.histogram("histogramTest", 21, tags)
assert.Len(t, a.histograms.values, 1)
assert.Contains(t, a.histograms.values, "histogramTest:tag1,tag2")

a.distribution("distributionTest", 21, tags)
assert.Len(t, a.distributions.values, 1)
assert.Contains(t, a.distributions.values, "distributionTest:tag1,tag2")

a.timing("timingTest", 21, tags)
assert.Len(t, a.timings.values, 1)
assert.Contains(t, a.timings.values, "timingTest:tag1,tag2")
}
}

func TestAggregatorFlush(t *testing.T) {
Expand All @@ -57,13 +63,28 @@ func TestAggregatorFlush(t *testing.T) {
a.set("setTest1", "value2", tags)
a.set("setTest2", "value1", tags)

a.histogram("histogramTest1", 21, tags)
a.histogram("histogramTest1", 22, tags)
a.histogram("histogramTest2", 23, tags)

a.distribution("distributionTest1", 21, tags)
a.distribution("distributionTest1", 22, tags)
a.distribution("distributionTest2", 23, tags)

a.timing("timingTest1", 21, tags)
a.timing("timingTest1", 22, tags)
a.timing("timingTest2", 23, tags)

metrics := a.flushMetrics()

assert.Len(t, a.gauges, 0)
assert.Len(t, a.counts, 0)
assert.Len(t, a.sets, 0)
assert.Len(t, a.histograms.values, 0)
assert.Len(t, a.distributions.values, 0)
assert.Len(t, a.timings.values, 0)

assert.Len(t, metrics, 7)
assert.Len(t, metrics, 13)

sort.Slice(metrics, func(i, j int) bool {
if metrics[i].metricType == metrics[j].metricType {
Expand All @@ -77,7 +98,7 @@ func TestAggregatorFlush(t *testing.T) {
return metrics[i].metricType < metrics[j].metricType
})

assert.Equal(t, metrics, []metric{
assert.Equal(t, []metric{
metric{
metricType: gauge,
name: "gaugeTest1",
Expand Down Expand Up @@ -106,6 +127,34 @@ func TestAggregatorFlush(t *testing.T) {
rate: 1,
ivalue: int64(1),
},
metric{
metricType: histogramAggregated,
name: "histogramTest1",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: 1,
fvalues: []float64{21.0, 22.0},
},
metric{
metricType: histogramAggregated,
name: "histogramTest2",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: 1,
fvalues: []float64{23.0},
},
metric{
metricType: distributionAggregated,
name: "distributionTest1",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: 1,
fvalues: []float64{21.0, 22.0},
},
metric{
metricType: distributionAggregated,
name: "distributionTest2",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: 1,
fvalues: []float64{23.0},
},
metric{
metricType: set,
name: "setTest1",
Expand All @@ -127,7 +176,22 @@ func TestAggregatorFlush(t *testing.T) {
rate: 1,
svalue: "value1",
},
})
metric{
metricType: timingAggregated,
name: "timingTest1",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: 1,
fvalues: []float64{21.0, 22.0},
},
metric{
metricType: timingAggregated,
name: "timingTest2",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: 1,
fvalues: []float64{23.0},
},
},
metrics)

}

Expand All @@ -146,6 +210,9 @@ func TestAggregatorFlushConcurrency(t *testing.T) {
a.gauge("gaugeTest1", 21, tags)
a.count("countTest1", 21, tags)
a.set("setTest1", "value1", tags)
a.histogram("histogramTest1", 21, tags)
a.distribution("distributionTest1", 21, tags)
a.timing("timingTest1", 21, tags)
}()
}

Expand Down