Skip to content

Commit

Permalink
Use a worker-specific random source to remove lock contention.
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewdale committed Dec 23, 2020
1 parent 70f33d5 commit 1bcf5eb
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
10 changes: 9 additions & 1 deletion statsd/worker.go
Expand Up @@ -3,23 +3,31 @@ package statsd
import (
"math/rand"
"sync"
"time"
)

type worker struct {
pool *bufferPool
buffer *statsdBuffer
sender *sender
random *rand.Rand
sync.Mutex

inputMetrics chan metric
stop chan struct{}
}

func newWorker(pool *bufferPool, sender *sender) *worker {
// Note that calling time.Now().UnixNano() repeatedly quickly may return
// very similar values. That's fine for seeding the worker-specific random
// source because we just need an evenly distributed stream of float values.
// Do not use this random source for cryptographic randomness.
random := rand.New(rand.NewSource(time.Now().UnixNano()))
return &worker{
pool: pool,
sender: sender,
buffer: pool.borrowBuffer(),
random: random,
stop: make(chan struct{}),
}
}
Expand Down Expand Up @@ -59,7 +67,7 @@ func (w *worker) processMetric(m metric) error {
}

func (w *worker) shouldSample(rate float64) bool {
if rate < 1 && rand.Float64() > rate {
if rate < 1 && w.random.Float64() > rate {
return false
}
return true
Expand Down
35 changes: 35 additions & 0 deletions statsd/worker_test.go
@@ -0,0 +1,35 @@
package statsd

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
)

func TestShouldSample(t *testing.T) {
rates := []float64{0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.99, 1.0}
iterations := 50_000
worker := newWorker(newBufferPool(1, 1, 1), nil)
for _, rate := range rates {
rate := rate // Capture range variable.
t.Run(fmt.Sprintf("Rate %0.2f", rate), func(t *testing.T) {
count := 0
for i := 0; i < iterations; i++ {
if worker.shouldSample(rate) {
count++
}
}
assert.InDelta(t, rate, float64(count)/float64(iterations), 0.01)
})
}
}

func BenchmarkShouldSample(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
worker := newWorker(newBufferPool(1, 1, 1), nil)
for pb.Next() {
worker.shouldSample(0.1)
}
})
}

0 comments on commit 1bcf5eb

Please sign in to comment.