Skip to content

Commit

Permalink
Remove smoothing/bucketing in favor of using a sample metric
Browse files Browse the repository at this point in the history
  • Loading branch information
boxofrad committed Apr 27, 2022
1 parent 5c3d87e commit 1f8b1ad
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 97 deletions.
2 changes: 1 addition & 1 deletion api.go
Expand Up @@ -556,7 +556,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
leadershipTransferCh: make(chan *leadershipTransferFuture, 1),
leaderNotifyCh: make(chan struct{}, 1),
followerNotifyCh: make(chan struct{}, 1),
mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second, 5),
mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second),
}

r.conf.Store(*conf)
Expand Down
2 changes: 1 addition & 1 deletion fsm.go
Expand Up @@ -223,7 +223,7 @@ func (r *Raft) runFSM() {
req.respond(err)
}

saturation := newSaturationMetric([]string{"raft", "thread", "fsm", "saturation"}, 1*time.Second, 5)
saturation := newSaturationMetric([]string{"raft", "thread", "fsm", "saturation"}, 1*time.Second)

for {
saturation.sleeping()
Expand Down
79 changes: 21 additions & 58 deletions saturation.go
Expand Up @@ -19,44 +19,32 @@ import (
type saturationMetric struct {
reportInterval time.Duration

// buckets contains measurements for both the current, and a configurable
// number of previous periods (to smooth out spikes).
buckets []saturationMetricBucket
currentBucket int

sleepBegan, workBegan time.Time

// These are overwritten in tests.
nowFn func() time.Time
reportFn func(float32)
}

// saturationMetricBucket contains the measurements for a period.
type saturationMetricBucket struct {
// beginning of the period for which this bucket contains measurements.
beginning time.Time

// slept contains time for which the event processing loop was sleeping rather
// than working.
// than working in the last period.
slept time.Duration

// lost contains time that was lost due to incorrect instrumentation of the
// event processing loop (e.g. calling sleeping() or working() multiple times
// in succession).
// in succession) in the last period.
lost time.Duration

lastReport, sleepBegan, workBegan time.Time

// These are overwritten in tests.
nowFn func() time.Time
reportFn func(float32)
}

// newSaturationMetric creates a saturationMetric that will update the gauge
// with the given name at the given reportInterval. keepPrev determines the
// number of previous measurements that will be used to smooth out spikes.
func newSaturationMetric(name []string, reportInterval time.Duration, keepPrev uint) *saturationMetric {
func newSaturationMetric(name []string, reportInterval time.Duration) *saturationMetric {
m := &saturationMetric{
reportInterval: reportInterval,
buckets: make([]saturationMetricBucket, 1+keepPrev),
nowFn: time.Now,
reportFn: func(sat float32) { metrics.SetGauge(name, sat) },
lastReport: time.Now(),
reportFn: func(sat float32) { metrics.AddSample(name, sat) },
}
m.buckets[0].beginning = time.Now()
return m
}

Expand All @@ -68,7 +56,7 @@ func (s *saturationMetric) sleeping() {
if !s.sleepBegan.IsZero() {
// sleeping called twice in succession. Count that time as lost rather than
// measuring nonsense.
s.buckets[s.currentBucket].lost += now.Sub(s.sleepBegan)
s.lost += now.Sub(s.sleepBegan)
}

s.sleepBegan = now
Expand All @@ -80,20 +68,19 @@ func (s *saturationMetric) sleeping() {
// preceded by a call to sleeping.
func (s *saturationMetric) working() {
now := s.nowFn()
bucket := &s.buckets[s.currentBucket]

if s.workBegan.IsZero() {
if s.sleepBegan.IsZero() {
// working called before the initial call to sleeping. Count that time as
// lost rather than measuring nonsense.
bucket.lost += now.Sub(bucket.beginning)
s.lost += now.Sub(s.lastReport)
} else {
bucket.slept += now.Sub(s.sleepBegan)
s.slept += now.Sub(s.sleepBegan)
}
} else {
// working called twice in succession. Count that time as lost rather than
// measuring nonsense.
bucket.lost += now.Sub(s.workBegan)
s.lost += now.Sub(s.workBegan)
}

s.workBegan = now
Expand All @@ -104,45 +91,21 @@ func (s *saturationMetric) working() {
// report publishes the saturation measurement via reportFn if reportInterval has passed since our last report.
func (s *saturationMetric) report() {
now := s.nowFn()
timeSinceLastReport := now.Sub(s.buckets[s.currentBucket].beginning)
timeSinceLastReport := now.Sub(s.lastReport)

if timeSinceLastReport < s.reportInterval {
return
}

var (
beginning time.Time
slept, lost time.Duration
)
for _, bucket := range s.buckets {
if bucket.beginning.IsZero() {
continue
}

if beginning.IsZero() || bucket.beginning.Before(beginning) {
beginning = bucket.beginning
}

slept += bucket.slept
lost += bucket.lost
}

total := now.Sub(beginning) - lost

var saturation float64
total := timeSinceLastReport - s.lost
if total != 0 {
saturation = float64(total-slept) / float64(total)
saturation = float64(total-s.slept) / float64(total)
saturation = math.Round(saturation*100) / 100
}
s.reportFn(float32(saturation))

s.currentBucket++
if s.currentBucket >= len(s.buckets) {
s.currentBucket = 0
}

bucket := &s.buckets[s.currentBucket]
bucket.slept = 0
bucket.lost = 0
bucket.beginning = now
s.slept = 0
s.lost = 0
s.lastReport = now
}
45 changes: 8 additions & 37 deletions saturation_test.go
Expand Up @@ -9,9 +9,9 @@ import (

func TestSaturationMetric(t *testing.T) {
t.Run("without smoothing", func(t *testing.T) {
sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond, 0)
sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond)

now := sat.buckets[0].beginning
now := sat.lastReport
sat.nowFn = func() time.Time { return now }

var reported float32
Expand Down Expand Up @@ -46,42 +46,13 @@ func TestSaturationMetric(t *testing.T) {
// Should be 0% saturation.
require.Equal(t, float32(0), reported)
})

t.Run("with smoothing", func(t *testing.T) {
sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond, 1)

now := sat.buckets[0].beginning
sat.nowFn = func() time.Time { return now }

var reported float32
sat.reportFn = func(val float32) { reported = val }

// First report window: 50ms sleeping + 75ms working.
sat.sleeping()

now = now.Add(50 * time.Millisecond)
sat.working()

now = now.Add(75 * time.Millisecond)
sat.sleeping()

// Second report window: 75ms sleeping + 50ms working.
now = now.Add(75 * time.Millisecond)
sat.working()

now = now.Add(50 * time.Millisecond)
sat.sleeping()

// Should average out to 50% saturation.
require.Equal(t, float32(0.5), reported)
})
}

func TestSaturationMetric_IncorrectUsage(t *testing.T) {
t.Run("calling sleeping() consecutively", func(t *testing.T) {
sat := newSaturationMetric([]string{"metric"}, 50*time.Millisecond, 0)
sat := newSaturationMetric([]string{"metric"}, 50*time.Millisecond)

now := sat.buckets[0].beginning
now := sat.lastReport
sat.nowFn = func() time.Time { return now }

var reported float32
Expand Down Expand Up @@ -119,9 +90,9 @@ func TestSaturationMetric_IncorrectUsage(t *testing.T) {
})

t.Run("calling working() consecutively", func(t *testing.T) {
sat := newSaturationMetric([]string{"metric"}, 30*time.Millisecond, 0)
sat := newSaturationMetric([]string{"metric"}, 30*time.Millisecond)

now := sat.buckets[0].beginning
now := sat.lastReport
sat.nowFn = func() time.Time { return now }

var reported float32
Expand Down Expand Up @@ -151,9 +122,9 @@ func TestSaturationMetric_IncorrectUsage(t *testing.T) {
})

t.Run("calling working() first", func(t *testing.T) {
sat := newSaturationMetric([]string{"metric"}, 10*time.Millisecond, 0)
sat := newSaturationMetric([]string{"metric"}, 10*time.Millisecond)

now := sat.buckets[0].beginning.Add(10 * time.Millisecond)
now := sat.lastReport
sat.nowFn = func() time.Time { return now }

var reported float32
Expand Down

0 comments on commit 1f8b1ad

Please sign in to comment.