diff --git a/api.go b/api.go
index 5c17bd6d..eda33c3f 100644
--- a/api.go
+++ b/api.go
@@ -207,6 +207,9 @@ type Raft struct {
 
 	// followerNotifyCh is used to tell followers that config has changed
 	followerNotifyCh chan struct{}
+
+	// mainThreadSaturation measures the saturation of the main raft goroutine.
+	mainThreadSaturation *saturationMetric
 }
 
 // BootstrapCluster initializes a server's storage with the given cluster
@@ -553,6 +556,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
 		leadershipTransferCh: make(chan *leadershipTransferFuture, 1),
 		leaderNotifyCh:       make(chan struct{}, 1),
 		followerNotifyCh:     make(chan struct{}, 1),
+		mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second),
 	}
 
 	r.conf.Store(*conf)
diff --git a/fsm.go b/fsm.go
index 487cb4b7..6d26a9a2 100644
--- a/fsm.go
+++ b/fsm.go
@@ -223,9 +223,15 @@ func (r *Raft) runFSM() {
 		req.respond(err)
 	}
 
+	saturation := newSaturationMetric([]string{"raft", "thread", "fsm", "saturation"}, 1*time.Second)
+
 	for {
+		saturation.sleeping()
+
 		select {
 		case ptr := <-r.fsmMutateCh:
+			saturation.working()
+
 			switch req := ptr.(type) {
 			case []*commitTuple:
 				applyBatch(req)
@@ -238,6 +244,8 @@ func (r *Raft) runFSM() {
 			}
 
 		case req := <-r.fsmSnapshotCh:
+			saturation.working()
+
 			snapshot(req)
 
 		case <-r.shutdownCh:
diff --git a/raft.go b/raft.go
index 4b85ac1e..a509b3b3 100644
--- a/raft.go
+++ b/raft.go
@@ -159,35 +159,45 @@ func (r *Raft) runFollower() {
 	heartbeatTimer := randomTimeout(r.config().HeartbeatTimeout)
 
 	for r.getState() == Follower {
+		r.mainThreadSaturation.sleeping()
+
 		select {
 		case rpc := <-r.rpcCh:
+			r.mainThreadSaturation.working()
 			r.processRPC(rpc)
 
 		case c := <-r.configurationChangeCh:
+			r.mainThreadSaturation.working()
 			// Reject any operations since we are not the leader
 			c.respond(ErrNotLeader)
 
 		case a := <-r.applyCh:
+			r.mainThreadSaturation.working()
 			// Reject any operations since we are not the leader
 			a.respond(ErrNotLeader)
 
 		case v := <-r.verifyCh:
+			r.mainThreadSaturation.working()
 			// Reject any operations since we are not the leader
 			v.respond(ErrNotLeader)
 
-		case r := <-r.userRestoreCh:
+		case ur := <-r.userRestoreCh:
+			r.mainThreadSaturation.working()
 			// Reject any restores since we are not the leader
-			r.respond(ErrNotLeader)
+			ur.respond(ErrNotLeader)
 
-		case r := <-r.leadershipTransferCh:
+		case l := <-r.leadershipTransferCh:
+			r.mainThreadSaturation.working()
 			// Reject any operations since we are not the leader
-			r.respond(ErrNotLeader)
+			l.respond(ErrNotLeader)
 
 		case c := <-r.configurationsCh:
+			r.mainThreadSaturation.working()
 			c.configurations = r.configurations.Clone()
 			c.respond(nil)
 
 		case b := <-r.bootstrapCh:
+			r.mainThreadSaturation.working()
 			b.respond(r.liveBootstrap(b.configuration))
 
 		case <-r.leaderNotifyCh:
@@ -197,6 +207,7 @@ func (r *Raft) runFollower() {
 			heartbeatTimer = time.After(0)
 
 		case <-heartbeatTimer:
+			r.mainThreadSaturation.working()
 			// Restart the heartbeat timer
 			hbTimeout := r.config().HeartbeatTimeout
 			heartbeatTimer = randomTimeout(hbTimeout)
@@ -290,11 +301,15 @@ func (r *Raft) runCandidate() {
 	r.logger.Debug("votes", "needed", votesNeeded)
 
 	for r.getState() == Candidate {
+		r.mainThreadSaturation.sleeping()
+
 		select {
 		case rpc := <-r.rpcCh:
+			r.mainThreadSaturation.working()
 			r.processRPC(rpc)
 
 		case vote := <-voteCh:
+			r.mainThreadSaturation.working()
 			// Check if the term is greater than ours, bail
 			if vote.Term > r.getCurrentTerm() {
 				r.logger.Debug("newer term discovered, fallback to follower")
@@ -318,30 +333,37 @@ func (r *Raft) runCandidate() {
 			}
 
 		case c := <-r.configurationChangeCh:
+			r.mainThreadSaturation.working()
 			// Reject any operations since we are not the leader
 			c.respond(ErrNotLeader)
 
 		case a := <-r.applyCh:
+			r.mainThreadSaturation.working()
 			// Reject any operations since we are not the leader
 			a.respond(ErrNotLeader)
 
 		case v := <-r.verifyCh:
+			r.mainThreadSaturation.working()
 			// Reject any operations since we are not the leader
 			v.respond(ErrNotLeader)
 
-		case r := <-r.userRestoreCh:
+		case ur := <-r.userRestoreCh:
+			r.mainThreadSaturation.working()
 			// Reject any restores since we are not the leader
-			r.respond(ErrNotLeader)
+			ur.respond(ErrNotLeader)
 
-		case r := <-r.leadershipTransferCh:
+		case l := <-r.leadershipTransferCh:
+			r.mainThreadSaturation.working()
 			// Reject any operations since we are not the leader
-			r.respond(ErrNotLeader)
+			l.respond(ErrNotLeader)
 
 		case c := <-r.configurationsCh:
+			r.mainThreadSaturation.working()
 			c.configurations = r.configurations.Clone()
 			c.respond(nil)
 
 		case b := <-r.bootstrapCh:
+			r.mainThreadSaturation.working()
 			b.respond(ErrCantBootstrap)
 
 		case <-r.leaderNotifyCh:
@@ -354,6 +376,7 @@ func (r *Raft) runCandidate() {
 			}
 
 		case <-electionTimer:
+			r.mainThreadSaturation.working()
 			// Election failed! Restart the election. We simply return,
 			// which will kick us back into runCandidate
 			r.logger.Warn("Election timeout reached, restarting election")
@@ -598,14 +621,19 @@ func (r *Raft) leaderLoop() {
 	lease := time.After(r.config().LeaderLeaseTimeout)
 
 	for r.getState() == Leader {
+		r.mainThreadSaturation.sleeping()
+
 		select {
 		case rpc := <-r.rpcCh:
+			r.mainThreadSaturation.working()
 			r.processRPC(rpc)
 
 		case <-r.leaderState.stepDown:
+			r.mainThreadSaturation.working()
 			r.setState(Follower)
 
 		case future := <-r.leadershipTransferCh:
+			r.mainThreadSaturation.working()
 			if r.getLeadershipTransferInProgress() {
 				r.logger.Debug(ErrLeadershipTransferInProgress.Error())
 				future.respond(ErrLeadershipTransferInProgress)
@@ -686,6 +714,7 @@ func (r *Raft) leaderLoop() {
 			go r.leadershipTransfer(*id, *address, state, stopCh, doneCh)
 
 		case <-r.leaderState.commitCh:
+			r.mainThreadSaturation.working()
 			// Process the newly committed entries
 			oldCommitIndex := r.getCommitIndex()
 			commitIndex := r.leaderState.commitment.getCommitIndex()
@@ -748,6 +777,7 @@ func (r *Raft) leaderLoop() {
 			}
 
 		case v := <-r.verifyCh:
+			r.mainThreadSaturation.working()
 			if v.quorumSize == 0 {
 				// Just dispatched, start the verification
 				r.verifyLeader(v)
@@ -772,6 +802,7 @@ func (r *Raft) leaderLoop() {
 			}
 
 		case future := <-r.userRestoreCh:
+			r.mainThreadSaturation.working()
 			if r.getLeadershipTransferInProgress() {
 				r.logger.Debug(ErrLeadershipTransferInProgress.Error())
 				future.respond(ErrLeadershipTransferInProgress)
@@ -781,6 +812,7 @@ func (r *Raft) leaderLoop() {
 			future.respond(err)
 
 		case future := <-r.configurationsCh:
+			r.mainThreadSaturation.working()
 			if r.getLeadershipTransferInProgress() {
 				r.logger.Debug(ErrLeadershipTransferInProgress.Error())
 				future.respond(ErrLeadershipTransferInProgress)
@@ -790,6 +822,7 @@ func (r *Raft) leaderLoop() {
 			future.respond(nil)
 
 		case future := <-r.configurationChangeChIfStable():
+			r.mainThreadSaturation.working()
 			if r.getLeadershipTransferInProgress() {
 				r.logger.Debug(ErrLeadershipTransferInProgress.Error())
 				future.respond(ErrLeadershipTransferInProgress)
@@ -798,9 +831,11 @@ func (r *Raft) leaderLoop() {
 			r.appendConfigurationEntry(future)
 
 		case b := <-r.bootstrapCh:
+			r.mainThreadSaturation.working()
 			b.respond(ErrCantBootstrap)
 
 		case newLog := <-r.applyCh:
+			r.mainThreadSaturation.working()
 			if r.getLeadershipTransferInProgress() {
 				r.logger.Debug(ErrLeadershipTransferInProgress.Error())
 				newLog.respond(ErrLeadershipTransferInProgress)
@@ -829,6 +864,7 @@ func (r *Raft) leaderLoop() {
 			}
 
 		case <-lease:
+			r.mainThreadSaturation.working()
 			// Check if we've exceeded the lease, potentially stepping down
 			maxDiff := r.checkLeaderLease()
 
diff --git a/saturation.go b/saturation.go
new file mode 100644
index 00000000..c01430f3
--- /dev/null
+++ b/saturation.go
@@ -0,0 +1,111 @@
+package raft
+
+import (
+	"math"
+	"time"
+
+	"github.com/armon/go-metrics"
+)
+
+// saturationMetric measures the saturation (percentage of time spent working vs
+// waiting for work) of an event processing loop, such as runFSM. It reports the
+// saturation as a sample metric (at most) once every reportInterval.
+//
+// Callers must instrument their loop with calls to sleeping and working, starting
+// with a call to sleeping.
+//
+// Note: the caller must be single-threaded; saturationMetric is not safe for
+// concurrent use by multiple goroutines.
+type saturationMetric struct {
+	reportInterval time.Duration
+
+	// slept contains time for which the event processing loop was sleeping rather
+	// than working in the period since lastReport.
+	slept time.Duration
+
+	// lost contains time that is considered lost due to incorrect use of
+	// saturationMetric (e.g. calling sleeping() or working() multiple times
+	// in succession) in the period since lastReport.
+	lost time.Duration
+
+	lastReport, sleepBegan, workBegan time.Time
+
+	// These are overwritten in tests.
+	nowFn    func() time.Time
+	reportFn func(float32)
+}
+
+// newSaturationMetric creates a saturationMetric that will emit a sample of the
+// metric with the given name at most once every reportInterval.
+func newSaturationMetric(name []string, reportInterval time.Duration) *saturationMetric {
+	m := &saturationMetric{
+		reportInterval: reportInterval,
+		nowFn:          time.Now,
+		lastReport:     time.Now(),
+		reportFn:       func(sat float32) { metrics.AddSample(name, sat) },
+	}
+	return m
+}
+
+// sleeping records the time at which the loop began waiting for work. After the
+// initial call it must always be preceded by a call to working.
+func (s *saturationMetric) sleeping() {
+	now := s.nowFn()
+
+	if !s.sleepBegan.IsZero() {
+		// sleeping called twice in succession. Count that time as lost rather than
+		// measuring nonsense.
+		s.lost += now.Sub(s.sleepBegan)
+	}
+
+	s.sleepBegan = now
+	s.workBegan = time.Time{}
+	s.report()
+}
+
+// working records the time at which the loop began working. It must always be
+// preceded by a call to sleeping.
+func (s *saturationMetric) working() {
+	now := s.nowFn()
+
+	if s.workBegan.IsZero() {
+		if s.sleepBegan.IsZero() {
+			// working called before the initial call to sleeping. Count that time as
+			// lost rather than measuring nonsense.
+			s.lost += now.Sub(s.lastReport)
+		} else {
+			s.slept += now.Sub(s.sleepBegan)
+		}
+	} else {
+		// working called twice in succession. Count that time as lost rather than
+		// measuring nonsense.
+		s.lost += now.Sub(s.workBegan)
+	}
+
+	s.workBegan = now
+	s.sleepBegan = time.Time{}
+	s.report()
+}
+
+// report emits a sample of the saturation if reportInterval has passed since
+// our last report.
+func (s *saturationMetric) report() {
+	now := s.nowFn()
+	timeSinceLastReport := now.Sub(s.lastReport)
+
+	if timeSinceLastReport < s.reportInterval {
+		return
+	}
+
+	var saturation float64
+	total := timeSinceLastReport - s.lost
+	if total != 0 {
+		saturation = float64(total-s.slept) / float64(total)
+		saturation = math.Round(saturation*100) / 100
+	}
+	s.reportFn(float32(saturation))
+
+	s.slept = 0
+	s.lost = 0
+	s.lastReport = now
+}
diff --git a/saturation_test.go b/saturation_test.go
new file mode 100644
index 00000000..35046f34
--- /dev/null
+++ b/saturation_test.go
@@ -0,0 +1,144 @@
+package raft
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestSaturationMetric(t *testing.T) {
+	t.Run("without smoothing", func(t *testing.T) {
+		sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond)
+
+		now := sat.lastReport
+		sat.nowFn = func() time.Time { return now }
+
+		var reported float32
+		sat.reportFn = func(val float32) { reported = val }
+
+		sat.sleeping()
+
+		// First window: 50ms sleeping + 75ms working.
+		now = now.Add(50 * time.Millisecond)
+		sat.working()
+
+		now = now.Add(75 * time.Millisecond)
+		sat.sleeping()
+
+		// Should be 60% saturation.
+		require.Equal(t, float32(0.6), reported)
+
+		// Second window: 90ms sleeping + 10ms working.
+		now = now.Add(90 * time.Millisecond)
+		sat.working()
+
+		now = now.Add(10 * time.Millisecond)
+		sat.sleeping()
+
+		// Should be 10% saturation.
+		require.Equal(t, float32(0.1), reported)
+
+		// Third window: 100ms sleeping + 0ms working.
+		now = now.Add(100 * time.Millisecond)
+		sat.working()
+
+		// Should be 0% saturation.
+		require.Equal(t, float32(0), reported)
+	})
+}
+
+func TestSaturationMetric_IncorrectUsage(t *testing.T) {
+	t.Run("calling sleeping() consecutively", func(t *testing.T) {
+		sat := newSaturationMetric([]string{"metric"}, 50*time.Millisecond)
+
+		now := sat.lastReport
+		sat.nowFn = func() time.Time { return now }
+
+		var reported float32
+		sat.reportFn = func(v float32) { reported = v }
+
+		// Calling sleeping() consecutively should reset sleepBegan without recording
+		// a sample, such that we "lose" time rather than recording nonsense data.
+		//
+		//     0 | sleeping() |
+		//             => Sleeping (10ms)
+		// +10ms | working()  |
+		//             => Working (10ms)
+		// +20ms | sleeping() |
+		//             => [!] LOST [!] (10ms)
+		// +30ms | sleeping() |
+		//             => Sleeping (10ms)
+		// +40ms | working()  |
+		//             => Working (10ms)
+		// +50ms | sleeping() |
+		//
+		// Total reportable time: 40ms. Saturation: 50%.
+		sat.sleeping()
+		now = now.Add(10 * time.Millisecond)
+		sat.working()
+		now = now.Add(10 * time.Millisecond)
+		sat.sleeping()
+		now = now.Add(10 * time.Millisecond)
+		sat.sleeping()
+		now = now.Add(10 * time.Millisecond)
+		sat.working()
+		now = now.Add(10 * time.Millisecond)
+		sat.sleeping()
+
+		require.Equal(t, float32(0.5), reported)
+	})
+
+	t.Run("calling working() consecutively", func(t *testing.T) {
+		sat := newSaturationMetric([]string{"metric"}, 30*time.Millisecond)
+
+		now := sat.lastReport
+		sat.nowFn = func() time.Time { return now }
+
+		var reported float32
+		sat.reportFn = func(v float32) { reported = v }
+
+		// Calling working() consecutively should reset workBegan without recording
+		// a sample, such that we "lose" time rather than recording nonsense data.
+		//
+		//     0 | sleeping() |
+		//             => Sleeping (10ms)
+		// +10ms | working()  |
+		//             => [!] LOST [!] (10ms)
+		// +20ms | working()  |
+		//             => Working (10ms)
+		// +30ms | sleeping() |
+		//
+		// Total reportable time: 20ms. Saturation: 50%.
+		sat.sleeping()
+		now = now.Add(10 * time.Millisecond)
+		sat.working()
+		now = now.Add(10 * time.Millisecond)
+		sat.working()
+		now = now.Add(10 * time.Millisecond)
+		sat.sleeping()
+
+		require.Equal(t, float32(0.5), reported)
+	})
+
+	t.Run("calling working() first", func(t *testing.T) {
+		sat := newSaturationMetric([]string{"metric"}, 10*time.Millisecond)
+
+		now := sat.lastReport
+		sat.nowFn = func() time.Time { return now }
+
+		var reported float32
+		sat.reportFn = func(v float32) { reported = v }
+
+		// Time from start until working() is treated as lost.
+		sat.working()
+		require.Equal(t, float32(0), reported)
+
+		sat.sleeping()
+		now = now.Add(5 * time.Millisecond)
+		sat.working()
+		now = now.Add(5 * time.Millisecond)
+		sat.sleeping()
+		require.Equal(t, float32(0.5), reported)
+	})
+}
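
Note for reviewers: the instrumentation contract the new metric imposes on callers, as shown in the runFSM and main-loop hunks above, is to call sleeping() immediately before blocking for work and working() as the first statement of any case that performs work. Below is a minimal sketch of a conforming loop, assuming package raft with this change applied; exampleLoop and its channel parameters are hypothetical and not part of this diff.

package raft

import "time"

// exampleLoop is an illustrative consumer of saturationMetric (hypothetical;
// not part of this change). It mirrors the runFSM pattern: sleeping() right
// before blocking, working() at the top of each case that does work. The
// shutdown case is deliberately left uninstrumented, matching runFSM, so the
// time spent blocked before exiting counts as sleeping rather than work.
func exampleLoop(workCh <-chan func(), shutdownCh <-chan struct{}) {
	sat := newSaturationMetric([]string{"raft", "thread", "example", "saturation"}, 1*time.Second)

	for {
		sat.sleeping() // about to block waiting for work

		select {
		case job := <-workCh:
			sat.working() // everything until the next sleeping() counts as busy time
			job()

		case <-shutdownCh:
			return
		}
	}
}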