From 8ac5ab222090787181e0acac971d5a780f82a1e0 Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Wed, 2 Feb 2022 13:11:45 +0000 Subject: [PATCH 1/5] Thread saturation metrics Adds metrics suggested in #488, to record the percentage of time the main and FSM goroutines are busy with work vs available to accept new work, to give operators an idea of how close they are to hitting capacity limits. We keep 256 samples in memory for each metric, and update gauges (at most) once a second, possibly less if the goroutines are idle. This should be ok because it's unlikely that a goroutine would go from very high saturation to being completely idle (so at worst we'll leave the gauge on the previous low value). --- api.go | 6 ++ fsm.go | 6 ++ raft.go | 52 ++++++++++++++--- saturation.go | 139 +++++++++++++++++++++++++++++++++++++++++++++ saturation_test.go | 133 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 328 insertions(+), 8 deletions(-) create mode 100644 saturation.go create mode 100644 saturation_test.go diff --git a/api.go b/api.go index 5c17bd6df..4c021659d 100644 --- a/api.go +++ b/api.go @@ -207,6 +207,10 @@ type Raft struct { // followerNotifyCh is used to tell followers that config has changed followerNotifyCh chan struct{} + + // thread saturation metric recorders. + mainThreadSaturation *saturationMetric + fsmThreadSaturation *saturationMetric } // BootstrapCluster initializes a server's storage with the given cluster @@ -553,6 +557,8 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna leadershipTransferCh: make(chan *leadershipTransferFuture, 1), leaderNotifyCh: make(chan struct{}, 1), followerNotifyCh: make(chan struct{}, 1), + mainThreadSaturation: newSaturationMetric([]string{"raft", "mainThreadSaturation"}, 1*time.Second), + fsmThreadSaturation: newSaturationMetric([]string{"raft", "fsm", "threadSaturation"}, 1*time.Second), } r.conf.Store(*conf) diff --git a/fsm.go b/fsm.go index 487cb4b78..0ff888317 100644 --- a/fsm.go +++ b/fsm.go @@ -224,8 +224,12 @@ func (r *Raft) runFSM() { } for { + r.fsmThreadSaturation.sleeping() + select { case ptr := <-r.fsmMutateCh: + r.fsmThreadSaturation.working() + switch req := ptr.(type) { case []*commitTuple: applyBatch(req) @@ -238,6 +242,8 @@ func (r *Raft) runFSM() { } case req := <-r.fsmSnapshotCh: + r.fsmThreadSaturation.working() + snapshot(req) case <-r.shutdownCh: diff --git a/raft.go b/raft.go index 4b85ac1ef..a509b3b3d 100644 --- a/raft.go +++ b/raft.go @@ -159,35 +159,45 @@ func (r *Raft) runFollower() { heartbeatTimer := randomTimeout(r.config().HeartbeatTimeout) for r.getState() == Follower { + r.mainThreadSaturation.sleeping() + select { case rpc := <-r.rpcCh: + r.mainThreadSaturation.working() r.processRPC(rpc) case c := <-r.configurationChangeCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader c.respond(ErrNotLeader) case a := <-r.applyCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader a.respond(ErrNotLeader) case v := <-r.verifyCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader v.respond(ErrNotLeader) - case r := <-r.userRestoreCh: + case ur := <-r.userRestoreCh: + r.mainThreadSaturation.working() // Reject any restores since we are not the leader - r.respond(ErrNotLeader) + ur.respond(ErrNotLeader) - case r := <-r.leadershipTransferCh: + case l := <-r.leadershipTransferCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader - r.respond(ErrNotLeader) + l.respond(ErrNotLeader) case c := <-r.configurationsCh: + r.mainThreadSaturation.working() c.configurations = r.configurations.Clone() c.respond(nil) case b := <-r.bootstrapCh: + r.mainThreadSaturation.working() b.respond(r.liveBootstrap(b.configuration)) case <-r.leaderNotifyCh: @@ -197,6 +207,7 @@ func (r *Raft) runFollower() { heartbeatTimer = time.After(0) case <-heartbeatTimer: + r.mainThreadSaturation.working() // Restart the heartbeat timer hbTimeout := r.config().HeartbeatTimeout heartbeatTimer = randomTimeout(hbTimeout) @@ -290,11 +301,15 @@ func (r *Raft) runCandidate() { r.logger.Debug("votes", "needed", votesNeeded) for r.getState() == Candidate { + r.mainThreadSaturation.sleeping() + select { case rpc := <-r.rpcCh: + r.mainThreadSaturation.working() r.processRPC(rpc) case vote := <-voteCh: + r.mainThreadSaturation.working() // Check if the term is greater than ours, bail if vote.Term > r.getCurrentTerm() { r.logger.Debug("newer term discovered, fallback to follower") @@ -318,30 +333,37 @@ func (r *Raft) runCandidate() { } case c := <-r.configurationChangeCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader c.respond(ErrNotLeader) case a := <-r.applyCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader a.respond(ErrNotLeader) case v := <-r.verifyCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader v.respond(ErrNotLeader) - case r := <-r.userRestoreCh: + case ur := <-r.userRestoreCh: + r.mainThreadSaturation.working() // Reject any restores since we are not the leader - r.respond(ErrNotLeader) + ur.respond(ErrNotLeader) - case r := <-r.leadershipTransferCh: + case l := <-r.leadershipTransferCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader - r.respond(ErrNotLeader) + l.respond(ErrNotLeader) case c := <-r.configurationsCh: + r.mainThreadSaturation.working() c.configurations = r.configurations.Clone() c.respond(nil) case b := <-r.bootstrapCh: + r.mainThreadSaturation.working() b.respond(ErrCantBootstrap) case <-r.leaderNotifyCh: @@ -354,6 +376,7 @@ func (r *Raft) runCandidate() { } case <-electionTimer: + r.mainThreadSaturation.working() // Election failed! Restart the election. We simply return, // which will kick us back into runCandidate r.logger.Warn("Election timeout reached, restarting election") @@ -598,14 +621,19 @@ func (r *Raft) leaderLoop() { lease := time.After(r.config().LeaderLeaseTimeout) for r.getState() == Leader { + r.mainThreadSaturation.sleeping() + select { case rpc := <-r.rpcCh: + r.mainThreadSaturation.working() r.processRPC(rpc) case <-r.leaderState.stepDown: + r.mainThreadSaturation.working() r.setState(Follower) case future := <-r.leadershipTransferCh: + r.mainThreadSaturation.working() if r.getLeadershipTransferInProgress() { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) future.respond(ErrLeadershipTransferInProgress) @@ -686,6 +714,7 @@ func (r *Raft) leaderLoop() { go r.leadershipTransfer(*id, *address, state, stopCh, doneCh) case <-r.leaderState.commitCh: + r.mainThreadSaturation.working() // Process the newly committed entries oldCommitIndex := r.getCommitIndex() commitIndex := r.leaderState.commitment.getCommitIndex() @@ -748,6 +777,7 @@ func (r *Raft) leaderLoop() { } case v := <-r.verifyCh: + r.mainThreadSaturation.working() if v.quorumSize == 0 { // Just dispatched, start the verification r.verifyLeader(v) @@ -772,6 +802,7 @@ func (r *Raft) leaderLoop() { } case future := <-r.userRestoreCh: + r.mainThreadSaturation.working() if r.getLeadershipTransferInProgress() { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) future.respond(ErrLeadershipTransferInProgress) @@ -781,6 +812,7 @@ func (r *Raft) leaderLoop() { future.respond(err) case future := <-r.configurationsCh: + r.mainThreadSaturation.working() if r.getLeadershipTransferInProgress() { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) future.respond(ErrLeadershipTransferInProgress) @@ -790,6 +822,7 @@ func (r *Raft) leaderLoop() { future.respond(nil) case future := <-r.configurationChangeChIfStable(): + r.mainThreadSaturation.working() if r.getLeadershipTransferInProgress() { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) future.respond(ErrLeadershipTransferInProgress) @@ -798,9 +831,11 @@ func (r *Raft) leaderLoop() { r.appendConfigurationEntry(future) case b := <-r.bootstrapCh: + r.mainThreadSaturation.working() b.respond(ErrCantBootstrap) case newLog := <-r.applyCh: + r.mainThreadSaturation.working() if r.getLeadershipTransferInProgress() { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) newLog.respond(ErrLeadershipTransferInProgress) @@ -829,6 +864,7 @@ func (r *Raft) leaderLoop() { } case <-lease: + r.mainThreadSaturation.working() // Check if we've exceeded the lease, potentially stepping down maxDiff := r.checkLeaderLease() diff --git a/saturation.go b/saturation.go new file mode 100644 index 000000000..48e480a02 --- /dev/null +++ b/saturation.go @@ -0,0 +1,139 @@ +package raft + +import ( + "math" + "time" + + "github.com/armon/go-metrics" +) + +// saturationMetric measures the saturation (percentage of time spent working vs +// waiting for work) of an event processing loop, such as runFSM. It reports the +// saturation as a gauge metric (at most) once every reportInterval. +// +// Callers must instrument their loop with calls to sleeping and working, starting +// with a call to sleeping. +// +// Note: it is expected that the caller is single-threaded and is not safe for +// concurrent use by multiple goroutines. +type saturationMetric struct { + // reportInterval is the maximum frequency at which the gauge will be + // updated (it may be less frequent if the caller is idle). + reportInterval time.Duration + + // index of the current sample. We rely on it wrapping-around on overflow and + // underflow, to implement a circular buffer (note the matching size of the + // samples array). + index uint8 + + // samples is a fixed-size array of samples (similar to a circular buffer) + // where elements at even-numbered indexes contain time spent sleeping, and + // elements at odd-numbered indexes contain time spent working. + samples [math.MaxUint8 + 1]struct { + v time.Duration // measurement + t time.Time // time at which the measurement was captured + } + + sleepBegan, workBegan time.Time + lastReport time.Time + + // These are overwritten in tests. + nowFn func() time.Time + reportFn func(float32) +} + +// newSaturationMetric creates a saturationMetric that will update the gauge +// with the given name at the given reportInterval. +func newSaturationMetric(name []string, reportInterval time.Duration) *saturationMetric { + return &saturationMetric{ + reportInterval: reportInterval, + lastReport: time.Now(), // Set to now to avoid reporting immediately. + nowFn: time.Now, + reportFn: func(sat float32) { metrics.SetGauge(name, sat) }, + } +} + +// sleeping records the time at which the caller began waiting for work. After +// the initial call it must always be proceeded by a call to working. +func (s *saturationMetric) sleeping() { + s.sleepBegan = s.nowFn() + + // Caller should've called working (we've probably missed a branch of a + // select). Reset sleepBegan without recording a sample to "lose" time + // instead of recording nonsense data. + if s.index%2 == 1 { + return + } + + if !s.workBegan.IsZero() { + sample := &s.samples[s.index-1] + sample.v = s.sleepBegan.Sub(s.workBegan) + sample.t = s.sleepBegan + } + + s.index++ + s.report() +} + +// working records the time at which the caller began working. It must always +// be proceeded by a call to sleeping. +func (s *saturationMetric) working() { + s.workBegan = s.nowFn() + + // Caller should've called sleeping. Reset workBegan without recording a + // sample to "lose" time instead of recording nonsense data. + if s.index%2 == 0 { + return + } + + sample := &s.samples[s.index-1] + sample.v = s.workBegan.Sub(s.sleepBegan) + sample.t = s.workBegan + + s.index++ + s.report() +} + +// report updates the gauge if reportInterval has passed since our last report. +func (s *saturationMetric) report() { + if s.nowFn().Sub(s.lastReport) < s.reportInterval { + return + } + + workSamples := make([]time.Duration, 0, len(s.samples)/2) + sleepSamples := make([]time.Duration, 0, len(s.samples)/2) + + for idx, sample := range s.samples { + if !sample.t.After(s.lastReport) { + continue + } + + if idx%2 == 0 { + sleepSamples = append(sleepSamples, sample.v) + } else { + workSamples = append(workSamples, sample.v) + } + } + + // Ensure we take an equal number of work and sleep samples to avoid + // over/under reporting. + take := len(workSamples) + if len(sleepSamples) < take { + take = len(sleepSamples) + } + + var saturation float32 + if take != 0 { + var work, sleep time.Duration + for _, s := range workSamples[0:take] { + work += s + } + for _, s := range sleepSamples[0:take] { + sleep += s + } + saturation = float32(work) / float32(work+sleep) + } + + s.reportFn(saturation) + s.lastReport = s.nowFn() +} diff --git a/saturation_test.go b/saturation_test.go new file mode 100644 index 000000000..c576e2e1b --- /dev/null +++ b/saturation_test.go @@ -0,0 +1,133 @@ +package raft + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestSaturationMetric(t *testing.T) { + sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond) + + var now time.Time + sat.nowFn = func() time.Time { return now } + + var reported float32 + sat.reportFn = func(val float32) { reported = val } + + now = time.Now() + sat.sleeping() + + now = now.Add(50 * time.Millisecond) + sat.working() + + now = now.Add(75 * time.Millisecond) + sat.sleeping() + + require.Equal(t, float32(0.6), reported) + + now = now.Add(90 * time.Millisecond) + sat.working() + + now = now.Add(10 * time.Millisecond) + sat.sleeping() + + require.Equal(t, float32(0.1), reported) + + now = now.Add(100 * time.Millisecond) + sat.working() + + require.Equal(t, float32(0), reported) +} + +func TestSaturationMetric_IndexWraparound(t *testing.T) { + sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond) + + now := time.Now() + sat.nowFn = func() time.Time { return now } + + for i := 0; i < 1024; i++ { + now = now.Add(25 * time.Millisecond) + + if i%2 == 0 { + require.NotPanics(t, sat.sleeping) + } else { + require.NotPanics(t, sat.working) + } + } +} + +func TestSaturationMetric_IncorrectUsage(t *testing.T) { + t.Run("calling sleeping() consecutively", func(t *testing.T) { + sat := newSaturationMetric([]string{"metric"}, 50*time.Millisecond) + + now := time.Now() + sat.nowFn = func() time.Time { return now } + + var reported float32 + sat.reportFn = func(v float32) { reported = v } + + // Calling sleeping() consecutively should reset sleepBegan without recording + // a sample, such that we "lose" time rather than recording nonsense data. + // + // 0 | sleeping() | + // => Sleeping (10ms) + // +10ms | working() | + // => Working (10ms) + // +20ms | sleeping() | + // => [!] LOST [!] (10ms) + // +30ms | sleeping() | + // => Sleeping (10ms) + // +40ms | working() | + // => Working (10ms) + // +50ms | sleeping() | + // + // Total reportable time: 40ms. Saturation: 50%. + sat.sleeping() + now = now.Add(10 * time.Millisecond) + sat.working() + now = now.Add(10 * time.Millisecond) + sat.sleeping() + now = now.Add(10 * time.Millisecond) + sat.sleeping() + now = now.Add(10 * time.Millisecond) + sat.working() + now = now.Add(10 * time.Millisecond) + sat.sleeping() + + require.Equal(t, float32(0.5), reported) + }) + + t.Run("calling working() consecutively", func(t *testing.T) { + sat := newSaturationMetric([]string{"metric"}, 30*time.Millisecond) + + now := time.Now() + sat.nowFn = func() time.Time { return now } + + var reported float32 + sat.reportFn = func(v float32) { reported = v } + + // Calling working() consecutively should reset workBegan without recording + // a sample, such that we "lose" time rather than recording nonsense data. + // + // 0 | sleeping() | + // => Sleeping (10ms) + // +10ms | working() | + // => [!] LOST [!] (10ms) + // +20ms | working() | + // => Working (10ms) + // +30ms | sleeping() | + // + // Total reportable time: 20ms. Saturation: 50%. + sat.sleeping() + now = now.Add(10 * time.Millisecond) + sat.working() + now = now.Add(10 * time.Millisecond) + sat.working() + now = now.Add(10 * time.Millisecond) + sat.sleeping() + + require.Equal(t, float32(0.5), reported) + }) +} From 24fc8b576bf2712bc17ea67a23210ab74e07bbf8 Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Thu, 3 Feb 2022 11:52:31 +0000 Subject: [PATCH 2/5] Incorporate PR feedback - Much simpler implementation based on an accumulator of sleep time. We no longer drop samples when the buffer is full. - We now keep 5 previous measurements to smooth out spikes. - Rename metrics to `raft.thread.fsm.saturation` and `raft.thread.main.saturation`. - Remove FSM saturation metric from the `Raft` struct. --- api.go | 6 +- fsm.go | 8 ++- saturation.go | 157 +++++++++++++++++++++++---------------------- saturation_test.go | 114 +++++++++++++++++++++----------- 4 files changed, 166 insertions(+), 119 deletions(-) diff --git a/api.go b/api.go index 4c021659d..79a97c614 100644 --- a/api.go +++ b/api.go @@ -208,9 +208,8 @@ type Raft struct { // followerNotifyCh is used to tell followers that config has changed followerNotifyCh chan struct{} - // thread saturation metric recorders. + // mainThreadSaturation measures the saturation of the main raft goroutine. mainThreadSaturation *saturationMetric - fsmThreadSaturation *saturationMetric } // BootstrapCluster initializes a server's storage with the given cluster @@ -557,8 +556,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna leadershipTransferCh: make(chan *leadershipTransferFuture, 1), leaderNotifyCh: make(chan struct{}, 1), followerNotifyCh: make(chan struct{}, 1), - mainThreadSaturation: newSaturationMetric([]string{"raft", "mainThreadSaturation"}, 1*time.Second), - fsmThreadSaturation: newSaturationMetric([]string{"raft", "fsm", "threadSaturation"}, 1*time.Second), + mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second, 5), } r.conf.Store(*conf) diff --git a/fsm.go b/fsm.go index 0ff888317..cfed3450c 100644 --- a/fsm.go +++ b/fsm.go @@ -223,12 +223,14 @@ func (r *Raft) runFSM() { req.respond(err) } + saturation := newSaturationMetric([]string{"raft", "thread", "fsm", "saturation"}, 1*time.Second, 5) + for { - r.fsmThreadSaturation.sleeping() + saturation.sleeping() select { case ptr := <-r.fsmMutateCh: - r.fsmThreadSaturation.working() + saturation.working() switch req := ptr.(type) { case []*commitTuple: @@ -242,7 +244,7 @@ func (r *Raft) runFSM() { } case req := <-r.fsmSnapshotCh: - r.fsmThreadSaturation.working() + saturation.working() snapshot(req) diff --git a/saturation.go b/saturation.go index 48e480a02..ffc537174 100644 --- a/saturation.go +++ b/saturation.go @@ -1,7 +1,6 @@ package raft import ( - "math" "time" "github.com/armon/go-metrics" @@ -17,123 +16,131 @@ import ( // Note: it is expected that the caller is single-threaded and is not safe for // concurrent use by multiple goroutines. type saturationMetric struct { - // reportInterval is the maximum frequency at which the gauge will be - // updated (it may be less frequent if the caller is idle). reportInterval time.Duration - // index of the current sample. We rely on it wrapping-around on overflow and - // underflow, to implement a circular buffer (note the matching size of the - // samples array). - index uint8 - - // samples is a fixed-size array of samples (similar to a circular buffer) - // where elements at even-numbered indexes contain time spent sleeping, and - // elements at odd-numbered indexes contain time spent working. - samples [math.MaxUint8 + 1]struct { - v time.Duration // measurement - t time.Time // time at which the measurement was captured - } + // buckets contains measurements for both the current, and a configurable + // number of previous periods (to smooth out spikes). + buckets []saturationMetricBucket + currentBucket int sleepBegan, workBegan time.Time - lastReport time.Time // These are overwritten in tests. nowFn func() time.Time reportFn func(float32) } +// saturationMetricBucket contains the measurements for a period. +type saturationMetricBucket struct { + // beginning of the period for which this bucket contains measurements. + beginning time.Time + + // slept contains time for which the event processing loop was sleeping rather + // than working. + slept time.Duration + + // lost contains time that was lost due to incorrect instrumentation of the + // event processing loop (e.g. calling sleeping() or working() multiple times + // in succession). + lost time.Duration +} + // newSaturationMetric creates a saturationMetric that will update the gauge -// with the given name at the given reportInterval. -func newSaturationMetric(name []string, reportInterval time.Duration) *saturationMetric { - return &saturationMetric{ +// with the given name at the given reportInterval. keepPrev determines the +// number of previous measurements that will be used to smooth out spikes. +func newSaturationMetric(name []string, reportInterval time.Duration, keepPrev uint) *saturationMetric { + m := &saturationMetric{ reportInterval: reportInterval, - lastReport: time.Now(), // Set to now to avoid reporting immediately. + buckets: make([]saturationMetricBucket, 1+keepPrev), nowFn: time.Now, reportFn: func(sat float32) { metrics.SetGauge(name, sat) }, } + m.buckets[0].beginning = time.Now() + return m } -// sleeping records the time at which the caller began waiting for work. After -// the initial call it must always be proceeded by a call to working. +// sleeping records the time at which the loop began waiting for work. After the +// initial call it must always be proceeded by a call to working. func (s *saturationMetric) sleeping() { - s.sleepBegan = s.nowFn() - - // Caller should've called working (we've probably missed a branch of a - // select). Reset sleepBegan without recording a sample to "lose" time - // instead of recording nonsense data. - if s.index%2 == 1 { - return - } + now := s.nowFn() - if !s.workBegan.IsZero() { - sample := &s.samples[s.index-1] - sample.v = s.sleepBegan.Sub(s.workBegan) - sample.t = s.sleepBegan + if !s.sleepBegan.IsZero() { + // sleeping called twice in succession. Count that time as lost rather than + // measuring nonsense. + s.buckets[s.currentBucket].lost += now.Sub(s.sleepBegan) } - s.index++ + s.sleepBegan = now + s.workBegan = time.Time{} s.report() } -// working records the time at which the caller began working. It must always -// be proceeded by a call to sleeping. +// working records the time at which the loop began working. It must always be +// proceeded by a call to sleeping. func (s *saturationMetric) working() { - s.workBegan = s.nowFn() - - // Caller should've called sleeping. Reset workBegan without recording a - // sample to "lose" time instead of recording nonsense data. - if s.index%2 == 0 { - return + now := s.nowFn() + bucket := &s.buckets[s.currentBucket] + + if s.workBegan.IsZero() { + if s.sleepBegan.IsZero() { + // working called before the initial call to sleeping. Count that time as + // lost rather than measuring nonsense. + bucket.lost += now.Sub(bucket.beginning) + } else { + bucket.slept += now.Sub(s.sleepBegan) + } + } else { + // working called twice in succession. Count that time as lost rather than + // measuring nonsense. + bucket.lost += now.Sub(s.workBegan) } - sample := &s.samples[s.index-1] - sample.v = s.workBegan.Sub(s.sleepBegan) - sample.t = s.workBegan - - s.index++ + s.workBegan = now + s.sleepBegan = time.Time{} s.report() } // report updates the gauge if reportInterval has passed since our last report. func (s *saturationMetric) report() { - if s.nowFn().Sub(s.lastReport) < s.reportInterval { + now := s.nowFn() + timeSinceLastReport := now.Sub(s.buckets[s.currentBucket].beginning) + + if timeSinceLastReport < s.reportInterval { return } - workSamples := make([]time.Duration, 0, len(s.samples)/2) - sleepSamples := make([]time.Duration, 0, len(s.samples)/2) - - for idx, sample := range s.samples { - if !sample.t.After(s.lastReport) { + var ( + beginning time.Time + slept, lost time.Duration + ) + for _, bucket := range s.buckets { + if bucket.beginning.IsZero() { continue } - if idx%2 == 0 { - sleepSamples = append(sleepSamples, sample.v) - } else { - workSamples = append(workSamples, sample.v) + if beginning.IsZero() || bucket.beginning.Before(beginning) { + beginning = bucket.beginning } - } - // Ensure we take an equal number of work and sleep samples to avoid - // over/under reporting. - take := len(workSamples) - if len(sleepSamples) < take { - take = len(sleepSamples) + slept += bucket.slept + lost += bucket.lost } + total := now.Sub(beginning) - lost + var saturation float32 - if take != 0 { - var work, sleep time.Duration - for _, s := range workSamples[0:take] { - work += s - } - for _, s := range sleepSamples[0:take] { - sleep += s - } - saturation = float32(work) / float32(work+sleep) + if total != 0 { + saturation = float32(total-slept) / float32(total) } - s.reportFn(saturation) - s.lastReport = s.nowFn() + + s.currentBucket++ + if s.currentBucket >= len(s.buckets) { + s.currentBucket = 0 + } + + bucket := &s.buckets[s.currentBucket] + bucket.slept = 0 + bucket.lost = 0 + bucket.beginning = now } diff --git a/saturation_test.go b/saturation_test.go index c576e2e1b..572ccaef9 100644 --- a/saturation_test.go +++ b/saturation_test.go @@ -8,61 +8,80 @@ import ( ) func TestSaturationMetric(t *testing.T) { - sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond) + t.Run("without smoothing", func(t *testing.T) { + sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond, 0) - var now time.Time - sat.nowFn = func() time.Time { return now } + now := sat.buckets[0].beginning + sat.nowFn = func() time.Time { return now } - var reported float32 - sat.reportFn = func(val float32) { reported = val } + var reported float32 + sat.reportFn = func(val float32) { reported = val } - now = time.Now() - sat.sleeping() + sat.sleeping() - now = now.Add(50 * time.Millisecond) - sat.working() + // First window: 50ms sleeping + 75ms working. + now = now.Add(50 * time.Millisecond) + sat.working() - now = now.Add(75 * time.Millisecond) - sat.sleeping() + now = now.Add(75 * time.Millisecond) + sat.sleeping() - require.Equal(t, float32(0.6), reported) + // Should be 60% saturation. + require.Equal(t, float32(0.6), reported) - now = now.Add(90 * time.Millisecond) - sat.working() + // Second window: 90ms sleeping + 10ms working. + now = now.Add(90 * time.Millisecond) + sat.working() - now = now.Add(10 * time.Millisecond) - sat.sleeping() + now = now.Add(10 * time.Millisecond) + sat.sleeping() - require.Equal(t, float32(0.1), reported) + // Should be 10% saturation. + require.Equal(t, float32(0.1), reported) - now = now.Add(100 * time.Millisecond) - sat.working() + // Third window: 100ms sleeping + 0ms working. + now = now.Add(100 * time.Millisecond) + sat.working() - require.Equal(t, float32(0), reported) -} + // Should be 0% saturation. + require.Equal(t, float32(0), reported) + }) + + t.Run("with smoothing", func(t *testing.T) { + sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond, 1) + + now := sat.buckets[0].beginning + sat.nowFn = func() time.Time { return now } + + var reported float32 + sat.reportFn = func(val float32) { reported = val } -func TestSaturationMetric_IndexWraparound(t *testing.T) { - sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond) + // First report window: 50ms sleeping + 75ms working. + sat.sleeping() - now := time.Now() - sat.nowFn = func() time.Time { return now } + now = now.Add(50 * time.Millisecond) + sat.working() - for i := 0; i < 1024; i++ { - now = now.Add(25 * time.Millisecond) + now = now.Add(75 * time.Millisecond) + sat.sleeping() - if i%2 == 0 { - require.NotPanics(t, sat.sleeping) - } else { - require.NotPanics(t, sat.working) - } - } + // Second report window: 75ms sleeping + 50ms working. + now = now.Add(75 * time.Millisecond) + sat.working() + + now = now.Add(50 * time.Millisecond) + sat.sleeping() + + // Should average out to 50% saturation. + require.Equal(t, float32(0.5), reported) + }) } func TestSaturationMetric_IncorrectUsage(t *testing.T) { t.Run("calling sleeping() consecutively", func(t *testing.T) { - sat := newSaturationMetric([]string{"metric"}, 50*time.Millisecond) + sat := newSaturationMetric([]string{"metric"}, 50*time.Millisecond, 0) - now := time.Now() + now := sat.buckets[0].beginning sat.nowFn = func() time.Time { return now } var reported float32 @@ -100,9 +119,9 @@ func TestSaturationMetric_IncorrectUsage(t *testing.T) { }) t.Run("calling working() consecutively", func(t *testing.T) { - sat := newSaturationMetric([]string{"metric"}, 30*time.Millisecond) + sat := newSaturationMetric([]string{"metric"}, 30*time.Millisecond, 0) - now := time.Now() + now := sat.buckets[0].beginning sat.nowFn = func() time.Time { return now } var reported float32 @@ -130,4 +149,25 @@ func TestSaturationMetric_IncorrectUsage(t *testing.T) { require.Equal(t, float32(0.5), reported) }) + + t.Run("calling working() first", func(t *testing.T) { + sat := newSaturationMetric([]string{"metric"}, 10*time.Millisecond, 0) + + now := sat.buckets[0].beginning.Add(10 * time.Millisecond) + sat.nowFn = func() time.Time { return now } + + var reported float32 + sat.reportFn = func(v float32) { reported = v } + + // Time from start until working() is treated as lost. + sat.working() + require.Equal(t, float32(0), reported) + + sat.sleeping() + now = now.Add(5 * time.Millisecond) + sat.working() + now = now.Add(5 * time.Millisecond) + sat.sleeping() + require.Equal(t, float32(0.5), reported) + }) } From 5c3d87e3e64510c54f431cb16809001628b831bb Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Thu, 10 Feb 2022 16:16:23 +0000 Subject: [PATCH 3/5] Round saturation to 2dp to report integer percentages --- saturation.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/saturation.go b/saturation.go index ffc537174..38ba2a93c 100644 --- a/saturation.go +++ b/saturation.go @@ -1,6 +1,7 @@ package raft import ( + "math" "time" "github.com/armon/go-metrics" @@ -128,11 +129,12 @@ func (s *saturationMetric) report() { total := now.Sub(beginning) - lost - var saturation float32 + var saturation float64 if total != 0 { - saturation = float32(total-slept) / float32(total) + saturation = float64(total-slept) / float64(total) + saturation = math.Round(saturation*100) / 100 } - s.reportFn(saturation) + s.reportFn(float32(saturation)) s.currentBucket++ if s.currentBucket >= len(s.buckets) { From 1f8b1adf9116ddd21df84985a2ecf16f23ba7fca Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Wed, 27 Apr 2022 14:22:06 +0100 Subject: [PATCH 4/5] Remove smoothing/bucketing in favor of using a sample metric --- api.go | 2 +- fsm.go | 2 +- saturation.go | 79 ++++++++++++---------------------------------- saturation_test.go | 45 +++++--------------------- 4 files changed, 31 insertions(+), 97 deletions(-) diff --git a/api.go b/api.go index 79a97c614..eda33c3f0 100644 --- a/api.go +++ b/api.go @@ -556,7 +556,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna leadershipTransferCh: make(chan *leadershipTransferFuture, 1), leaderNotifyCh: make(chan struct{}, 1), followerNotifyCh: make(chan struct{}, 1), - mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second, 5), + mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second), } r.conf.Store(*conf) diff --git a/fsm.go b/fsm.go index cfed3450c..6d26a9a28 100644 --- a/fsm.go +++ b/fsm.go @@ -223,7 +223,7 @@ func (r *Raft) runFSM() { req.respond(err) } - saturation := newSaturationMetric([]string{"raft", "thread", "fsm", "saturation"}, 1*time.Second, 5) + saturation := newSaturationMetric([]string{"raft", "thread", "fsm", "saturation"}, 1*time.Second) for { saturation.sleeping() diff --git a/saturation.go b/saturation.go index 38ba2a93c..0124016eb 100644 --- a/saturation.go +++ b/saturation.go @@ -19,44 +19,32 @@ import ( type saturationMetric struct { reportInterval time.Duration - // buckets contains measurements for both the current, and a configurable - // number of previous periods (to smooth out spikes). - buckets []saturationMetricBucket - currentBucket int - - sleepBegan, workBegan time.Time - - // These are overwritten in tests. - nowFn func() time.Time - reportFn func(float32) -} - -// saturationMetricBucket contains the measurements for a period. -type saturationMetricBucket struct { - // beginning of the period for which this bucket contains measurements. - beginning time.Time - // slept contains time for which the event processing loop was sleeping rather - // than working. + // than working in the last period. slept time.Duration // lost contains time that was lost due to incorrect instrumentation of the // event processing loop (e.g. calling sleeping() or working() multiple times - // in succession). + // in succession) in the last period. lost time.Duration + + lastReport, sleepBegan, workBegan time.Time + + // These are overwritten in tests. + nowFn func() time.Time + reportFn func(float32) } // newSaturationMetric creates a saturationMetric that will update the gauge // with the given name at the given reportInterval. keepPrev determines the // number of previous measurements that will be used to smooth out spikes. -func newSaturationMetric(name []string, reportInterval time.Duration, keepPrev uint) *saturationMetric { +func newSaturationMetric(name []string, reportInterval time.Duration) *saturationMetric { m := &saturationMetric{ reportInterval: reportInterval, - buckets: make([]saturationMetricBucket, 1+keepPrev), nowFn: time.Now, - reportFn: func(sat float32) { metrics.SetGauge(name, sat) }, + lastReport: time.Now(), + reportFn: func(sat float32) { metrics.AddSample(name, sat) }, } - m.buckets[0].beginning = time.Now() return m } @@ -68,7 +56,7 @@ func (s *saturationMetric) sleeping() { if !s.sleepBegan.IsZero() { // sleeping called twice in succession. Count that time as lost rather than // measuring nonsense. - s.buckets[s.currentBucket].lost += now.Sub(s.sleepBegan) + s.lost += now.Sub(s.sleepBegan) } s.sleepBegan = now @@ -80,20 +68,19 @@ func (s *saturationMetric) sleeping() { // proceeded by a call to sleeping. func (s *saturationMetric) working() { now := s.nowFn() - bucket := &s.buckets[s.currentBucket] if s.workBegan.IsZero() { if s.sleepBegan.IsZero() { // working called before the initial call to sleeping. Count that time as // lost rather than measuring nonsense. - bucket.lost += now.Sub(bucket.beginning) + s.lost += now.Sub(s.lastReport) } else { - bucket.slept += now.Sub(s.sleepBegan) + s.slept += now.Sub(s.sleepBegan) } } else { // working called twice in succession. Count that time as lost rather than // measuring nonsense. - bucket.lost += now.Sub(s.workBegan) + s.lost += now.Sub(s.workBegan) } s.workBegan = now @@ -104,45 +91,21 @@ func (s *saturationMetric) working() { // report updates the gauge if reportInterval has passed since our last report. func (s *saturationMetric) report() { now := s.nowFn() - timeSinceLastReport := now.Sub(s.buckets[s.currentBucket].beginning) + timeSinceLastReport := now.Sub(s.lastReport) if timeSinceLastReport < s.reportInterval { return } - var ( - beginning time.Time - slept, lost time.Duration - ) - for _, bucket := range s.buckets { - if bucket.beginning.IsZero() { - continue - } - - if beginning.IsZero() || bucket.beginning.Before(beginning) { - beginning = bucket.beginning - } - - slept += bucket.slept - lost += bucket.lost - } - - total := now.Sub(beginning) - lost - var saturation float64 + total := timeSinceLastReport - s.lost if total != 0 { - saturation = float64(total-slept) / float64(total) + saturation = float64(total-s.slept) / float64(total) saturation = math.Round(saturation*100) / 100 } s.reportFn(float32(saturation)) - s.currentBucket++ - if s.currentBucket >= len(s.buckets) { - s.currentBucket = 0 - } - - bucket := &s.buckets[s.currentBucket] - bucket.slept = 0 - bucket.lost = 0 - bucket.beginning = now + s.slept = 0 + s.lost = 0 + s.lastReport = now } diff --git a/saturation_test.go b/saturation_test.go index 572ccaef9..35046f347 100644 --- a/saturation_test.go +++ b/saturation_test.go @@ -9,9 +9,9 @@ import ( func TestSaturationMetric(t *testing.T) { t.Run("without smoothing", func(t *testing.T) { - sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond, 0) + sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond) - now := sat.buckets[0].beginning + now := sat.lastReport sat.nowFn = func() time.Time { return now } var reported float32 @@ -46,42 +46,13 @@ func TestSaturationMetric(t *testing.T) { // Should be 0% saturation. require.Equal(t, float32(0), reported) }) - - t.Run("with smoothing", func(t *testing.T) { - sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond, 1) - - now := sat.buckets[0].beginning - sat.nowFn = func() time.Time { return now } - - var reported float32 - sat.reportFn = func(val float32) { reported = val } - - // First report window: 50ms sleeping + 75ms working. - sat.sleeping() - - now = now.Add(50 * time.Millisecond) - sat.working() - - now = now.Add(75 * time.Millisecond) - sat.sleeping() - - // Second report window: 75ms sleeping + 50ms working. - now = now.Add(75 * time.Millisecond) - sat.working() - - now = now.Add(50 * time.Millisecond) - sat.sleeping() - - // Should average out to 50% saturation. - require.Equal(t, float32(0.5), reported) - }) } func TestSaturationMetric_IncorrectUsage(t *testing.T) { t.Run("calling sleeping() consecutively", func(t *testing.T) { - sat := newSaturationMetric([]string{"metric"}, 50*time.Millisecond, 0) + sat := newSaturationMetric([]string{"metric"}, 50*time.Millisecond) - now := sat.buckets[0].beginning + now := sat.lastReport sat.nowFn = func() time.Time { return now } var reported float32 @@ -119,9 +90,9 @@ func TestSaturationMetric_IncorrectUsage(t *testing.T) { }) t.Run("calling working() consecutively", func(t *testing.T) { - sat := newSaturationMetric([]string{"metric"}, 30*time.Millisecond, 0) + sat := newSaturationMetric([]string{"metric"}, 30*time.Millisecond) - now := sat.buckets[0].beginning + now := sat.lastReport sat.nowFn = func() time.Time { return now } var reported float32 @@ -151,9 +122,9 @@ func TestSaturationMetric_IncorrectUsage(t *testing.T) { }) t.Run("calling working() first", func(t *testing.T) { - sat := newSaturationMetric([]string{"metric"}, 10*time.Millisecond, 0) + sat := newSaturationMetric([]string{"metric"}, 10*time.Millisecond) - now := sat.buckets[0].beginning.Add(10 * time.Millisecond) + now := sat.lastReport sat.nowFn = func() time.Time { return now } var reported float32 From 0e9a7c54139eb1f1863d8fe7ca2c7fb0fa4dcb55 Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Wed, 27 Apr 2022 14:27:43 +0100 Subject: [PATCH 5/5] Incorporate @kisunji's comment improvements --- saturation.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/saturation.go b/saturation.go index 0124016eb..c01430f32 100644 --- a/saturation.go +++ b/saturation.go @@ -14,18 +14,18 @@ import ( // Callers must instrument their loop with calls to sleeping and working, starting // with a call to sleeping. // -// Note: it is expected that the caller is single-threaded and is not safe for +// Note: the caller must be single-threaded and saturationMetric is not safe for // concurrent use by multiple goroutines. type saturationMetric struct { reportInterval time.Duration // slept contains time for which the event processing loop was sleeping rather - // than working in the last period. + // than working in the period since lastReport. slept time.Duration - // lost contains time that was lost due to incorrect instrumentation of the - // event processing loop (e.g. calling sleeping() or working() multiple times - // in succession) in the last period. + // lost contains time that is considered lost due to incorrect use of + // saturationMetricBucket (e.g. calling sleeping() or working() multiple + // times in succession) in the period since lastReport. lost time.Duration lastReport, sleepBegan, workBegan time.Time