From d142e9b066a862b279321ba96f611e1e3760854f Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Thu, 8 Sep 2022 15:03:52 -0400 Subject: [PATCH 1/3] fix for non voter with higher term and add test --- raft.go | 16 ++++++++-- raft_test.go | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++ replication.go | 2 +- 3 files changed, 94 insertions(+), 3 deletions(-) diff --git a/raft.go b/raft.go index a243c07fb..655cf7c96 100644 --- a/raft.go +++ b/raft.go @@ -1375,7 +1375,19 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { rpc.Respond(resp, rpcErr) }() - // Ignore an older term + // this use case would happen when a node was a voter and is added back to the cluster as non voter, + // it term could be higher then the cluster, but because it's a non voter that term need to be discarded + // in favor of the cluster current term to keep the cluster stable, as an election don't need to be started + // by a node which don't have voting rights. + currentTerm := r.getCurrentTerm() + hasVote := hasVote(r.configurations.latest, r.localID) + if a.Term < currentTerm && !hasVote { + r.setState(Follower) + r.setCurrentTerm(a.Term) + resp.Term = a.Term + } + + // if a node is a voter, Ignore an older term if a.Term < r.getCurrentTerm() { return } @@ -1410,7 +1422,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { "previous-index", a.PrevLogEntry, "last-index", lastIdx, "error", err) - resp.NoRetryBackoff = true + resp.NoRetryBackoff = hasVote return } prevLogTerm = prevLog.Term diff --git a/raft_test.go b/raft_test.go index eee0b49f5..2f0ba96a0 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2644,6 +2644,85 @@ func TestRaft_VoteNotGranted_WhenNodeNotInCluster(t *testing.T) { } } +func TestRaft_StabilityIsKept_WhenNonVoterWithHigherTermJoin(t *testing.T) { + // Make a cluster + c := MakeCluster(3, t, nil) + + defer c.Close() + + // Get the leader + leader := c.Leader() + + // Wait until we have 2 followers + limit := time.Now().Add(c.longstopTimeout) + var followers []*Raft + for time.Now().Before(limit) && len(followers) != 2 { + c.WaitEvent(nil, c.conf.CommitTimeout) + followers = c.GetInState(Follower) + } + if len(followers) != 2 { + t.Fatalf("expected two followers: %v", followers) + } + + // Remove a follower + followerRemoved := followers[0] + future := leader.RemoveServer(followerRemoved.localID, 0, 0) + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + //set that follower term to higher term to simulate a partitioning + followerRemoved.setCurrentTerm(leader.getCurrentTerm() + 100) + + //Add the node back as NonVoter + future = leader.AddNonvoter(followerRemoved.localID, followerRemoved.localAddr, 0, 0) + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + // Wait a while + time.Sleep(c.propagateTimeout * 100) + + //Write some logs to ensure they replicate + for i := 0; i < 100; i++ { + future := leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0) + if err := future.Error(); err != nil { + t.Fatalf("[ERR] apply err: %v", err) + } + } + c.WaitForReplication(100) + + // Check leader stable + newLeader := c.Leader() + if newLeader.leaderID != leader.leaderID { + t.Fatalf("leader changed") + } + + //Remove the server and add it back as Voter + future = leader.RemoveServer(followerRemoved.localID, 0, 0) + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + leader.AddVoter(followerRemoved.localID, followerRemoved.localAddr, 0, 0) + + // Wait a while + time.Sleep(c.propagateTimeout * 10) + + //Write some logs to ensure they replicate + for i := 100; i < 200; i++ { + future := leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0) + if err := future.Error(); err != nil { + t.Fatalf("[ERR] apply err: %v", err) + } + } + c.WaitForReplication(200) + + // Check leader stable + newLeader = c.Leader() + if newLeader.leaderID != leader.leaderID { + t.Fatalf("leader changed") + } +} + // TestRaft_FollowerRemovalNoElection ensures that a leader election is not // started when a standby is shut down and restarted. func TestRaft_FollowerRemovalNoElection(t *testing.T) { diff --git a/replication.go b/replication.go index efe46e4e4..534817092 100644 --- a/replication.go +++ b/replication.go @@ -233,7 +233,7 @@ START: appendStats(string(peer.ID), start, float32(len(req.Entries))) // Check for a newer term, stop running - if resp.Term > req.Term { + if resp.Term > req.Term && s.peer.Suffrage != Nonvoter { r.handleStaleTerm(s) return true } From 67d7891302d06bffe729ef324603da638b7a687f Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Thu, 8 Sep 2022 15:33:34 -0400 Subject: [PATCH 2/3] fix data race --- raft.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft.go b/raft.go index 655cf7c96..2e4c9734d 100644 --- a/raft.go +++ b/raft.go @@ -1380,7 +1380,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { // in favor of the cluster current term to keep the cluster stable, as an election don't need to be started // by a node which don't have voting rights. currentTerm := r.getCurrentTerm() - hasVote := hasVote(r.configurations.latest, r.localID) + hasVote := hasVote(r.getLatestConfiguration(), r.localID) if a.Term < currentTerm && !hasVote { r.setState(Follower) r.setCurrentTerm(a.Term) From dbd6d068edfd1dda4ea4b0d960726165f9163bb8 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Fri, 9 Sep 2022 13:45:23 -0400 Subject: [PATCH 3/3] change fix to only apply for append entries with newer logs term --- raft.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/raft.go b/raft.go index 2e4c9734d..4b2f7240a 100644 --- a/raft.go +++ b/raft.go @@ -1363,6 +1363,7 @@ func (r *Raft) processHeartbeat(rpc RPC) { func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { defer metrics.MeasureSince([]string{"raft", "rpc", "appendEntries"}, time.Now()) // Setup a response + noRetryBackoffIfErr := true resp := &AppendEntriesResponse{ RPCHeader: r.getRPCHeader(), Term: r.getCurrentTerm(), @@ -1381,10 +1382,14 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { // by a node which don't have voting rights. currentTerm := r.getCurrentTerm() hasVote := hasVote(r.getLatestConfiguration(), r.localID) - if a.Term < currentTerm && !hasVote { + if a.Term < currentTerm && !hasVote && a.PrevLogTerm >= r.lastLogTerm { + r.logger.Warn("older term sent to non-voter", "PrevLogTerm", a.PrevLogTerm, + "lastLogTerm", r.lastLogTerm, + " Term", a.Term, "currentTerm", currentTerm) r.setState(Follower) r.setCurrentTerm(a.Term) resp.Term = a.Term + noRetryBackoffIfErr = false } // if a node is a voter, Ignore an older term @@ -1422,7 +1427,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { "previous-index", a.PrevLogEntry, "last-index", lastIdx, "error", err) - resp.NoRetryBackoff = hasVote + resp.NoRetryBackoff = noRetryBackoffIfErr return } prevLogTerm = prevLog.Term