Skip to content

Commit

Permalink
Incorporate PR feedback
Browse files Browse the repository at this point in the history
- Much simpler implementation based on an accumulator of sleep time.
  We no longer drop samples when the buffer is full.
- We now keep 5 previous measurements to smooth out spikes.
- Rename metrics to `raft.thread.fsm.saturation` and
  `raft.thread.main.saturation`.
- Remove FSM saturation metric from the `Raft` struct.
  • Loading branch information
boxofrad committed Apr 27, 2022
1 parent 8ac5ab2 commit 24fc8b5
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 119 deletions.
6 changes: 2 additions & 4 deletions api.go
Expand Up @@ -208,9 +208,8 @@ type Raft struct {
// followerNotifyCh is used to tell followers that config has changed
followerNotifyCh chan struct{}

// thread saturation metric recorders.
// mainThreadSaturation measures the saturation of the main raft goroutine.
mainThreadSaturation *saturationMetric
fsmThreadSaturation *saturationMetric
}

// BootstrapCluster initializes a server's storage with the given cluster
Expand Down Expand Up @@ -557,8 +556,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
leadershipTransferCh: make(chan *leadershipTransferFuture, 1),
leaderNotifyCh: make(chan struct{}, 1),
followerNotifyCh: make(chan struct{}, 1),
mainThreadSaturation: newSaturationMetric([]string{"raft", "mainThreadSaturation"}, 1*time.Second),
fsmThreadSaturation: newSaturationMetric([]string{"raft", "fsm", "threadSaturation"}, 1*time.Second),
mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second, 5),
}

r.conf.Store(*conf)
Expand Down
8 changes: 5 additions & 3 deletions fsm.go
Expand Up @@ -223,12 +223,14 @@ func (r *Raft) runFSM() {
req.respond(err)
}

saturation := newSaturationMetric([]string{"raft", "thread", "fsm", "saturation"}, 1*time.Second, 5)

for {
r.fsmThreadSaturation.sleeping()
saturation.sleeping()

select {
case ptr := <-r.fsmMutateCh:
r.fsmThreadSaturation.working()
saturation.working()

switch req := ptr.(type) {
case []*commitTuple:
Expand All @@ -242,7 +244,7 @@ func (r *Raft) runFSM() {
}

case req := <-r.fsmSnapshotCh:
r.fsmThreadSaturation.working()
saturation.working()

snapshot(req)

Expand Down
157 changes: 82 additions & 75 deletions saturation.go
@@ -1,7 +1,6 @@
package raft

import (
"math"
"time"

"github.com/armon/go-metrics"
Expand All @@ -17,123 +16,131 @@ import (
// Note: it is expected that the caller is single-threaded and is not safe for
// concurrent use by multiple goroutines.
type saturationMetric struct {
// reportInterval is the maximum frequency at which the gauge will be
// updated (it may be less frequent if the caller is idle).
reportInterval time.Duration

// index of the current sample. We rely on it wrapping-around on overflow and
// underflow, to implement a circular buffer (note the matching size of the
// samples array).
index uint8

// samples is a fixed-size array of samples (similar to a circular buffer)
// where elements at even-numbered indexes contain time spent sleeping, and
// elements at odd-numbered indexes contain time spent working.
samples [math.MaxUint8 + 1]struct {
v time.Duration // measurement
t time.Time // time at which the measurement was captured
}
// buckets contains measurements for both the current, and a configurable
// number of previous periods (to smooth out spikes).
buckets []saturationMetricBucket
currentBucket int

sleepBegan, workBegan time.Time
lastReport time.Time

// These are overwritten in tests.
nowFn func() time.Time
reportFn func(float32)
}

// saturationMetricBucket contains the measurements for a period.
type saturationMetricBucket struct {
// beginning of the period for which this bucket contains measurements.
beginning time.Time

// slept contains time for which the event processing loop was sleeping rather
// than working.
slept time.Duration

// lost contains time that was lost due to incorrect instrumentation of the
// event processing loop (e.g. calling sleeping() or working() multiple times
// in succession).
lost time.Duration
}

// newSaturationMetric creates a saturationMetric that will update the gauge
// with the given name at the given reportInterval.
func newSaturationMetric(name []string, reportInterval time.Duration) *saturationMetric {
return &saturationMetric{
// with the given name at the given reportInterval. keepPrev determines the
// number of previous measurements that will be used to smooth out spikes.
func newSaturationMetric(name []string, reportInterval time.Duration, keepPrev uint) *saturationMetric {
m := &saturationMetric{
reportInterval: reportInterval,
lastReport: time.Now(), // Set to now to avoid reporting immediately.
buckets: make([]saturationMetricBucket, 1+keepPrev),
nowFn: time.Now,
reportFn: func(sat float32) { metrics.SetGauge(name, sat) },
}
m.buckets[0].beginning = time.Now()
return m
}

// sleeping records the time at which the caller began waiting for work. After
// the initial call it must always be proceeded by a call to working.
// sleeping records the time at which the loop began waiting for work. After the
// initial call it must always be proceeded by a call to working.
func (s *saturationMetric) sleeping() {
s.sleepBegan = s.nowFn()

// Caller should've called working (we've probably missed a branch of a
// select). Reset sleepBegan without recording a sample to "lose" time
// instead of recording nonsense data.
if s.index%2 == 1 {
return
}
now := s.nowFn()

if !s.workBegan.IsZero() {
sample := &s.samples[s.index-1]
sample.v = s.sleepBegan.Sub(s.workBegan)
sample.t = s.sleepBegan
if !s.sleepBegan.IsZero() {
// sleeping called twice in succession. Count that time as lost rather than
// measuring nonsense.
s.buckets[s.currentBucket].lost += now.Sub(s.sleepBegan)
}

s.index++
s.sleepBegan = now
s.workBegan = time.Time{}
s.report()
}

// working records the time at which the caller began working. It must always
// be proceeded by a call to sleeping.
// working records the time at which the loop began working. It must always be
// proceeded by a call to sleeping.
func (s *saturationMetric) working() {
s.workBegan = s.nowFn()

// Caller should've called sleeping. Reset workBegan without recording a
// sample to "lose" time instead of recording nonsense data.
if s.index%2 == 0 {
return
now := s.nowFn()
bucket := &s.buckets[s.currentBucket]

if s.workBegan.IsZero() {
if s.sleepBegan.IsZero() {
// working called before the initial call to sleeping. Count that time as
// lost rather than measuring nonsense.
bucket.lost += now.Sub(bucket.beginning)
} else {
bucket.slept += now.Sub(s.sleepBegan)
}
} else {
// working called twice in succession. Count that time as lost rather than
// measuring nonsense.
bucket.lost += now.Sub(s.workBegan)
}

sample := &s.samples[s.index-1]
sample.v = s.workBegan.Sub(s.sleepBegan)
sample.t = s.workBegan

s.index++
s.workBegan = now
s.sleepBegan = time.Time{}
s.report()
}

// report updates the gauge if reportInterval has passed since our last report.
func (s *saturationMetric) report() {
if s.nowFn().Sub(s.lastReport) < s.reportInterval {
now := s.nowFn()
timeSinceLastReport := now.Sub(s.buckets[s.currentBucket].beginning)

if timeSinceLastReport < s.reportInterval {
return
}

workSamples := make([]time.Duration, 0, len(s.samples)/2)
sleepSamples := make([]time.Duration, 0, len(s.samples)/2)

for idx, sample := range s.samples {
if !sample.t.After(s.lastReport) {
var (
beginning time.Time
slept, lost time.Duration
)
for _, bucket := range s.buckets {
if bucket.beginning.IsZero() {
continue
}

if idx%2 == 0 {
sleepSamples = append(sleepSamples, sample.v)
} else {
workSamples = append(workSamples, sample.v)
if beginning.IsZero() || bucket.beginning.Before(beginning) {
beginning = bucket.beginning
}
}

// Ensure we take an equal number of work and sleep samples to avoid
// over/under reporting.
take := len(workSamples)
if len(sleepSamples) < take {
take = len(sleepSamples)
slept += bucket.slept
lost += bucket.lost
}

total := now.Sub(beginning) - lost

var saturation float32
if take != 0 {
var work, sleep time.Duration
for _, s := range workSamples[0:take] {
work += s
}
for _, s := range sleepSamples[0:take] {
sleep += s
}
saturation = float32(work) / float32(work+sleep)
if total != 0 {
saturation = float32(total-slept) / float32(total)
}

s.reportFn(saturation)
s.lastReport = s.nowFn()

s.currentBucket++
if s.currentBucket >= len(s.buckets) {
s.currentBucket = 0
}

bucket := &s.buckets[s.currentBucket]
bucket.slept = 0
bucket.lost = 0
bucket.beginning = now
}

0 comments on commit 24fc8b5

Please sign in to comment.