Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HeartbeatTimeout and ElectionTimeout to reloadable config. #496

Merged
merged 4 commits into from Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 16 additions & 0 deletions api.go
Expand Up @@ -201,6 +201,12 @@ type Raft struct {
// leadershipTransferCh is used to start a leadership transfer from outside of
// the main thread.
leadershipTransferCh chan *leadershipTransferFuture

// leaderNotifyCh is used to tell leader that config has changed
leaderNotifyCh chan struct{}

// followerNotifyCh is used to tell followers that config has changed
followerNotifyCh chan struct{}
}

// BootstrapCluster initializes a server's storage with the given cluster
Expand Down Expand Up @@ -545,6 +551,8 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
bootstrapCh: make(chan *bootstrapFuture),
observers: make(map[uint64]*Observer),
leadershipTransferCh: make(chan *leadershipTransferFuture, 1),
leaderNotifyCh: make(chan struct{}, 1),
followerNotifyCh: make(chan struct{}, 1),
}

r.conf.Store(*conf)
Expand Down Expand Up @@ -696,6 +704,14 @@ func (r *Raft) ReloadConfig(rc ReloadableConfig) error {
return err
}
r.conf.Store(newCfg)

if rc.HeartbeatTimeout < oldCfg.HeartbeatTimeout {
// On leader, ensure replication loops running with a longer
// timeout than what we want now discover the change.
asyncNotifyCh(r.leaderNotifyCh)
// On follower, update current timer to use the shorter new value.
asyncNotifyCh(r.followerNotifyCh)
}
return nil
}

Expand Down
16 changes: 14 additions & 2 deletions config.go
Expand Up @@ -260,6 +260,14 @@ type ReloadableConfig struct {
// we perform a snapshot. This is to prevent excessive snapshots when we can
// just replay a small set of logs.
SnapshotThreshold uint64

// HeartbeatTimeout specifies the time in follower state without
// a leader before we attempt an election.
HeartbeatTimeout time.Duration

// ElectionTimeout specifies the time in candidate state without
// a leader before we attempt an election.
ElectionTimeout time.Duration
}

// apply sets the reloadable fields on the passed Config to the values in
Expand All @@ -269,6 +277,8 @@ func (rc *ReloadableConfig) apply(to Config) Config {
to.TrailingLogs = rc.TrailingLogs
to.SnapshotInterval = rc.SnapshotInterval
to.SnapshotThreshold = rc.SnapshotThreshold
to.HeartbeatTimeout = rc.HeartbeatTimeout
to.ElectionTimeout = rc.ElectionTimeout
return to
}

Expand All @@ -277,6 +287,8 @@ func (rc *ReloadableConfig) fromConfig(from Config) {
rc.TrailingLogs = from.TrailingLogs
rc.SnapshotInterval = from.SnapshotInterval
rc.SnapshotThreshold = from.SnapshotThreshold
rc.HeartbeatTimeout = from.HeartbeatTimeout
rc.ElectionTimeout = from.ElectionTimeout
}

// DefaultConfig returns a Config with usable defaults.
Expand Down Expand Up @@ -334,10 +346,10 @@ func ValidateConfig(config *Config) error {
return fmt.Errorf("LeaderLeaseTimeout is too low")
}
if config.LeaderLeaseTimeout > config.HeartbeatTimeout {
return fmt.Errorf("LeaderLeaseTimeout cannot be larger than heartbeat timeout")
return fmt.Errorf("LeaderLeaseTimeout (%s) cannot be larger than heartbeat timeout (%s)", config.LeaderLeaseTimeout, config.HeartbeatTimeout)
}
if config.ElectionTimeout < config.HeartbeatTimeout {
return fmt.Errorf("ElectionTimeout must be equal or greater than Heartbeat Timeout")
return fmt.Errorf("ElectionTimeout (%s) must be equal or greater than Heartbeat Timeout (%s)", config.ElectionTimeout, config.HeartbeatTimeout)
}
return nil
}
133 changes: 133 additions & 0 deletions integ_test.go
Expand Up @@ -5,10 +5,12 @@ import (
"fmt"
"io/ioutil"
"os"
"sync/atomic"
"testing"
"time"

"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
)

// CheckInteg will skip a test if integration testing is not enabled.
Expand Down Expand Up @@ -355,3 +357,134 @@ func TestRaft_Integ(t *testing.T) {
e.Release()
}
}

// TestRaft_RestartFollower_LongInitialHeartbeat builds a three-node cluster,
// stops one follower, waits for a while, and restarts it, optionally with
// longer heartbeat/election timeouts than the rest of the cluster. It then
// checks whether any LeaderObservation was emitted after the restart.
//
// For the case that restarts with longer timeouts, it additionally reloads
// 250ms heartbeat/election timeouts on the non-bootstrap nodes and verifies,
// via RequestVoteRequest observations, that the reload by itself does not
// produce a vote request and that stopping the leader eventually does.
func TestRaft_RestartFollower_LongInitialHeartbeat(t *testing.T) {
	CheckInteg(t)
	tests := []struct {
		name                   string
		restartInitialTimeouts time.Duration
		expectNewLeader        bool
	}{
		{"Default", 0, true},
		{"InitialHigher", time.Second, false},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			conf := DefaultConfig()
			conf.LocalID = ServerID("first")
			conf.HeartbeatTimeout = 50 * time.Millisecond
			conf.ElectionTimeout = 50 * time.Millisecond
			conf.LeaderLeaseTimeout = 50 * time.Millisecond
			conf.CommitTimeout = 5 * time.Millisecond
			conf.SnapshotThreshold = 100
			conf.TrailingLogs = 10

			// Create a single node
			env1 := MakeRaft(t, conf, true)
			NoErr(WaitFor(env1, Leader), t)

			// Join a few nodes!
			var envs []*RaftEnv
			for i := 0; i < 2; i++ {
				conf.LocalID = ServerID(fmt.Sprintf("next-batch-%d", i))
				env := MakeRaft(t, conf, false)
				addr := env.trans.LocalAddr()
				NoErr(WaitFuture(env1.raft.AddVoter(conf.LocalID, addr, 0, 0), t), t)
				envs = append(envs, env)
			}
			// allEnvs is the bootstrap node followed by the joined nodes.
			allEnvs := append([]*RaftEnv{env1}, envs...)

			// Wait for a leader
			_, err := WaitForAny(Leader, allEnvs)
			NoErr(err, t)

			CheckConsistent(allEnvs, t)
			// TODO without this sleep, the restarted follower doesn't have any stored config
			// and aborts the election because it doesn't know of any peers.  Shouldn't
			// CheckConsistent prevent that?
			time.Sleep(time.Second)

			// shutdown a follower
			disconnected := envs[len(envs)-1]
			disconnected.logger.Info("stopping follower")
			disconnected.Shutdown()

			// Count LeaderObservation events delivered by any node.
			seeNewLeader := func(o *Observation) bool { _, ok := o.Data.(LeaderObservation); return ok }
			leaderCh := make(chan Observation)
			// TODO Closing this channel results in panics, even though we're calling Release.
			//defer close(leaderCh)
			leaderChanges := new(uint32)
			go func() {
				for range leaderCh {
					atomic.AddUint32(leaderChanges, 1)
				}
			}()

			// Count RequestVoteRequest events; the observers for this channel
			// are only registered later, in the reload phase below.
			requestVoteCh := make(chan Observation)
			seeRequestVote := func(o *Observation) bool { _, ok := o.Data.(RequestVoteRequest); return ok }
			requestVotes := new(uint32)
			go func() {
				for range requestVoteCh {
					atomic.AddUint32(requestVotes, 1)
				}
			}()

			for _, env := range allEnvs {
				env.raft.RegisterObserver(NewObserver(leaderCh, false, seeNewLeader))
			}

			// Unfortunately we need to wait for the leader to start backing off RPCs to the down follower
			// such that when the follower comes back up it'll run an election before it gets an rpc from
			// the leader
			time.Sleep(time.Second * 5)

			if tt.restartInitialTimeouts != 0 {
				disconnected.conf.HeartbeatTimeout = tt.restartInitialTimeouts
				disconnected.conf.ElectionTimeout = tt.restartInitialTimeouts
			}
			disconnected.logger.Info("restarting follower")
			disconnected.Restart(t)

			time.Sleep(time.Second * 2)

			if tt.expectNewLeader {
				// The expected value must be a uint32: comparing an untyped 0
				// (an int) against a uint32 is never "equal" for
				// require.NotEqual, which would make this assertion vacuous.
				require.NotEqual(t, uint32(0), atomic.LoadUint32(leaderChanges))
			} else {
				require.Equal(t, uint32(0), atomic.LoadUint32(leaderChanges))
			}

			if tt.restartInitialTimeouts != 0 {
				for _, env := range envs {
					env.raft.RegisterObserver(NewObserver(requestVoteCh, false, seeRequestVote))
					NoErr(env.raft.ReloadConfig(ReloadableConfig{
						TrailingLogs:      conf.TrailingLogs,
						SnapshotInterval:  conf.SnapshotInterval,
						SnapshotThreshold: conf.SnapshotThreshold,
						HeartbeatTimeout:  250 * time.Millisecond,
						ElectionTimeout:   250 * time.Millisecond,
					}), t)
				}
				// Make sure that reload by itself doesn't trigger a vote
				time.Sleep(300 * time.Millisecond)
				require.Equal(t, uint32(0), atomic.LoadUint32(requestVotes))

				// Stop the leader, ensure that we don't see a request vote within the first 50ms
				// (original config of the non-restarted follower), but that we do see one within
				// the 250ms both followers should now be using for heartbeat timeout.  Well, not
				// quite: we wait for two heartbeat intervals (plus a fudge factor), because the
				// first time around, last contact will have been recent enough that no vote will
				// be triggered.
				env1.logger.Info("stopping leader")
				env1.Shutdown()
				time.Sleep(50 * time.Millisecond)
				require.Equal(t, uint32(0), atomic.LoadUint32(requestVotes))
				time.Sleep(600 * time.Millisecond)
				require.NotEqual(t, uint32(0), atomic.LoadUint32(requestVotes))
			}

			for _, e := range allEnvs {
				e.Release()
			}
		})
	}
}
2 changes: 1 addition & 1 deletion observer.go
Expand Up @@ -10,7 +10,7 @@ type Observation struct {
// Raft holds the Raft instance generating the observation.
Raft *Raft
// Data holds observation-specific data. Possible types are
// *RequestVoteRequest
// RequestVoteRequest
// RaftState
// PeerObservation
// LeaderObservation
Expand Down
26 changes: 25 additions & 1 deletion raft.go
Expand Up @@ -190,6 +190,12 @@ func (r *Raft) runFollower() {
case b := <-r.bootstrapCh:
b.respond(r.liveBootstrap(b.configuration))

case <-r.leaderNotifyCh:
HridoyRoy marked this conversation as resolved.
Show resolved Hide resolved
// Ignore since we are not the leader

case <-r.followerNotifyCh:
heartbeatTimer = time.After(0)

case <-heartbeatTimer:
// Restart the heartbeat timer
hbTimeout := r.config().HeartbeatTimeout
Expand Down Expand Up @@ -275,7 +281,8 @@ func (r *Raft) runCandidate() {
// otherwise.
defer func() { r.candidateFromLeadershipTransfer = false }()

electionTimer := randomTimeout(r.config().ElectionTimeout)
electionTimeout := r.config().ElectionTimeout
electionTimer := randomTimeout(electionTimeout)

// Tally the votes, need a simple majority
grantedVotes := 0
Expand Down Expand Up @@ -337,6 +344,15 @@ func (r *Raft) runCandidate() {
case b := <-r.bootstrapCh:
b.respond(ErrCantBootstrap)

case <-r.leaderNotifyCh:
// Ignore since we are not the leader

case <-r.followerNotifyCh:
ncabatoff marked this conversation as resolved.
Show resolved Hide resolved
if electionTimeout != r.config().ElectionTimeout {
electionTimeout = r.config().ElectionTimeout
electionTimer = randomTimeout(electionTimeout)
}

case <-electionTimer:
// Election failed! Restart the election. We simply return,
// which will kick us back into runCandidate
Expand Down Expand Up @@ -819,6 +835,14 @@ func (r *Raft) leaderLoop() {
// Renew the lease timer
lease = time.After(checkInterval)

case <-r.leaderNotifyCh:
for _, repl := range r.leaderState.replState {
asyncNotifyCh(repl.notifyCh)
}

case <-r.followerNotifyCh:
HridoyRoy marked this conversation as resolved.
Show resolved Hide resolved
// Ignore since we are not a follower

case <-r.shutdownCh:
return
}
Expand Down
39 changes: 39 additions & 0 deletions raft_test.go
Expand Up @@ -2466,6 +2466,7 @@ func TestRaft_CacheLogWithStoreError(t *testing.T) {

func TestRaft_ReloadConfig(t *testing.T) {
conf := inmemConfig(t)
conf.LeaderLeaseTimeout = 40 * time.Millisecond
c := MakeCluster(1, t, conf)
defer c.Close()
raft := c.rafts[0]
Expand All @@ -2480,6 +2481,8 @@ func TestRaft_ReloadConfig(t *testing.T) {
TrailingLogs: 12345,
SnapshotInterval: 234 * time.Second,
SnapshotThreshold: 6789,
HeartbeatTimeout: 45 * time.Millisecond,
ElectionTimeout: 46 * time.Millisecond,
}

require.NoError(t, raft.ReloadConfig(newCfg))
Expand All @@ -2488,6 +2491,8 @@ func TestRaft_ReloadConfig(t *testing.T) {
require.Equal(t, newCfg.TrailingLogs, raft.config().TrailingLogs)
require.Equal(t, newCfg.SnapshotInterval, raft.config().SnapshotInterval)
require.Equal(t, newCfg.SnapshotThreshold, raft.config().SnapshotThreshold)
require.Equal(t, newCfg.HeartbeatTimeout, raft.config().HeartbeatTimeout)
require.Equal(t, newCfg.ElectionTimeout, raft.config().ElectionTimeout)
}

func TestRaft_ReloadConfigValidates(t *testing.T) {
Expand Down Expand Up @@ -2776,3 +2781,37 @@ func TestRaft_runFollower_State_Transition(t *testing.T) {
})
}
}

// TestRaft_runFollower_ReloadTimeoutConfigs runs runFollower on its own
// goroutine for a node whose config sets skipStartup, reloads heartbeat and
// election timeouts that are much shorter than the initial ones, and expects
// the node to report the Candidate state after three of the new heartbeat
// timeouts have passed.
func TestRaft_runFollower_ReloadTimeoutConfigs(t *testing.T) {
	const (
		initialTimeout  = 500 * time.Millisecond
		reloadedTimeout = 50 * time.Millisecond
	)

	cfg := DefaultConfig()
	cfg.LocalID = ServerID("first")
	cfg.HeartbeatTimeout = initialTimeout
	cfg.ElectionTimeout = initialTimeout
	cfg.LeaderLeaseTimeout = 50 * time.Millisecond
	cfg.CommitTimeout = 5 * time.Millisecond
	cfg.SnapshotThreshold = 100
	cfg.TrailingLogs = 10
	cfg.skipStartup = true

	node := MakeRaft(t, cfg, false)

	// Install a configuration containing only this node and put it in the
	// follower state before starting the loop.
	node.raft.setLatestConfiguration(Configuration{
		Servers: []Server{{Voter, "first", ""}},
	}, 1)
	node.raft.setState(Follower)

	// Only the follower loop is started here.
	go node.raft.runFollower()

	// Keep the non-timeout reloadable fields as they were; shrink the timeouts.
	reloaded := ReloadableConfig{
		TrailingLogs:      cfg.TrailingLogs,
		SnapshotInterval:  cfg.SnapshotInterval,
		SnapshotThreshold: cfg.SnapshotThreshold,
		HeartbeatTimeout:  reloadedTimeout,
		ElectionTimeout:   reloadedTimeout,
	}
	require.NoError(t, node.raft.ReloadConfig(reloaded))

	// Give the loop several of the new heartbeat timeouts to react.
	time.Sleep(3 * reloaded.HeartbeatTimeout)

	// The follower loop should have moved the node to Candidate by now.
	require.Equal(t, Candidate, node.raft.getState())
}