From 8ac5ab222090787181e0acac971d5a780f82a1e0 Mon Sep 17 00:00:00 2001
From: Daniel Upton
Date: Wed, 2 Feb 2022 13:11:45 +0000
Subject: [PATCH] Thread saturation metrics

Adds metrics suggested in #488, to record the percentage of time the
main and FSM goroutines are busy with work vs available to accept new
work, to give operators an idea of how close they are to hitting
capacity limits.

We keep 256 samples in memory for each metric, and update the gauges
(at most) once a second, possibly less often if the goroutines are
idle. This should be ok because it's unlikely that a goroutine would
go from very high saturation to completely idle (so at worst we'll
leave the gauge at the previous low value).
---
 api.go             |   6 ++
 fsm.go             |   6 ++
 raft.go            |  52 ++++++++++++++---
 saturation.go      | 139 +++++++++++++++++++++++++++++++++++++++++
 saturation_test.go | 133 +++++++++++++++++++++++++++++++++++++++
 5 files changed, 328 insertions(+), 8 deletions(-)
 create mode 100644 saturation.go
 create mode 100644 saturation_test.go

diff --git a/api.go b/api.go
index 5c17bd6df..4c021659d 100644
--- a/api.go
+++ b/api.go
@@ -207,6 +207,10 @@ type Raft struct {
 	// followerNotifyCh is used to tell followers that config has changed
 	followerNotifyCh chan struct{}
+
+	// thread saturation metric recorders.
+	mainThreadSaturation *saturationMetric
+	fsmThreadSaturation  *saturationMetric
 }

 // BootstrapCluster initializes a server's storage with the given cluster
@@ -553,6 +557,8 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
 		leadershipTransferCh: make(chan *leadershipTransferFuture, 1),
 		leaderNotifyCh:       make(chan struct{}, 1),
 		followerNotifyCh:     make(chan struct{}, 1),
+		mainThreadSaturation: newSaturationMetric([]string{"raft", "mainThreadSaturation"}, 1*time.Second),
+		fsmThreadSaturation:  newSaturationMetric([]string{"raft", "fsm", "threadSaturation"}, 1*time.Second),
 	}

 	r.conf.Store(*conf)
diff --git a/fsm.go b/fsm.go
index 487cb4b78..0ff888317 100644
--- a/fsm.go
+++ b/fsm.go
@@ -224,8 +224,12 @@ func (r *Raft) runFSM() {
 	}

 	for {
+		r.fsmThreadSaturation.sleeping()
+
 		select {
 		case ptr := <-r.fsmMutateCh:
+			r.fsmThreadSaturation.working()
+
 			switch req := ptr.(type) {
 			case []*commitTuple:
 				applyBatch(req)
@@ -238,6 +242,8 @@ func (r *Raft) runFSM() {
 			}

 		case req := <-r.fsmSnapshotCh:
+			r.fsmThreadSaturation.working()
+
 			snapshot(req)

 		case <-r.shutdownCh:
diff --git a/raft.go b/raft.go
index 4b85ac1ef..a509b3b3d 100644
--- a/raft.go
+++ b/raft.go
@@ -159,35 +159,45 @@ func (r *Raft) runFollower() {
 	heartbeatTimer := randomTimeout(r.config().HeartbeatTimeout)

 	for r.getState() == Follower {
+		r.mainThreadSaturation.sleeping()
+
 		select {
 		case rpc := <-r.rpcCh:
+			r.mainThreadSaturation.working()
 			r.processRPC(rpc)

 		case c := <-r.configurationChangeCh:
+			r.mainThreadSaturation.working()
 			// Reject any operations since we are not the leader
 			c.respond(ErrNotLeader)

 		case a := <-r.applyCh:
+			r.mainThreadSaturation.working()
 			// Reject any operations since we are not the leader
 			a.respond(ErrNotLeader)

 		case v := <-r.verifyCh:
+			r.mainThreadSaturation.working()
 			// Reject any operations since we are not the leader
 			v.respond(ErrNotLeader)

-		case r := <-r.userRestoreCh:
+		case ur := <-r.userRestoreCh:
+			r.mainThreadSaturation.working()
 			// Reject any restores since we are not the leader
-			r.respond(ErrNotLeader)
+			ur.respond(ErrNotLeader)

-		case r := <-r.leadershipTransferCh:
+		case l := <-r.leadershipTransferCh:
+			r.mainThreadSaturation.working()
 			// Reject any operations since we are not the leader
-			r.respond(ErrNotLeader)
+			l.respond(ErrNotLeader)

 		case c := <-r.configurationsCh:
+			r.mainThreadSaturation.working()
 			c.configurations = r.configurations.Clone()
 			c.respond(nil)

 		case b := <-r.bootstrapCh:
+			r.mainThreadSaturation.working()
 			b.respond(r.liveBootstrap(b.configuration))

 		case <-r.leaderNotifyCh:
@@ -197,6 +207,7 @@ func (r *Raft) runFollower() {
 			heartbeatTimer = time.After(0)

 		case <-heartbeatTimer:
+			r.mainThreadSaturation.working()
 			// Restart the heartbeat timer
 			hbTimeout := r.config().HeartbeatTimeout
 			heartbeatTimer = randomTimeout(hbTimeout)
@@ -290,11 +301,15 @@ func (r *Raft) runCandidate() {
 	r.logger.Debug("votes", "needed", votesNeeded)

 	for r.getState() == Candidate {
+		r.mainThreadSaturation.sleeping()
+
 		select {
 		case rpc := <-r.rpcCh:
+			r.mainThreadSaturation.working()
 			r.processRPC(rpc)

 		case vote := <-voteCh:
+			r.mainThreadSaturation.working()
 			// Check if the term is greater than ours, bail
 			if vote.Term > r.getCurrentTerm() {
 				r.logger.Debug("newer term discovered, fallback to follower")
@@ -318,30 +333,37 @@ func (r *Raft) runCandidate() {
 			}

 		case c := <-r.configurationChangeCh:
+			r.mainThreadSaturation.working()
 			// Reject any operations since we are not the leader
 			c.respond(ErrNotLeader)

 		case a := <-r.applyCh:
+			r.mainThreadSaturation.working()
 			// Reject any operations since we are not the leader
 			a.respond(ErrNotLeader)

 		case v := <-r.verifyCh:
+			r.mainThreadSaturation.working()
 			// Reject any operations since we are not the leader
 			v.respond(ErrNotLeader)

-		case r := <-r.userRestoreCh:
+		case ur := <-r.userRestoreCh:
+			r.mainThreadSaturation.working()
 			// Reject any restores since we are not the leader
-			r.respond(ErrNotLeader)
+			ur.respond(ErrNotLeader)

-		case r := <-r.leadershipTransferCh:
+		case l := <-r.leadershipTransferCh:
+			r.mainThreadSaturation.working()
 			// Reject any operations since we are not the leader
-			r.respond(ErrNotLeader)
+			l.respond(ErrNotLeader)

 		case c := <-r.configurationsCh:
+			r.mainThreadSaturation.working()
 			c.configurations = r.configurations.Clone()
 			c.respond(nil)

 		case b := <-r.bootstrapCh:
+			r.mainThreadSaturation.working()
 			b.respond(ErrCantBootstrap)

 		case <-r.leaderNotifyCh:
@@ -354,6 +376,7 @@ func (r *Raft) runCandidate() {
 			}

 		case <-electionTimer:
+			r.mainThreadSaturation.working()
 			// Election failed! Restart the election. We simply return,
 			// which will kick us back into runCandidate
 			r.logger.Warn("Election timeout reached, restarting election")
@@ -598,14 +621,19 @@ func (r *Raft) leaderLoop() {
 	lease := time.After(r.config().LeaderLeaseTimeout)

 	for r.getState() == Leader {
+		r.mainThreadSaturation.sleeping()
+
 		select {
 		case rpc := <-r.rpcCh:
+			r.mainThreadSaturation.working()
 			r.processRPC(rpc)

 		case <-r.leaderState.stepDown:
+			r.mainThreadSaturation.working()
 			r.setState(Follower)

 		case future := <-r.leadershipTransferCh:
+			r.mainThreadSaturation.working()
 			if r.getLeadershipTransferInProgress() {
 				r.logger.Debug(ErrLeadershipTransferInProgress.Error())
 				future.respond(ErrLeadershipTransferInProgress)
@@ -686,6 +714,7 @@ func (r *Raft) leaderLoop() {
 			go r.leadershipTransfer(*id, *address, state, stopCh, doneCh)

 		case <-r.leaderState.commitCh:
+			r.mainThreadSaturation.working()
 			// Process the newly committed entries
 			oldCommitIndex := r.getCommitIndex()
 			commitIndex := r.leaderState.commitment.getCommitIndex()
@@ -748,6 +777,7 @@ func (r *Raft) leaderLoop() {
 			}

 		case v := <-r.verifyCh:
+			r.mainThreadSaturation.working()
 			if v.quorumSize == 0 {
 				// Just dispatched, start the verification
 				r.verifyLeader(v)
@@ -772,6 +802,7 @@ func (r *Raft) leaderLoop() {
 			}

 		case future := <-r.userRestoreCh:
+			r.mainThreadSaturation.working()
 			if r.getLeadershipTransferInProgress() {
 				r.logger.Debug(ErrLeadershipTransferInProgress.Error())
 				future.respond(ErrLeadershipTransferInProgress)
@@ -781,6 +812,7 @@ func (r *Raft) leaderLoop() {
 			future.respond(err)

 		case future := <-r.configurationsCh:
+			r.mainThreadSaturation.working()
 			if r.getLeadershipTransferInProgress() {
 				r.logger.Debug(ErrLeadershipTransferInProgress.Error())
 				future.respond(ErrLeadershipTransferInProgress)
@@ -790,6 +822,7 @@ func (r *Raft) leaderLoop() {
 			future.respond(nil)

 		case future := <-r.configurationChangeChIfStable():
+			r.mainThreadSaturation.working()
 			if r.getLeadershipTransferInProgress() {
 				r.logger.Debug(ErrLeadershipTransferInProgress.Error())
 				future.respond(ErrLeadershipTransferInProgress)
@@ -798,9 +831,11 @@ func (r *Raft) leaderLoop() {
 			r.appendConfigurationEntry(future)

 		case b := <-r.bootstrapCh:
+			r.mainThreadSaturation.working()
 			b.respond(ErrCantBootstrap)

 		case newLog := <-r.applyCh:
+			r.mainThreadSaturation.working()
 			if r.getLeadershipTransferInProgress() {
 				r.logger.Debug(ErrLeadershipTransferInProgress.Error())
 				newLog.respond(ErrLeadershipTransferInProgress)
@@ -829,6 +864,7 @@ func (r *Raft) leaderLoop() {
 			}

 		case <-lease:
+			r.mainThreadSaturation.working()
 			// Check if we've exceeded the lease, potentially stepping down
 			maxDiff := r.checkLeaderLease()
diff --git a/saturation.go b/saturation.go
new file mode 100644
index 000000000..48e480a02
--- /dev/null
+++ b/saturation.go
@@ -0,0 +1,139 @@
+package raft
+
+import (
+	"math"
+	"time"
+
+	"github.com/armon/go-metrics"
+)
+
+// saturationMetric measures the saturation (percentage of time spent working
+// vs waiting for work) of an event processing loop, such as runFSM. It
+// reports the saturation as a gauge metric (at most) once every
+// reportInterval.
+//
+// Callers must instrument their loop with calls to sleeping and working,
+// starting with a call to sleeping.
+//
+// Note: the caller must be single-threaded; a saturationMetric is not safe
+// for concurrent use by multiple goroutines.
type saturationMetric struct {
+	// reportInterval is the minimum amount of time between gauge updates
+	// (updates may be less frequent than this if the caller is idle).
+	reportInterval time.Duration
+
+	// index of the current sample. We rely on it wrapping around on overflow
+	// and underflow to implement a circular buffer (note the matching size of
+	// the samples array).
+	index uint8
+
+	// samples is a fixed-size array used as a circular buffer, where elements
+	// at even-numbered indexes contain time spent sleeping, and elements at
+	// odd-numbered indexes contain time spent working.
+	samples [math.MaxUint8 + 1]struct {
+		v time.Duration // measurement
+		t time.Time     // time at which the measurement was captured
+	}
+
+	sleepBegan, workBegan time.Time
+	lastReport            time.Time
+
+	// These are overwritten in tests.
+	nowFn    func() time.Time
+	reportFn func(float32)
+}
+
+// newSaturationMetric creates a saturationMetric that will update the gauge
+// with the given name at the given reportInterval.
+func newSaturationMetric(name []string, reportInterval time.Duration) *saturationMetric {
+	return &saturationMetric{
+		reportInterval: reportInterval,
+		lastReport:     time.Now(), // Set to now to avoid reporting immediately.
+		nowFn:          time.Now,
+		reportFn:       func(sat float32) { metrics.SetGauge(name, sat) },
+	}
+}
+
+// sleeping records the time at which the caller began waiting for work. After
+// the initial call it must always be preceded by a call to working.
+func (s *saturationMetric) sleeping() {
+	s.sleepBegan = s.nowFn()
+
+	// Caller should've called working (we've probably missed a branch of a
+	// select). Reset sleepBegan without recording a sample to "lose" time
+	// instead of recording nonsense data.
+	if s.index%2 == 1 {
+		return
+	}
+
+	if !s.workBegan.IsZero() {
+		sample := &s.samples[s.index-1]
+		sample.v = s.sleepBegan.Sub(s.workBegan)
+		sample.t = s.sleepBegan
+	}
+
+	s.index++
+	s.report()
+}
+
+// working records the time at which the caller began working. It must always
+// be preceded by a call to sleeping.
+func (s *saturationMetric) working() {
+	s.workBegan = s.nowFn()
+
+	// Caller should've called sleeping. Reset workBegan without recording a
+	// sample to "lose" time instead of recording nonsense data.
+	if s.index%2 == 0 {
+		return
+	}
+
+	sample := &s.samples[s.index-1]
+	sample.v = s.workBegan.Sub(s.sleepBegan)
+	sample.t = s.workBegan
+
+	s.index++
+	s.report()
+}
+
+// report updates the gauge if reportInterval has passed since our last report.
+func (s *saturationMetric) report() {
+	if s.nowFn().Sub(s.lastReport) < s.reportInterval {
+		return
+	}
+
+	workSamples := make([]time.Duration, 0, len(s.samples)/2)
+	sleepSamples := make([]time.Duration, 0, len(s.samples)/2)
+
+	for idx, sample := range s.samples {
+		if !sample.t.After(s.lastReport) {
+			continue
+		}
+
+		if idx%2 == 0 {
+			sleepSamples = append(sleepSamples, sample.v)
+		} else {
+			workSamples = append(workSamples, sample.v)
+		}
+	}
+
+	// Ensure we take an equal number of work and sleep samples to avoid
+	// over/under reporting.
+	take := len(workSamples)
+	if len(sleepSamples) < take {
+		take = len(sleepSamples)
+	}
+
+	var saturation float32
+	if take != 0 {
+		var work, sleep time.Duration
+		for _, s := range workSamples[0:take] {
+			work += s
+		}
+		for _, s := range sleepSamples[0:take] {
+			sleep += s
+		}
+		saturation = float32(work) / float32(work+sleep)
+	}
+
+	s.reportFn(saturation)
+	s.lastReport = s.nowFn()
+}
diff --git a/saturation_test.go b/saturation_test.go
new file mode 100644
index 000000000..c576e2e1b
--- /dev/null
+++ b/saturation_test.go
@@ -0,0 +1,133 @@
+package raft
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestSaturationMetric(t *testing.T) {
+	sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond)
+
+	var now time.Time
+	sat.nowFn = func() time.Time { return now }
+
+	var reported float32
+	sat.reportFn = func(val float32) { reported = val }
+
+	now = time.Now()
+	sat.sleeping()
+
+	now = now.Add(50 * time.Millisecond)
+	sat.working()
+
+	now = now.Add(75 * time.Millisecond)
+	sat.sleeping()
+
+	require.Equal(t, float32(0.6), reported)
+
+	now = now.Add(90 * time.Millisecond)
+	sat.working()
+
+	now = now.Add(10 * time.Millisecond)
+	sat.sleeping()
+
+	require.Equal(t, float32(0.1), reported)
+
+	now = now.Add(100 * time.Millisecond)
+	sat.working()
+
+	require.Equal(t, float32(0), reported)
+}
+
+func TestSaturationMetric_IndexWraparound(t *testing.T) {
+	sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond)
+
+	now := time.Now()
+	sat.nowFn = func() time.Time { return now }
+
+	for i := 0; i < 1024; i++ {
+		now = now.Add(25 * time.Millisecond)
+
+		if i%2 == 0 {
+			require.NotPanics(t, sat.sleeping)
+		} else {
+			require.NotPanics(t, sat.working)
+		}
+	}
+}
+
+func TestSaturationMetric_IncorrectUsage(t *testing.T) {
+	t.Run("calling sleeping() consecutively", func(t *testing.T) {
+		sat := newSaturationMetric([]string{"metric"}, 50*time.Millisecond)
+
+		now := time.Now()
+		sat.nowFn = func() time.Time { return now }
+
+		var reported float32
+		sat.reportFn = func(v float32) { reported = v }
+
+		// Calling sleeping() consecutively should reset sleepBegan without
+		// recording a sample, such that we "lose" time rather than recording
+		// nonsense data.
+		//
+		//   0     | sleeping() |
+		//                        => Sleeping (10ms)
+		//   +10ms | working()  |
+		//                        => Working (10ms)
+		//   +20ms | sleeping() |
+		//                        => [!] LOST [!] (10ms)
+		//   +30ms | sleeping() |
+		//                        => Sleeping (10ms)
+		//   +40ms | working()  |
+		//                        => Working (10ms)
+		//   +50ms | sleeping() |
+		//
+		// Total reportable time: 40ms. Saturation: 50%.
+		sat.sleeping()
+		now = now.Add(10 * time.Millisecond)
+		sat.working()
+		now = now.Add(10 * time.Millisecond)
+		sat.sleeping()
+		now = now.Add(10 * time.Millisecond)
+		sat.sleeping()
+		now = now.Add(10 * time.Millisecond)
+		sat.working()
+		now = now.Add(10 * time.Millisecond)
+		sat.sleeping()
+
+		require.Equal(t, float32(0.5), reported)
+	})
+
+	t.Run("calling working() consecutively", func(t *testing.T) {
+		sat := newSaturationMetric([]string{"metric"}, 30*time.Millisecond)
+
+		now := time.Now()
+		sat.nowFn = func() time.Time { return now }
+
+		var reported float32
+		sat.reportFn = func(v float32) { reported = v }
+
+		// Calling working() consecutively should reset workBegan without
+		// recording a sample, such that we "lose" time rather than recording
+		// nonsense data.
+		//
+		//   0     | sleeping() |
+		//                        => Sleeping (10ms)
+		//   +10ms | working()  |
+		//                        => [!] LOST [!] (10ms)
+		//   +20ms | working()  |
+		//                        => Working (10ms)
+		//   +30ms | sleeping() |
+		//
+		// Total reportable time: 20ms. Saturation: 50%.
+		sat.sleeping()
+		now = now.Add(10 * time.Millisecond)
+		sat.working()
+		now = now.Add(10 * time.Millisecond)
+		sat.working()
+		now = now.Add(10 * time.Millisecond)
+		sat.sleeping()
+
+		require.Equal(t, float32(0.5), reported)
+	})
+}
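
Reviewer note: the calling convention is easier to see outside of a diff. Below is a
minimal sketch (not part of the patch) of how a loop in this package would be
instrumented, in the style of runFSM above; exampleLoop, workCh, shutdownCh, and the
gauge name are hypothetical.

package raft

import "time"

// exampleLoop is a hypothetical event processing loop (not part of this
// patch) showing the calling convention saturationMetric expects: one
// sleeping() call at the top of each loop iteration, and one working() call
// at the start of every select branch that does work.
func exampleLoop(workCh <-chan func(), shutdownCh <-chan struct{}) {
	sat := newSaturationMetric([]string{"raft", "exampleLoopSaturation"}, 1*time.Second)

	for {
		sat.sleeping() // mark the start of a wait-for-work period

		select {
		case fn := <-workCh:
			sat.working() // closes the sleep sample and opens a work sample
			fn()

		case <-time.After(time.Second):
			sat.working() // periodic housekeeping counts as work too

		case <-shutdownCh:
			// No working() call here: if the loop continued, the next
			// sleeping() would hit the index parity check and drop this
			// interval rather than misrecord it.
			return
		}
	}
}

Branches that do no meaningful work may also skip the working() call, as the
leaderNotifyCh and followerNotifyCh cases in runFollower do: the next sleeping()
call then detects the out-of-order call via the index parity check and the elapsed
time is dropped rather than recorded as a bogus sample.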