Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement ChannelMode and sampling rate for extended aggregation
When using extended aggregation we still want to respect sampling rate for histograms, distribution and timing as it will have direct consequences on the Agent (number of point to aggregate). For apps sending a high number of metrics the aggregator implements ChannelMode to avoid lock contention when generating random numbers.
- Loading branch information
Showing
10 changed files
with
409 additions
and
191 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
package statsd | ||
|
||
import ( | ||
"math/rand" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
) | ||
|
||
// bufferedMetricContexts represent the contexts for Histograms, Distributions | ||
// and Timing. Since those 3 metric types behave the same way and are sampled | ||
// with the same type they're represented by the same class. | ||
type bufferedMetricContexts struct { | ||
nbContext int32 | ||
mutex sync.RWMutex | ||
values bufferedMetricMap | ||
newMetric func(string, float64, string) *bufferedMetric | ||
|
||
// Each bufferedMetricContexts uses its own random source and random | ||
// lock to prevent goroutines from contending for the lock on the | ||
// "math/rand" package-global random source (e.g. calls like | ||
// "rand.Float64()" must acquire a shared lock to get the next | ||
// pseudorandom number). | ||
random *rand.Rand | ||
randomLock sync.Mutex | ||
} | ||
|
||
func newBufferedContexts(newMetric func(string, float64, string) *bufferedMetric) bufferedMetricContexts { | ||
return bufferedMetricContexts{ | ||
values: bufferedMetricMap{}, | ||
newMetric: newMetric, | ||
// Note that calling "time.Now().UnixNano()" repeatedly quickly may return | ||
// very similar values. That's fine for seeding the worker-specific random | ||
// source because we just need an evenly distributed stream of float values. | ||
// Do not use this random source for cryptographic randomness. | ||
random: rand.New(rand.NewSource(time.Now().UnixNano())), | ||
} | ||
} | ||
|
||
func (bc *bufferedMetricContexts) flush(metrics []metric) []metric { | ||
bc.mutex.Lock() | ||
values := bc.values | ||
bc.values = bufferedMetricMap{} | ||
bc.mutex.Unlock() | ||
|
||
for _, d := range values { | ||
metrics = append(metrics, d.flushUnsafe()) | ||
} | ||
atomic.AddInt32(&bc.nbContext, int32(len(values))) | ||
return metrics | ||
} | ||
|
||
func (bc *bufferedMetricContexts) sample(name string, value float64, tags []string, rate float64) error { | ||
if !shouldSample(rate, bc.random, &bc.randomLock) { | ||
return nil | ||
} | ||
|
||
context, stringTags := getContextAndTags(name, tags) | ||
bc.mutex.RLock() | ||
if v, found := bc.values[context]; found { | ||
v.sample(value) | ||
bc.mutex.RUnlock() | ||
return nil | ||
} | ||
bc.mutex.RUnlock() | ||
|
||
bc.mutex.Lock() | ||
bc.values[context] = bc.newMetric(name, value, stringTags) | ||
bc.mutex.Unlock() | ||
return nil | ||
} | ||
|
||
func (bc *bufferedMetricContexts) resetAndGetNbContext() int32 { | ||
return atomic.SwapInt32(&bc.nbContext, 0) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.