From 7584327624b1e226a43f3ab21e02c350bc24332b Mon Sep 17 00:00:00 2001 From: Maxime mouial Date: Wed, 27 Jan 2021 17:18:17 +0100 Subject: [PATCH] Fix worker random race condition --- statsd/worker.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/statsd/worker.go b/statsd/worker.go index 40f790a8..7073d6f1 100644 --- a/statsd/worker.go +++ b/statsd/worker.go @@ -7,10 +7,11 @@ import ( ) type worker struct { - pool *bufferPool - buffer *statsdBuffer - sender *sender - random *rand.Rand + pool *bufferPool + buffer *statsdBuffer + sender *sender + random *rand.Rand + randomLock sync.Mutex sync.Mutex inputMetrics chan metric @@ -18,10 +19,11 @@ type worker struct { } func newWorker(pool *bufferPool, sender *sender) *worker { - // Each worker uses its own random source to prevent workers in separate - // goroutines from contending for the lock on the "math/rand" package-global - // random source (e.g. calls like "rand.Float64()" must acquire a shared - // lock to get the next pseudorandom number). + // Each worker uses its own random source and random lock to prevent + // workers in separate goroutines from contending for the lock on the + // "math/rand" package-global random source (e.g. calls like + // "rand.Float64()" must acquire a shared lock to get the next + // pseudorandom number). // Note that calling "time.Now().UnixNano()" repeatedly quickly may return // very similar values. That's fine for seeding the worker-specific random // source because we just need an evenly distributed stream of float values. @@ -71,9 +73,15 @@ func (w *worker) processMetric(m metric) error { } func (w *worker) shouldSample(rate float64) bool { + // rand.NewSource is not thread safe. + // TODO: use defer once the lowest Go version we support is 1.14 (defer + // has an overhead before that). + w.randomLock.Lock() if rate < 1 && w.random.Float64() > rate { + w.randomLock.Unlock() return false } + w.randomLock.Unlock() return true }