diff --git a/raft.go b/raft.go index a243c07f..4b2f7240 100644 --- a/raft.go +++ b/raft.go @@ -1363,6 +1363,7 @@ func (r *Raft) processHeartbeat(rpc RPC) { func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { defer metrics.MeasureSince([]string{"raft", "rpc", "appendEntries"}, time.Now()) // Setup a response + noRetryBackoffIfErr := true resp := &AppendEntriesResponse{ RPCHeader: r.getRPCHeader(), Term: r.getCurrentTerm(), @@ -1375,7 +1376,23 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { rpc.Respond(resp, rpcErr) }() - // Ignore an older term + // this use case would happen when a node was a voter and is added back to the cluster as non voter, + // it term could be higher then the cluster, but because it's a non voter that term need to be discarded + // in favor of the cluster current term to keep the cluster stable, as an election don't need to be started + // by a node which don't have voting rights. + currentTerm := r.getCurrentTerm() + hasVote := hasVote(r.getLatestConfiguration(), r.localID) + if a.Term < currentTerm && !hasVote && a.PrevLogTerm >= r.lastLogTerm { + r.logger.Warn("older term sent to non-voter", "PrevLogTerm", a.PrevLogTerm, + "lastLogTerm", r.lastLogTerm, + " Term", a.Term, "currentTerm", currentTerm) + r.setState(Follower) + r.setCurrentTerm(a.Term) + resp.Term = a.Term + noRetryBackoffIfErr = false + } + + // if a node is a voter, Ignore an older term if a.Term < r.getCurrentTerm() { return } @@ -1410,7 +1427,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { "previous-index", a.PrevLogEntry, "last-index", lastIdx, "error", err) - resp.NoRetryBackoff = true + resp.NoRetryBackoff = noRetryBackoffIfErr return } prevLogTerm = prevLog.Term diff --git a/raft_test.go b/raft_test.go index eee0b49f..2f0ba96a 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2644,6 +2644,85 @@ func TestRaft_VoteNotGranted_WhenNodeNotInCluster(t *testing.T) { } } +func TestRaft_StabilityIsKept_WhenNonVoterWithHigherTermJoin(t *testing.T) { + // Make a cluster + c := MakeCluster(3, t, nil) + + defer c.Close() + + // Get the leader + leader := c.Leader() + + // Wait until we have 2 followers + limit := time.Now().Add(c.longstopTimeout) + var followers []*Raft + for time.Now().Before(limit) && len(followers) != 2 { + c.WaitEvent(nil, c.conf.CommitTimeout) + followers = c.GetInState(Follower) + } + if len(followers) != 2 { + t.Fatalf("expected two followers: %v", followers) + } + + // Remove a follower + followerRemoved := followers[0] + future := leader.RemoveServer(followerRemoved.localID, 0, 0) + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + //set that follower term to higher term to simulate a partitioning + followerRemoved.setCurrentTerm(leader.getCurrentTerm() + 100) + + //Add the node back as NonVoter + future = leader.AddNonvoter(followerRemoved.localID, followerRemoved.localAddr, 0, 0) + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + // Wait a while + time.Sleep(c.propagateTimeout * 100) + + //Write some logs to ensure they replicate + for i := 0; i < 100; i++ { + future := leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0) + if err := future.Error(); err != nil { + t.Fatalf("[ERR] apply err: %v", err) + } + } + c.WaitForReplication(100) + + // Check leader stable + newLeader := c.Leader() + if newLeader.leaderID != leader.leaderID { + t.Fatalf("leader changed") + } + + //Remove the server and add it back as Voter + future = leader.RemoveServer(followerRemoved.localID, 0, 0) + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + leader.AddVoter(followerRemoved.localID, followerRemoved.localAddr, 0, 0) + + // Wait a while + time.Sleep(c.propagateTimeout * 10) + + //Write some logs to ensure they replicate + for i := 100; i < 200; i++ { + future := leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0) + if err := future.Error(); err != nil { + t.Fatalf("[ERR] apply err: %v", err) + } + } + c.WaitForReplication(200) + + // Check leader stable + newLeader = c.Leader() + if newLeader.leaderID != leader.leaderID { + t.Fatalf("leader changed") + } +} + // TestRaft_FollowerRemovalNoElection ensures that a leader election is not // started when a standby is shut down and restarted. func TestRaft_FollowerRemovalNoElection(t *testing.T) { diff --git a/replication.go b/replication.go index efe46e4e..53481709 100644 --- a/replication.go +++ b/replication.go @@ -233,7 +233,7 @@ START: appendStats(string(peer.ID), start, float32(len(req.Entries))) // Check for a newer term, stop running - if resp.Term > req.Term { + if resp.Term > req.Term && s.peer.Suffrage != Nonvoter { r.handleStaleTerm(s) return true }