Use a worker-specific random source to remove lock contention. #178

Merged
merged 2 commits into from Jan 18, 2021
14 changes: 13 additions & 1 deletion statsd/worker.go
@@ -3,23 +3,35 @@ package statsd
import (
	"math/rand"
	"sync"
	"time"
)

type worker struct {
	pool   *bufferPool
	buffer *statsdBuffer
	sender *sender
	random *rand.Rand
	sync.Mutex

	inputMetrics chan metric
	stop         chan struct{}
}

func newWorker(pool *bufferPool, sender *sender) *worker {
	// Each worker uses its own random source to prevent workers in separate
	// goroutines from contending for the lock on the "math/rand" package-global
	// random source (e.g. calls like "rand.Float64()" must acquire a shared
	// lock to get the next pseudorandom number).
	// Note that calling "time.Now().UnixNano()" repeatedly quickly may return
	// very similar values. That's fine for seeding the worker-specific random
	// source because we just need an evenly distributed stream of float values.
	// Do not use this random source for cryptographic randomness.
	random := rand.New(rand.NewSource(time.Now().UnixNano()))
	return &worker{
		pool:   pool,
		sender: sender,
		buffer: pool.borrowBuffer(),
		random: random,
		stop:   make(chan struct{}),
	}
}
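
The comment in `newWorker` describes the pattern in general terms. As a minimal standalone sketch, assuming each source is only ever touched by the goroutine that created it, the idea looks roughly like this. The names `sampleCount` and `runWorkers` are hypothetical and are not part of the library:

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
)

// sampleCount draws n values from r and reports how many are kept at the
// given rate. A draw is dropped only when rate < 1 and Float64() > rate,
// which mirrors the check in shouldSample.
func sampleCount(r *rand.Rand, rate float64, n int) int {
	kept := 0
	for i := 0; i < n; i++ {
		if rate >= 1 || r.Float64() <= rate {
			kept++
		}
	}
	return kept
}

// runWorkers starts one goroutine per seed. Each goroutine creates and owns
// its *rand.Rand, so the goroutines never share a random source or its lock.
func runWorkers(seeds []int64, rate float64, n int) []int {
	results := make([]int, len(seeds))
	var wg sync.WaitGroup
	for i, seed := range seeds {
		wg.Add(1)
		go func(i int, seed int64) {
			defer wg.Done()
			r := rand.New(rand.NewSource(seed)) // goroutine-local source
			results[i] = sampleCount(r, rate, n) // each goroutine writes its own slot
		}(i, seed)
	}
	wg.Wait()
	return results
}

func main() {
	for i, kept := range runWorkers([]int64{1, 2, 3, 4}, 0.1, 100000) {
		fmt.Printf("worker %d kept %d of 100000\n", i, kept)
	}
}
```

The sketch uses fixed seeds so that runs are repeatable; the PR seeds from `time.Now().UnixNano()` instead. The pattern is only safe while a source stays confined to one goroutine.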
@@ -59,7 +71,7 @@ func (w *worker) processMetric(m metric) error {
}

func (w *worker) shouldSample(rate float64) bool {
-	if rate < 1 && rand.Float64() > rate {
+	if rate < 1 && w.random.Float64() > rate {

@matthewdale @hush-hush Shouldn't we have a local lock here? From the docs:

The default Source is safe for concurrent use by multiple goroutines, but Sources created by NewSource are not.

I think this is causing race conditions.

Contributor Author

You're correct. When MutexMode is enabled, the call to worker.processMetric() is made from the same goroutine as the call to the top-level metric emit function (e.g. Count). I was previously under the impression that all calls to worker.processMetric() would be made from a single goroutine per worker, but that's not the case.

I'm working on a fix for the data race bug and will submit it when it's ready.
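
One way to address the race raised in this thread is to guard the worker-specific source with a lock that belongs to the worker. The following is a minimal sketch under that assumption, not necessarily the fix that was eventually submitted. The type `lockedSampler` and the helper `countConcurrent` are hypothetical stand-ins for the worker and its callers:

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
)

// lockedSampler is a hypothetical stand-in for the worker. The mutex guards
// the random source, because sources created by rand.NewSource are not safe
// for concurrent use.
type lockedSampler struct {
	mu     sync.Mutex // guards random
	random *rand.Rand
}

func newLockedSampler(seed int64) *lockedSampler {
	return &lockedSampler{random: rand.New(rand.NewSource(seed))}
}

// shouldSample reports whether a metric should be kept at the given rate.
// The lock is held only for the call to Float64.
func (s *lockedSampler) shouldSample(rate float64) bool {
	if rate >= 1 {
		return true
	}
	s.mu.Lock()
	f := s.random.Float64()
	s.mu.Unlock()
	return f <= rate
}

// countConcurrent calls shouldSample from several goroutines that share one
// sampler, and returns the total number of kept draws.
func countConcurrent(s *lockedSampler, goroutines, perGoroutine int, rate float64) int {
	counts := make([]int, goroutines)
	var wg sync.WaitGroup
	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func(g int) {
			defer wg.Done()
			for i := 0; i < perGoroutine; i++ {
				if s.shouldSample(rate) {
					counts[g]++ // each goroutine writes only its own slot
				}
			}
		}(g)
	}
	wg.Wait()
	total := 0
	for _, c := range counts {
		total += c
	}
	return total
}

func main() {
	s := newLockedSampler(42)
	kept := countConcurrent(s, 8, 10000, 0.25)
	fmt.Printf("kept %d of %d\n", kept, 8*10000)
}
```

Because the lock is per sampler rather than package-global, goroutines only contend when they use the same worker, which keeps most of the benefit the PR was after. Running a program like this with the race detector enabled (`go run -race`) is a reasonable way to check that the guarded version is clean.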

		return false
	}
	return true
38 changes: 38 additions & 0 deletions statsd/worker_test.go
@@ -0,0 +1,38 @@
package statsd

import (
	"fmt"
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestShouldSample(t *testing.T) {
	rates := []float64{0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.99, 1.0}
	iterations := 50_000

	for _, rate := range rates {
		rate := rate // Capture range variable.
		t.Run(fmt.Sprintf("Rate %0.2f", rate), func(t *testing.T) {
			t.Parallel()

			worker := newWorker(newBufferPool(1, 1, 1), nil)
			count := 0
			for i := 0; i < iterations; i++ {
				if worker.shouldSample(rate) {
					count++
				}
			}
			assert.InDelta(t, rate, float64(count)/float64(iterations), 0.01)
		})
	}
}

func BenchmarkShouldSample(b *testing.B) {
	b.RunParallel(func(pb *testing.PB) {
		worker := newWorker(newBufferPool(1, 1, 1), nil)
		for pb.Next() {
			worker.shouldSample(0.1)
		}
	})
}
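
`TestShouldSample` accepts an observed rate within 0.01 of the target after 50,000 draws. As a rough check on how much headroom that leaves, the sketch below computes the standard deviation of the observed fraction, assuming independent draws so that it equals sqrt(rate * (1 - rate) / n). The helper `sampleStdDev` is a hypothetical name used only for this illustration:

```go
package main

import (
	"fmt"
	"math"
)

// sampleStdDev returns the standard deviation of the observed kept fraction
// after n independent draws at the given rate (binomial proportion).
func sampleStdDev(rate float64, n int) float64 {
	return math.Sqrt(rate * (1 - rate) / float64(n))
}

func main() {
	const iterations = 50000 // same iteration count as the test
	const delta = 0.01       // same tolerance as assert.InDelta in the test
	for _, rate := range []float64{0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.99} {
		sd := sampleStdDev(rate, iterations)
		fmt.Printf("rate %.2f: std dev %.5f, delta is %.1f std devs\n", rate, sd, delta/sd)
	}
}
```

The worst case is a rate of 0.5, where the standard deviation is about 0.0022, so the 0.01 tolerance is more than four standard deviations wide. Under these assumptions a spurious failure should be rare but not impossible, since the worker is seeded from the clock.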