Skip to content

Commit

Permalink
Update limiter rate (#812)
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaelgavache committed Jan 11, 2021
1 parent a5f08bc commit 3ed2a27
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 32 deletions.
24 changes: 12 additions & 12 deletions ddtrace/tracer/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,11 +412,12 @@ func (sr *SamplingRule) MarshalJSON() ([]byte, error) {
type rateLimiter struct {
limiter *rate.Limiter

mu sync.Mutex // guards below fields
prevTime time.Time // time at which prevRate was set
prevRate float64 // previous second's rate.
allowed int // number of spans allowed in the current period
seen int // number of spans seen in the current period
mu sync.Mutex // guards below fields
prevTime time.Time // time at which prevAllowed and prevSeen were set
allowed float64 // number of spans allowed in the current period
seen float64 // number of spans seen in the current period
prevAllowed float64 // number of spans allowed in the previous period
prevSeen float64 // number of spans seen in the previous period
}

// allowOne returns the rate limiter's decision to allow the span to be sampled, and the
Expand All @@ -428,11 +429,13 @@ func (r *rateLimiter) allowOne(now time.Time) (bool, float64) {
if d := now.Sub(r.prevTime); d >= time.Second {
// enough time has passed to reset the counters
if d.Truncate(time.Second) == time.Second && r.seen > 0 {
// exactly one second, so update prevRate
r.prevRate = float64(r.allowed) / float64(r.seen)
// exactly one second, so update prev
r.prevAllowed = r.allowed
r.prevSeen = r.seen
} else {
// more than one second, so reset previous rate
r.prevRate = 0.0
r.prevAllowed = 0
r.prevSeen = 0
}
r.prevTime = now
r.allowed = 0
Expand All @@ -445,9 +448,6 @@ func (r *rateLimiter) allowOne(now time.Time) (bool, float64) {
r.allowed++
sampled = true
}
// TODO(x): This algorithm is wrong. When there were no spans in the previous period prevRate will be 0.0
// and the resulting effective rate will be half of the actual rate. We should fix the algorithm by using
// a similar method as we do in the Datadog Agent in the rate limiter (using a decay period).
er := (r.prevRate + (float64(r.allowed) / float64(r.seen))) / 2.0
er := (r.prevAllowed + r.allowed) / (r.prevSeen + r.seen)
return sampled, er
}
41 changes: 22 additions & 19 deletions ddtrace/tracer/sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ func TestRateSampler(t *testing.T) {
func TestRateSamplerSetting(t *testing.T) {
assert := assert.New(t)
rs := NewRateSampler(1)
assert.Equal(float64(1), rs.Rate())
assert.Equal(1.0, rs.Rate())
rs.SetRate(0.5)
assert.Equal(float64(0.5), rs.Rate())
assert.Equal(0.5, rs.Rate())
}

func TestRuleEnvVars(t *testing.T) {
Expand Down Expand Up @@ -303,7 +303,7 @@ func TestRulesSampler(t *testing.T) {
result := rs.apply(span)
assert.True(result)
assert.Equal(1.0, span.Metrics["_dd.rule_psr"])
assert.Equal(0.5, span.Metrics["_dd.limit_psr"])
assert.Equal(1.0, span.Metrics["_dd.limit_psr"])
})
}
})
Expand Down Expand Up @@ -352,7 +352,7 @@ func TestRulesSampler(t *testing.T) {
assert.True(result)
assert.Equal(rate, span.Metrics["_dd.rule_psr"])
if rate > 0.0 {
assert.Equal(0.5, span.Metrics["_dd.limit_psr"])
assert.Equal(1.0, span.Metrics["_dd.limit_psr"])
}
})
}
Expand Down Expand Up @@ -404,7 +404,6 @@ func TestRulesSamplerInternals(t *testing.T) {
rs := newRulesSampler(nil)
// set samplingLimiter to specific state
rs.limiter.prevTime = now.Add(-1 * time.Second)
rs.limiter.prevRate = 1.0
rs.limiter.allowed = 1
rs.limiter.seen = 1

Expand All @@ -421,9 +420,8 @@ func TestRulesSamplerInternals(t *testing.T) {
// force sampling limiter to 1.0 spans/sec
rs.limiter.limiter = rate.NewLimiter(rate.Limit(1.0), 1)
rs.limiter.prevTime = now.Add(-1 * time.Second)
rs.limiter.prevRate = 1.0
rs.limiter.allowed = 1
rs.limiter.seen = 1
rs.limiter.allowed = 2
rs.limiter.seen = 2
// first span kept, second dropped
span := makeSpanAt("http.request", "test-service", now)
rs.applyRate(span, 1.0, now)
Expand All @@ -442,24 +440,27 @@ func TestSamplingLimiter(t *testing.T) {
t.Run("resets-every-second", func(t *testing.T) {
assert := assert.New(t)
sl := newRateLimiter()
sl.prevRate = 0.99
sl.prevSeen = 100
sl.prevAllowed = 99
sl.allowed = 42
sl.seen = 100
// exact point it should reset
now := time.Now().Add(1 * time.Second)

sampled, _ := sl.allowOne(now)
assert.True(sampled)
assert.Equal(0.42, sl.prevRate)
assert.Equal(42.0, sl.prevAllowed)
assert.Equal(100.0, sl.prevSeen)
assert.Equal(now, sl.prevTime)
assert.Equal(1, sl.seen)
assert.Equal(1, sl.allowed)
assert.Equal(1.0, sl.seen)
assert.Equal(1.0, sl.allowed)
})

t.Run("averages-rates", func(t *testing.T) {
assert := assert.New(t)
sl := newRateLimiter()
sl.prevRate = 0.42
sl.prevSeen = 100
sl.prevAllowed = 42
sl.allowed = 41
sl.seen = 99
// this event occurs within the current period
Expand All @@ -469,26 +470,28 @@ func TestSamplingLimiter(t *testing.T) {
assert.True(sampled)
assert.Equal(0.42, rate)
assert.Equal(now, sl.prevTime)
assert.Equal(100, sl.seen)
assert.Equal(42, sl.allowed)
assert.Equal(100.0, sl.seen)
assert.Equal(42.0, sl.allowed)

})

t.Run("discards-rate", func(t *testing.T) {
assert := assert.New(t)
sl := newRateLimiter()
sl.prevRate = 0.42
sl.prevSeen = 100
sl.prevAllowed = 42
sl.allowed = 42
sl.seen = 100
// exact point it should discard previous rate
now := time.Now().Add(2 * time.Second)

sampled, _ := sl.allowOne(now)
assert.True(sampled)
assert.Equal(0.0, sl.prevRate)
assert.Equal(0.0, sl.prevSeen)
assert.Equal(0.0, sl.prevAllowed)
assert.Equal(now, sl.prevTime)
assert.Equal(1, sl.seen)
assert.Equal(1, sl.allowed)
assert.Equal(1.0, sl.seen)
assert.Equal(1.0, sl.allowed)
})
}

Expand Down
2 changes: 1 addition & 1 deletion ddtrace/tracer/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func (s *span) Format(f fmt.State, c rune) {

const (
keySamplingPriority = "_sampling_priority_v1"
keySamplingPriorityRate = "_sampling_priority_rate_v1"
keySamplingPriorityRate = "_dd.agent_psr"
keyOrigin = "_dd.origin"
keyHostname = "_dd.hostname"
keyRulesSamplerAppliedRate = "_dd.rule_psr"
Expand Down

0 comments on commit 3ed2a27

Please sign in to comment.