Skip to content

Commit

Permalink
remove leader transfer from pre-vote path, fix logs and comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
dhiaayachi committed Mar 28, 2024
1 parent de78bf8 commit 921be2d
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 50 deletions.
3 changes: 2 additions & 1 deletion api.go
Expand Up @@ -534,6 +534,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
applyCh = make(chan *logFuture, conf.MaxAppendEntries)
}

_, transportSupportPreVote := trans.(WithPreVote)
// Create Raft struct.
r := &Raft{
protocolVersion: protocolVersion,
Expand Down Expand Up @@ -563,7 +564,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
leaderNotifyCh: make(chan struct{}, 1),
followerNotifyCh: make(chan struct{}, 1),
mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second),
preVote: conf.PreVote,
preVote: conf.PreVote && transportSupportPreVote,
}

r.conf.Store(*conf)
Expand Down
10 changes: 0 additions & 10 deletions commands.go
Expand Up @@ -134,11 +134,6 @@ type RequestPreVoteRequest struct {
// Used to ensure safety
LastLogIndex uint64
LastLogTerm uint64

// Used to indicate to peers if this vote was triggered by a leadership
// transfer. It is required for leadership transfer to work, because servers
// wouldn't vote otherwise if they are aware of an existing leader.
LeadershipTransfer bool
}

// GetRPCHeader - See WithRPCHeader.
Expand All @@ -153,11 +148,6 @@ type RequestPreVoteResponse struct {
// Newer term if leader is out of date.
Term uint64

// Peers is deprecated, but required by servers that only understand
// protocol version 0. This is not populated in protocol version 2
// and later.
Peers []byte

// Is the vote granted.
Granted bool
}
Expand Down
8 changes: 4 additions & 4 deletions config.go
Expand Up @@ -232,11 +232,11 @@ type Config struct {
// raft's configuration and index values.
NoSnapshotRestoreOnStart bool

// skipStartup allows NewRaft() to bypass all background work goroutines
skipStartup bool

// PreVote activates the pre-vote feature.
PreVote bool

// skipStartup allows NewRaft() to bypass all background work goroutines
skipStartup bool
}

func (conf *Config) getOrCreateLogger() hclog.Logger {
Expand Down Expand Up @@ -320,7 +320,7 @@ func DefaultConfig() *Config {
SnapshotInterval: 120 * time.Second,
SnapshotThreshold: 8192,
LeaderLeaseTimeout: 500 * time.Millisecond,
LogLevel: "TRACE",
LogLevel: "DEBUG",
}
}

Expand Down
61 changes: 26 additions & 35 deletions raft.go
Expand Up @@ -291,10 +291,11 @@ func (r *Raft) runCandidate() {
var voteCh <-chan *voteResult
var prevoteCh <-chan *preVoteResult

// check if the transport support prevote requests
_, ok := r.trans.(WithPreVote)
// check if pre-vote is active and that this is not a leader transfer

if r.preVote && ok {
// Leadership transfers skip the pre-vote by design: the selected server is very likely to be fit,
// and an election will happen in any case.
if r.preVote && !r.candidateFromLeadershipTransfer.Load() {
prevoteCh = r.preElectSelf()
} else {
voteCh = r.electSelf()
Expand Down Expand Up @@ -327,10 +328,10 @@ func (r *Raft) runCandidate() {
case preVote := <-prevoteCh:
// This is the pre-vote case: it should trigger a "real" election if the pre-vote is won.
r.mainThreadSaturation.working()
r.logger.Debug("got a prevote!!", "from", preVote.voterID, "term", preVote.Term, "tally", preVoteGrantedVotes)
r.logger.Debug("pre-vote received", "from", preVote.voterID, "term", preVote.Term, "tally", preVoteGrantedVotes)
// Check if the term is greater than ours, bail
if preVote.Term > term {
r.logger.Debug("newer term discovered on pre-preVote, fallback to follower", "term", preVote.Term)
r.logger.Debug("pre-vote denied: found newer term, falling back to follower", "term", preVote.Term)
r.setState(Follower)
r.setCurrentTerm(preVote.Term)
return
Expand All @@ -339,15 +340,15 @@ func (r *Raft) runCandidate() {
// Check if the preVote is granted
if preVote.Granted {
preVoteGrantedVotes++
r.logger.Debug("prevote granted", "from", preVote.voterID, "term", preVote.Term, "tally", preVoteGrantedVotes)
r.logger.Debug("pre-vote granted", "from", preVote.voterID, "term", preVote.Term, "tally", preVoteGrantedVotes)
} else {
preVoteRefusedVotes++
r.logger.Debug("prevote refused", "from", preVote.voterID, "term", preVote.Term, "tally", preVoteGrantedVotes)
r.logger.Debug("pre-vote denied", "from", preVote.voterID, "term", preVote.Term, "tally", preVoteGrantedVotes)
}

// Check if we've won the pre-vote and proceed to election if so
if preVoteGrantedVotes >= votesNeeded {
r.logger.Info("pre election won", "term", preVote.Term, "tally", preVoteGrantedVotes, "votesNeeded", votesNeeded-1)
r.logger.Info("pre-vote successful, starting election", "term", preVote.Term, "tally", preVoteGrantedVotes, "votesNeeded", votesNeeded-1)
preVoteGrantedVotes = 0
preVoteRefusedVotes = 0
electionTimer = randomTimeout(electionTimeout)
Expand All @@ -357,7 +358,7 @@ func (r *Raft) runCandidate() {
// Check if we've lost the pre-vote; if so, wait for the election timeout so that we can start
// another pre-vote round.
if preVoteRefusedVotes >= votesNeeded {
r.logger.Info("pre election lost, wait for election to timeout", "term", preVote.Term, "tally", preVoteGrantedVotes, "votesNeeded", votesNeeded-1)
r.logger.Info("pre-vote campaign failed, waiting for election timeout", "term", preVote.Term, "tally", preVoteGrantedVotes, "votesNeeded", votesNeeded-1)
}
case vote := <-voteCh:
r.mainThreadSaturation.working()
Expand All @@ -369,7 +370,7 @@ func (r *Raft) runCandidate() {
return
}

// Check if the preVote is granted
// Check if the vote is granted
if vote.Granted {
grantedVotes++
r.logger.Debug("vote granted", "from", vote.voterID, "term", vote.Term, "tally", grantedVotes)
Expand Down Expand Up @@ -1721,7 +1722,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
return
}

// Persist a vote for safety\
// Persist a vote for safety
if err := r.persistVote(req.Term, candidateBytes); err != nil {
r.logger.Error("failed to persist vote", "error", err)
return
Expand All @@ -1731,7 +1732,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
r.setLastContact()
}

// requestPreVote is invoked when we get a request vote RPC call.
// requestPreVote is invoked when we get a RequestPreVote RPC call.
func (r *Raft) requestPreVote(rpc RPC, req *RequestPreVoteRequest) {
defer metrics.MeasureSince([]string{"raft", "rpc", "requestVote"}, time.Now())
r.observe(*req)
Expand All @@ -1747,12 +1748,6 @@ func (r *Raft) requestPreVote(rpc RPC, req *RequestPreVoteRequest) {
rpc.Respond(resp, rpcErr)
}()

// Version 0 servers will panic unless the peers is present. It's only
// used on them to produce a warning message.
if r.protocolVersion < 2 {
resp.Peers = encodePeers(r.configurations.latest, r.trans)
}

// Check if we have an existing leader [who's not the candidate] and also
// check the LeadershipTransfer flag is set. Usually votes are rejected if
// there is a known leader. But if the leader initiated a leadership transfer,
Expand All @@ -1768,7 +1763,7 @@ func (r *Raft) requestPreVote(rpc RPC, req *RequestPreVoteRequest) {
return
}

if leaderAddr, leaderID := r.LeaderWithID(); leaderAddr != "" && leaderAddr != candidate && !req.LeadershipTransfer {
if leaderAddr, leaderID := r.LeaderWithID(); leaderAddr != "" && leaderAddr != candidate {
r.logger.Warn("rejecting pre-vote request since we have a leader",
"from", candidate,
"leader", leaderAddr,
Expand All @@ -1781,18 +1776,14 @@ func (r *Raft) requestPreVote(rpc RPC, req *RequestPreVoteRequest) {
return
}

// Increase the term if we see a newer one
if req.Term > r.getCurrentTerm() {
// continue processing here to possibly grant the pre-vote as in a "real" vote this will transition us to follower
r.logger.Debug("received a requestPreVote with a newer term, grant the pre-vote")
resp.Term = req.Term
}

// if we get a request for vote from a nonVoter and the request term is higher,
// step down and update term, but reject the vote request
// If we get a pre-vote request from a non-voter and the request term is higher, do not grant the pre-vote.
// This could happen when a node that was previously a voter is converted to a non-voter.
// The reason we need to step in is to permit the cluster to make progress in such a scenario.
// More details about that in https://github.com/hashicorp/raft/pull/526
if len(r.configurations.latest.Servers) > 0 && !hasVote(r.configurations.latest, candidateID) {
r.logger.Warn("rejecting pre-vote request since node is not a voter", "from", candidate)
return
Expand All @@ -1801,15 +1792,15 @@ func (r *Raft) requestPreVote(rpc RPC, req *RequestPreVoteRequest) {
// Reject if their term is older
lastIdx, lastTerm := r.getLastEntry()
if lastTerm > req.LastLogTerm {
r.logger.Warn("rejecting vote request since our last term is greater",
r.logger.Warn("rejecting pre-vote request since our last term is greater",
"candidate", candidate,
"last-term", lastTerm,
"last-candidate-term", req.LastLogTerm)
return
}

if lastTerm == req.LastLogTerm && lastIdx > req.LastLogIndex {
r.logger.Warn("rejecting vote request since our last index is greater",
r.logger.Warn("rejecting pre-vote request since our last index is greater",
"candidate", candidate,
"last-index", lastIdx,
"last-candidate-index", req.LastLogIndex)
Expand Down Expand Up @@ -2055,10 +2046,11 @@ func (r *Raft) electSelf() <-chan *voteResult {
return respCh
}

// preElectSelf is used to send a RequestVote RPC to all peers, and vote for
// ourself. This has the side affecting of incrementing the current term. The
// preElectSelf is used to send a RequestPreVote RPC to all peers, and vote for
// ourselves. This will not increment the current term. The
// response channel returned is used to wait for all the responses (including a
// vote for ourself). This must only be called from the main thread.
// vote for ourself).
// This must only be called from the main thread.
func (r *Raft) preElectSelf() <-chan *preVoteResult {
// Create a response channel
respCh := make(chan *preVoteResult, len(r.configurations.latest.Servers))
Expand All @@ -2072,10 +2064,9 @@ func (r *Raft) preElectSelf() <-chan *preVoteResult {
RPCHeader: r.getRPCHeader(),
Term: newTerm,
// this is needed for retro compatibility, before RPCHeader.Addr was added
Candidate: r.trans.EncodePeer(r.localID, r.localAddr),
LastLogIndex: lastIdx,
LastLogTerm: lastTerm,
LeadershipTransfer: r.candidateFromLeadershipTransfer.Load(),
Candidate: r.trans.EncodePeer(r.localID, r.localAddr),
LastLogIndex: lastIdx,
LastLogTerm: lastTerm,
}

// Construct a function to ask for a vote
Expand Down Expand Up @@ -2113,8 +2104,8 @@ func (r *Raft) preElectSelf() <-chan *preVoteResult {
if server.Suffrage == Voter {
if server.ID == r.localID {
r.logger.Debug("pre-voting for self", "term", req.Term, "id", r.localID)
// Persist a vote for ourselves
// Include our own vote

// Cast a pre-vote for ourselves.
respCh <- &preVoteResult{
RequestPreVoteResponse: RequestPreVoteResponse{
RPCHeader: r.getRPCHeader(),
Expand Down
4 changes: 4 additions & 0 deletions raft_test.go
Expand Up @@ -2061,6 +2061,10 @@ func TestRaft_AppendEntry(t *testing.T) {
require.True(t, resp2.Success)
}

// TestRaft_PreVoteMixedCluster focuses on testing a cluster with
// a mix of nodes that have pre-vote activated and deactivated.
// Once the cluster is created, we force an election by partitioning the leader
// and verify that the cluster regains stability.
func TestRaft_PreVoteMixedCluster(t *testing.T) {

tcs := []struct {
Expand Down

0 comments on commit 921be2d

Please sign in to comment.