Skip to content

Commit

Permalink
Client-side aggregation for distribution and histogram
Browse files Browse the repository at this point in the history
  • Loading branch information
hush-hush committed Nov 26, 2020
1 parent 70f33d5 commit cae8d57
Show file tree
Hide file tree
Showing 13 changed files with 755 additions and 41 deletions.
129 changes: 100 additions & 29 deletions statsd/aggregator.go
Expand Up @@ -8,23 +8,31 @@ import (
)

type (
countsMap map[string]*countMetric
gaugesMap map[string]*gaugeMetric
setsMap map[string]*setMetric
countsMap map[string]*countMetric
gaugesMap map[string]*gaugeMetric
setsMap map[string]*setMetric
histogramMap map[string]*histogramMetric
distributionMap map[string]*distributionMetric
)

type aggregator struct {
nbContextGauge int32
nbContextCount int32
nbContextSet int32

countsM sync.RWMutex
gaugesM sync.RWMutex
setsM sync.RWMutex

gauges gaugesMap
counts countsMap
sets setsMap
nbContextGauge int32
nbContextCount int32
nbContextSet int32
nbContextHistogram int32
nbContextDistribution int32

countsM sync.RWMutex
gaugesM sync.RWMutex
setsM sync.RWMutex
histogramsM sync.RWMutex
distributionM sync.RWMutex

gauges gaugesMap
counts countsMap
sets setsMap
histograms histogramMap
distributions distributionMap

closed chan struct{}
exited chan struct{}
Expand All @@ -33,20 +41,24 @@ type aggregator struct {
}

type aggregatorMetrics struct {
nbContext int32
nbContextGauge int32
nbContextCount int32
nbContextSet int32
nbContext int32
nbContextGauge int32
nbContextCount int32
nbContextSet int32
nbContextHistogram int32
nbContextDistribution int32
}

func newAggregator(c *Client) *aggregator {
return &aggregator{
client: c,
counts: countsMap{},
gauges: gaugesMap{},
sets: setsMap{},
closed: make(chan struct{}),
exited: make(chan struct{}),
client: c,
counts: countsMap{},
gauges: gaugesMap{},
sets: setsMap{},
histograms: histogramMap{},
distributions: distributionMap{},
closed: make(chan struct{}),
exited: make(chan struct{}),
}
}

Expand Down Expand Up @@ -84,12 +96,14 @@ func (a *aggregator) flushTelemetryMetrics() *aggregatorMetrics {
}

am := &aggregatorMetrics{
nbContextGauge: atomic.SwapInt32(&a.nbContextGauge, 0),
nbContextCount: atomic.SwapInt32(&a.nbContextCount, 0),
nbContextSet: atomic.SwapInt32(&a.nbContextSet, 0),
nbContextGauge: atomic.SwapInt32(&a.nbContextGauge, 0),
nbContextCount: atomic.SwapInt32(&a.nbContextCount, 0),
nbContextSet: atomic.SwapInt32(&a.nbContextSet, 0),
nbContextHistogram: atomic.SwapInt32(&a.nbContextHistogram, 0),
nbContextDistribution: atomic.SwapInt32(&a.nbContextDistribution, 0),
}

am.nbContext = am.nbContextGauge + am.nbContextCount + am.nbContextSet
am.nbContext = am.nbContextGauge + am.nbContextCount + am.nbContextSet + am.nbContextHistogram + am.nbContextDistribution
return am
}

Expand Down Expand Up @@ -126,14 +140,39 @@ func (a *aggregator) flushMetrics() []metric {
metrics = append(metrics, c.flushUnsafe())
}

a.histogramsM.Lock()
histograms := a.histograms
a.histograms = histogramMap{}
a.histogramsM.Unlock()

for _, h := range histograms {
metrics = append(metrics, h.flushUnsafe())
}

a.distributionM.Lock()
distributions := a.distributions
a.distributions = distributionMap{}
a.distributionM.Unlock()

for _, d := range distributions {
metrics = append(metrics, d.flushUnsafe())
}

atomic.AddInt32(&a.nbContextCount, int32(len(counts)))
atomic.AddInt32(&a.nbContextGauge, int32(len(gauges)))
atomic.AddInt32(&a.nbContextSet, int32(len(sets)))
atomic.AddInt32(&a.nbContextHistogram, int32(len(histograms)))
atomic.AddInt32(&a.nbContextDistribution, int32(len(distributions)))
return metrics
}

func getContext(name string, tags []string) string {
return name + ":" + strings.Join(tags, ",")
return name + ":" + strings.Join(tags, tagSeparatorSymbol)
}

// getContextAndTags returns both the unique context key for a metric
// (name + ":" + joined tags) and the serialized tag string itself, so
// callers needing both don't have to join the tags twice.
func getContextAndTags(name string, tags []string) (string, string) {
	joined := strings.Join(tags, tagSeparatorSymbol)
	return name + ":" + joined, joined
}

func (a *aggregator) count(name string, value int64, tags []string) error {
Expand Down Expand Up @@ -185,3 +224,35 @@ func (a *aggregator) set(name string, value string, tags []string) error {
a.setsM.Unlock()
return nil
}

// histogram records a sample for the client-side aggregated histogram
// identified by name and tags, creating the metric on first use.
//
// Fast path: read lock plus an in-place sample when the context already
// exists. Slow path: take the write lock and re-check before inserting —
// without the re-check, a metric created by a concurrent goroutine between
// RUnlock and Lock would be overwritten and its samples silently lost.
func (a *aggregator) histogram(name string, value float64, tags []string) error {
	context, stringTags := getContextAndTags(name, tags)
	a.histogramsM.RLock()
	if histogram, found := a.histograms[context]; found {
		histogram.sample(value)
		a.histogramsM.RUnlock()
		return nil
	}
	a.histogramsM.RUnlock()

	a.histogramsM.Lock()
	// Re-check under the write lock: another goroutine may have created the
	// metric after we released the read lock.
	if histogram, found := a.histograms[context]; found {
		histogram.sample(value)
	} else {
		a.histograms[context] = newHistogramMetric(name, value, stringTags)
	}
	a.histogramsM.Unlock()
	return nil
}

// distribution records a sample for the client-side aggregated distribution
// identified by name and tags, creating the metric on first use.
//
// Fast path: read lock plus an in-place sample when the context already
// exists. Slow path: take the write lock and re-check before inserting —
// without the re-check, a metric created by a concurrent goroutine between
// RUnlock and Lock would be overwritten and its samples silently lost.
func (a *aggregator) distribution(name string, value float64, tags []string) error {
	context, stringTags := getContextAndTags(name, tags)
	a.distributionM.RLock()
	if distribution, found := a.distributions[context]; found {
		distribution.sample(value)
		a.distributionM.RUnlock()
		return nil
	}
	a.distributionM.RUnlock()

	a.distributionM.Lock()
	// Re-check under the write lock: another goroutine may have created the
	// metric after we released the read lock.
	if distribution, found := a.distributions[context]; found {
		distribution.sample(value)
	} else {
		a.distributions[context] = newDistributionMetric(name, value, stringTags)
	}
	a.distributionM.Unlock()
	return nil
}
60 changes: 57 additions & 3 deletions statsd/aggregator_test.go
Expand Up @@ -26,6 +26,14 @@ func TestAggregatorSample(t *testing.T) {
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")

a.set("setTest", "value1", tags)
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")

a.histogram("histogramTest", 21, tags)
assert.Len(t, a.histograms, 1)
assert.Contains(t, a.histograms, "histogramTest:tag1,tag2")

a.gauge("gaugeTest", 123, tags)
assert.Len(t, a.gauges, 1)
assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2")
Expand All @@ -37,6 +45,11 @@ func TestAggregatorSample(t *testing.T) {
a.set("setTest", "value1", tags)
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")

a.histogram("histogramTest", 21, tags)
assert.Len(t, a.histograms, 1)
assert.Contains(t, a.histograms, "histogramTest:tag1,tag2")

}

func TestAggregatorFlush(t *testing.T) {
Expand All @@ -57,13 +70,23 @@ func TestAggregatorFlush(t *testing.T) {
a.set("setTest1", "value2", tags)
a.set("setTest2", "value1", tags)

a.histogram("histogramTest1", 21, tags)
a.histogram("histogramTest1", 22, tags)
a.histogram("histogramTest2", 23, tags)

a.distribution("distributionTest1", 21, tags)
a.distribution("distributionTest1", 22, tags)
a.distribution("distributionTest2", 23, tags)

metrics := a.flushMetrics()

assert.Len(t, a.gauges, 0)
assert.Len(t, a.counts, 0)
assert.Len(t, a.sets, 0)
assert.Len(t, a.histograms, 0)
assert.Len(t, a.distributions, 0)

assert.Len(t, metrics, 7)
assert.Len(t, metrics, 11)

sort.Slice(metrics, func(i, j int) bool {
if metrics[i].metricType == metrics[j].metricType {
Expand All @@ -77,7 +100,7 @@ func TestAggregatorFlush(t *testing.T) {
return metrics[i].metricType < metrics[j].metricType
})

assert.Equal(t, metrics, []metric{
assert.Equal(t, []metric{
metric{
metricType: gauge,
name: "gaugeTest1",
Expand Down Expand Up @@ -106,6 +129,34 @@ func TestAggregatorFlush(t *testing.T) {
rate: 1,
ivalue: int64(1),
},
metric{
metricType: histogramAggregated,
name: "histogramTest1",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: 1,
fvalues: []float64{21.0, 22.0},
},
metric{
metricType: histogramAggregated,
name: "histogramTest2",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: 1,
fvalues: []float64{23.0},
},
metric{
metricType: distributionAggregated,
name: "distributionTest1",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: 1,
fvalues: []float64{21.0, 22.0},
},
metric{
metricType: distributionAggregated,
name: "distributionTest2",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: 1,
fvalues: []float64{23.0},
},
metric{
metricType: set,
name: "setTest1",
Expand All @@ -127,7 +178,8 @@ func TestAggregatorFlush(t *testing.T) {
rate: 1,
svalue: "value1",
},
})
},
metrics)

}

Expand All @@ -146,6 +198,8 @@ func TestAggregatorFlushConcurrency(t *testing.T) {
a.gauge("gaugeTest1", 21, tags)
a.count("countTest1", 21, tags)
a.set("setTest1", "value1", tags)
a.histogram("histogramTest1", 21, tags)
a.distribution("distributionTest1", 21, tags)
}()
}

Expand Down
62 changes: 61 additions & 1 deletion statsd/buffer.go
@@ -1,11 +1,21 @@
package statsd

import (
"strconv"
)

// bufferFullError is a string-based error type so errBufferFull can be
// declared as a constant instead of a package-level var.
type bufferFullError string

func (e bufferFullError) Error() string { return string(e) }

// errBufferFull is returned when a metric cannot fit into the current buffer.
const errBufferFull = bufferFullError("statsd buffer is full")

// partialWriteError is a string-based error type so errPartialWrite can be
// declared as a constant instead of a package-level var.
type partialWriteError string

func (e partialWriteError) Error() string { return string(e) }

// errPartialWrite is returned when only a prefix of the values could be
// serialized into the buffer.
const errPartialWrite = partialWriteError("value partially written")

const metricOverhead = 512

// statsdBuffer is a buffer containing statsd messages
Expand Down Expand Up @@ -55,6 +65,56 @@ func (b *statsdBuffer) writeHistogram(namespace string, globalTags []string, nam
return b.validateNewElement(originalBuffer)
}

// writeAggregated serializes as many of the given values as fit into the
// current buffer and returns the index in values at which it stopped.
// It returns errBufferFull when nothing could be written at all, and
// errPartialWrite when only a prefix of values fit (the caller should
// retry the remainder in a fresh buffer).
func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, globalTags []string, name string, values []float64, tags string, tagSize int) (int, error) {
	if b.elementCount >= b.maxElements {
		return 0, errBufferFull
	}

	rollback := b.buffer
	b.buffer = appendHeader(b.buffer, namespace, name)

	// Not even the header plus the trailing tags fit: restore and bail out.
	if len(b.buffer)+tagSize > b.maxSize {
		b.buffer = rollback
		return 0, errBufferFull
	}

	// Append values one at a time, stopping before the first value that
	// would push the buffer (plus the pending tags) past maxSize.
	written := 0
	for i, v := range values {
		beforeValue := b.buffer
		if i != 0 {
			b.buffer = append(b.buffer, ':')
		}
		b.buffer = strconv.AppendFloat(b.buffer, v, 'f', -1, 64)

		if len(b.buffer)+tagSize > b.maxSize {
			// This value doesn't fit; drop it and switch to another buffer.
			b.buffer = beforeValue
			break
		}
		written = i + 1
	}

	// Not a single value fit: undo the header as well.
	if written == 0 {
		b.buffer = rollback
		return 0, errBufferFull
	}

	b.buffer = append(b.buffer, '|')
	b.buffer = append(b.buffer, metricSymbol...)
	b.buffer = appendTagsAggregated(b.buffer, globalTags, tags)
	b.elementCount++

	if written != len(values) {
		return written, errPartialWrite
	}
	return written, nil
}

func (b *statsdBuffer) writeDistribution(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error {
if b.elementCount >= b.maxElements {
return errBufferFull
Expand Down Expand Up @@ -116,7 +176,7 @@ func (b *statsdBuffer) validateNewElement(originalBuffer []byte) error {

func (b *statsdBuffer) writeSeparator() {
if b.elementCount != 0 {
b.buffer = appendSeparator(b.buffer)
b.buffer = append(b.buffer, '\n')
}
}

Expand Down

0 comments on commit cae8d57

Please sign in to comment.