Thread saturation metrics 📈 #489

Merged
merged 5 commits on Apr 27, 2022
4 changes: 4 additions & 0 deletions api.go
@@ -207,6 +207,9 @@ type Raft struct {

// followerNotifyCh is used to tell followers that config has changed
followerNotifyCh chan struct{}

// mainThreadSaturation measures the saturation of the main raft goroutine.
mainThreadSaturation *saturationMetric
}

// BootstrapCluster initializes a server's storage with the given cluster
@@ -553,6 +556,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
leadershipTransferCh: make(chan *leadershipTransferFuture, 1),
leaderNotifyCh: make(chan struct{}, 1),
followerNotifyCh: make(chan struct{}, 1),
mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second),
}

r.conf.Store(*conf)
8 changes: 8 additions & 0 deletions fsm.go
@@ -223,9 +223,15 @@ func (r *Raft) runFSM() {
req.respond(err)
}

saturation := newSaturationMetric([]string{"raft", "thread", "fsm", "saturation"}, 1*time.Second)

for {
saturation.sleeping()

select {
case ptr := <-r.fsmMutateCh:
saturation.working()

switch req := ptr.(type) {
case []*commitTuple:
applyBatch(req)
@@ -238,6 +244,8 @@ }
}

case req := <-r.fsmSnapshotCh:
saturation.working()

snapshot(req)

case <-r.shutdownCh:
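The instrumentation pattern above generalizes to any event-processing loop: call sleeping() immediately before blocking in the select, and working() as the first statement of each case that performs real work. A condensed sketch of that shape (workCh, handleWork, and shutdownCh are placeholder names, not identifiers from this diff):

	saturation := newSaturationMetric([]string{"raft", "thread", "example", "saturation"}, 1*time.Second)

	for {
		saturation.sleeping()

		select {
		case w := <-workCh:
			saturation.working()
			handleWork(w)

		case <-shutdownCh:
			return
		}
	}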
52 changes: 44 additions & 8 deletions raft.go
@@ -159,35 +159,45 @@ func (r *Raft) runFollower() {
heartbeatTimer := randomTimeout(r.config().HeartbeatTimeout)

for r.getState() == Follower {
r.mainThreadSaturation.sleeping()

select {
case rpc := <-r.rpcCh:
r.mainThreadSaturation.working()
r.processRPC(rpc)

case c := <-r.configurationChangeCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
c.respond(ErrNotLeader)

case a := <-r.applyCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
a.respond(ErrNotLeader)

case v := <-r.verifyCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
v.respond(ErrNotLeader)

- case r := <-r.userRestoreCh:
+ case ur := <-r.userRestoreCh:
r.mainThreadSaturation.working()
// Reject any restores since we are not the leader
- r.respond(ErrNotLeader)
+ ur.respond(ErrNotLeader)

- case r := <-r.leadershipTransferCh:
+ case l := <-r.leadershipTransferCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
- r.respond(ErrNotLeader)
+ l.respond(ErrNotLeader)

case c := <-r.configurationsCh:
r.mainThreadSaturation.working()
c.configurations = r.configurations.Clone()
c.respond(nil)

case b := <-r.bootstrapCh:
r.mainThreadSaturation.working()
b.respond(r.liveBootstrap(b.configuration))

case <-r.leaderNotifyCh:
@@ -197,6 +207,7 @@ func (r *Raft) runFollower() {
heartbeatTimer = time.After(0)

case <-heartbeatTimer:
r.mainThreadSaturation.working()
// Restart the heartbeat timer
hbTimeout := r.config().HeartbeatTimeout
heartbeatTimer = randomTimeout(hbTimeout)
@@ -290,11 +301,15 @@ func (r *Raft) runCandidate() {
r.logger.Debug("votes", "needed", votesNeeded)

for r.getState() == Candidate {
r.mainThreadSaturation.sleeping()

select {
case rpc := <-r.rpcCh:
r.mainThreadSaturation.working()
r.processRPC(rpc)

case vote := <-voteCh:
r.mainThreadSaturation.working()
// Check if the term is greater than ours, bail
if vote.Term > r.getCurrentTerm() {
r.logger.Debug("newer term discovered, fallback to follower")
@@ -318,30 +333,37 @@ }
}

case c := <-r.configurationChangeCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
c.respond(ErrNotLeader)

case a := <-r.applyCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
a.respond(ErrNotLeader)

case v := <-r.verifyCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
v.respond(ErrNotLeader)

- case r := <-r.userRestoreCh:
+ case ur := <-r.userRestoreCh:
r.mainThreadSaturation.working()
// Reject any restores since we are not the leader
- r.respond(ErrNotLeader)
+ ur.respond(ErrNotLeader)

- case r := <-r.leadershipTransferCh:
+ case l := <-r.leadershipTransferCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
- r.respond(ErrNotLeader)
+ l.respond(ErrNotLeader)

case c := <-r.configurationsCh:
r.mainThreadSaturation.working()
c.configurations = r.configurations.Clone()
c.respond(nil)

case b := <-r.bootstrapCh:
r.mainThreadSaturation.working()
b.respond(ErrCantBootstrap)

case <-r.leaderNotifyCh:
@@ -354,6 +376,7 @@ func (r *Raft) runCandidate() {
}

case <-electionTimer:
r.mainThreadSaturation.working()
// Election failed! Restart the election. We simply return,
// which will kick us back into runCandidate
r.logger.Warn("Election timeout reached, restarting election")
@@ -598,14 +621,19 @@ func (r *Raft) leaderLoop() {
lease := time.After(r.config().LeaderLeaseTimeout)

for r.getState() == Leader {
r.mainThreadSaturation.sleeping()

select {
case rpc := <-r.rpcCh:
r.mainThreadSaturation.working()
r.processRPC(rpc)

case <-r.leaderState.stepDown:
r.mainThreadSaturation.working()
r.setState(Follower)

case future := <-r.leadershipTransferCh:
r.mainThreadSaturation.working()
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
future.respond(ErrLeadershipTransferInProgress)
@@ -686,6 +714,7 @@ func (r *Raft) leaderLoop() {
go r.leadershipTransfer(*id, *address, state, stopCh, doneCh)

case <-r.leaderState.commitCh:
r.mainThreadSaturation.working()
// Process the newly committed entries
oldCommitIndex := r.getCommitIndex()
commitIndex := r.leaderState.commitment.getCommitIndex()
@@ -748,6 +777,7 @@ func (r *Raft) leaderLoop() {
}

case v := <-r.verifyCh:
r.mainThreadSaturation.working()
if v.quorumSize == 0 {
// Just dispatched, start the verification
r.verifyLeader(v)
@@ -772,6 +802,7 @@ }
}

case future := <-r.userRestoreCh:
r.mainThreadSaturation.working()
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
future.respond(ErrLeadershipTransferInProgress)
@@ -781,6 +812,7 @@ }
future.respond(err)

case future := <-r.configurationsCh:
r.mainThreadSaturation.working()
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
future.respond(ErrLeadershipTransferInProgress)
@@ -790,6 +822,7 @@ }
future.respond(nil)

case future := <-r.configurationChangeChIfStable():
r.mainThreadSaturation.working()
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
future.respond(ErrLeadershipTransferInProgress)
@@ -798,9 +831,11 @@ }
r.appendConfigurationEntry(future)

case b := <-r.bootstrapCh:
r.mainThreadSaturation.working()
b.respond(ErrCantBootstrap)

case newLog := <-r.applyCh:
r.mainThreadSaturation.working()
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
newLog.respond(ErrLeadershipTransferInProgress)
@@ -829,6 +864,7 @@ }
}

case <-lease:
r.mainThreadSaturation.working()
// Check if we've exceeded the lease, potentially stepping down
maxDiff := r.checkLeaderLease()

111 changes: 111 additions & 0 deletions saturation.go
@@ -0,0 +1,111 @@
package raft

import (
"math"
"time"

"github.com/armon/go-metrics"
)

// saturationMetric measures the saturation (percentage of time spent working vs
// waiting for work) of an event processing loop, such as runFSM. It reports the
// saturation (at most) once every reportInterval.
//
// Callers must instrument their loop with calls to sleeping and working, starting
// with a call to sleeping.
//
// Note: the caller must be single-threaded and saturationMetric is not safe for
// concurrent use by multiple goroutines.
type saturationMetric struct {
reportInterval time.Duration
Contributor: nit: could this be simply interval?

Contributor Author: Sure! I don't feel strongly either way 😄
// slept contains time for which the event processing loop was sleeping rather
// than working in the period since lastReport.
slept time.Duration

// lost contains time that is considered lost due to incorrect use of
// saturationMetric (e.g. calling sleeping() or working() multiple times
// in succession) in the period since lastReport.
lost time.Duration

lastReport, sleepBegan, workBegan time.Time

// These are overwritten in tests.
nowFn func() time.Time
reportFn func(float32)
}

// newSaturationMetric creates a saturationMetric that will report the
// saturation of the loop with the given name at the given reportInterval.
func newSaturationMetric(name []string, reportInterval time.Duration) *saturationMetric {
m := &saturationMetric{
reportInterval: reportInterval,
nowFn: time.Now,
lastReport: time.Now(),
reportFn: func(sat float32) { metrics.AddSample(name, sat) },
}
return m
}

// sleeping records the time at which the loop began waiting for work. After the
// initial call, each call to sleeping must be preceded by a call to working.
func (s *saturationMetric) sleeping() {
now := s.nowFn()

if !s.sleepBegan.IsZero() {
// sleeping called twice in succession. Count that time as lost rather than
// measuring nonsense.
s.lost += now.Sub(s.sleepBegan)
}

s.sleepBegan = now
s.workBegan = time.Time{}
s.report()
}

// working records the time at which the loop began working. It must always be
// preceded by a call to sleeping.
func (s *saturationMetric) working() {
now := s.nowFn()

if s.workBegan.IsZero() {
if s.sleepBegan.IsZero() {
// working called before the initial call to sleeping. Count that time as
// lost rather than measuring nonsense.
s.lost += now.Sub(s.lastReport)
} else {
s.slept += now.Sub(s.sleepBegan)
}
} else {
// working called twice in succession. Count that time as lost rather than
// measuring nonsense.
s.lost += now.Sub(s.workBegan)
}

s.workBegan = now
s.sleepBegan = time.Time{}
s.report()
}

// report updates the metric if reportInterval has passed since our last report.
func (s *saturationMetric) report() {
now := s.nowFn()
timeSinceLastReport := now.Sub(s.lastReport)

if timeSinceLastReport < s.reportInterval {
return
}

var saturation float64
total := timeSinceLastReport - s.lost
if total != 0 {
saturation = float64(total-s.slept) / float64(total)
saturation = math.Round(saturation*100) / 100
}
s.reportFn(float32(saturation))

s.slept = 0
s.lost = 0
s.lastReport = now
}