From 8d010544f03e03d1e687a9cec5e197389d79cf8b Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Tue, 21 Sep 2021 23:49:22 -0400 Subject: [PATCH 01/31] add test to verify that a removed node is not able to vote --- raft_test.go | 66 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/raft_test.go b/raft_test.go index bf4bfcbd4..35ced0307 100644 --- a/raft_test.go +++ b/raft_test.go @@ -702,6 +702,72 @@ func TestRaft_RemoveFollower(t *testing.T) { require.Equal(t, Follower, follower.getState()) } +func TestRaft_RemovedFollower_Vote(t *testing.T) { + // Make a cluster + c := MakeCluster(3, t, nil) + defer c.Close() + + // Get the leader + leader := c.Leader() + + // Wait until we have 2 followers + limit := time.Now().Add(c.longstopTimeout) + var followers []*Raft + for time.Now().Before(limit) && len(followers) != 2 { + c.WaitEvent(nil, c.conf.CommitTimeout) + followers = c.GetInState(Follower) + } + if len(followers) != 2 { + t.Fatalf("expected two followers: %v", followers) + } + + // Remove a followerRemoved + followerRemoved := followers[0] + future := leader.RemoveServer(followerRemoved.localID, 0, 0) + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait a while + time.Sleep(c.propagateTimeout) + + // Other nodes should have fewer peers + if configuration := c.getConfiguration(leader); len(configuration.Servers) != 2 { + t.Fatalf("too many peers") + } + if configuration := c.getConfiguration(followers[1]); len(configuration.Servers) != 2 { + t.Fatalf("too many peers") + } + require.Equal(t, Follower, followerRemoved.getState()) + + follower := followers[1] + + followerRemovedT := c.trans[c.IndexOf(followerRemoved)] + reqVote := RequestVoteRequest{ + RPCHeader: followerRemoved.getRPCHeader(), + Term: followerRemoved.getCurrentTerm() + 10, + Candidate: followerRemovedT.EncodePeer(followerRemoved.localID, followerRemoved.localAddr), + LastLogIndex: followerRemoved.LastIndex(), + LastLogTerm: followerRemoved.getCurrentTerm(), + LeadershipTransfer: false, + } + // a follower that thinks there's a leader should vote for that leader. 
+ var resp RequestVoteResponse + + // + c.Partition([]ServerAddress{leader.localAddr}) + + time.Sleep(c.propagateTimeout) + require.Equal(t, Candidate, follower.getState()) + if err := followerRemovedT.RequestVote(follower.localID, follower.localAddr, &reqVote, &resp); err != nil { + t.Fatalf("RequestVote RPC failed %v", err) + } + + if resp.Granted { + t.Fatalf("expected vote to not be granted, but it was %+v", resp) + } +} + func TestRaft_RemoveLeader(t *testing.T) { // Make a cluster c := MakeCluster(3, t, nil) From 9ed9594aa955d2312d43bf3a9420fb1dd19aaefa Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 22 Sep 2021 08:34:58 -0400 Subject: [PATCH 02/31] add some comments to the tests --- raft_test.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/raft_test.go b/raft_test.go index 35ced0307..1e7a2452d 100644 --- a/raft_test.go +++ b/raft_test.go @@ -721,7 +721,7 @@ func TestRaft_RemovedFollower_Vote(t *testing.T) { t.Fatalf("expected two followers: %v", followers) } - // Remove a followerRemoved + // Remove a follower followerRemoved := followers[0] future := leader.RemoveServer(followerRemoved.localID, 0, 0) if err := future.Error(); err != nil { @@ -738,10 +738,11 @@ func TestRaft_RemovedFollower_Vote(t *testing.T) { if configuration := c.getConfiguration(followers[1]); len(configuration.Servers) != 2 { t.Fatalf("too many peers") } + // The removed node should be still in Follower state require.Equal(t, Follower, followerRemoved.getState()) + // Prepare a Vote request from the removed follower follower := followers[1] - followerRemovedT := c.trans[c.IndexOf(followerRemoved)] reqVote := RequestVoteRequest{ RPCHeader: followerRemoved.getRPCHeader(), @@ -754,15 +755,19 @@ func TestRaft_RemovedFollower_Vote(t *testing.T) { // a follower that thinks there's a leader should vote for that leader. 
var resp RequestVoteResponse - // + // partiton the leader to simulate an unstable cluster c.Partition([]ServerAddress{leader.localAddr}) - time.Sleep(c.propagateTimeout) + + // wait for the remaining follower to trigger an election require.Equal(t, Candidate, follower.getState()) + + // send a vote request from the removed follower to the Candidate follower if err := followerRemovedT.RequestVote(follower.localID, follower.localAddr, &reqVote, &resp); err != nil { t.Fatalf("RequestVote RPC failed %v", err) } + // the vote request should not be granted, because the voter is not part of the cluster anymore if resp.Granted { t.Fatalf("expected vote to not be granted, but it was %+v", resp) } From 3a2e40589e3dfd85395c8555f27f35ce59d92d06 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 22 Sep 2021 11:36:27 -0400 Subject: [PATCH 03/31] add wait loop to test --- raft_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/raft_test.go b/raft_test.go index 1e7a2452d..1fee1adb7 100644 --- a/raft_test.go +++ b/raft_test.go @@ -760,6 +760,11 @@ func TestRaft_RemovedFollower_Vote(t *testing.T) { time.Sleep(c.propagateTimeout) // wait for the remaining follower to trigger an election + count := 0 + for follower.getState() != Candidate && count < 1000 { + count++ + time.Sleep(1 * time.Millisecond) + } require.Equal(t, Candidate, follower.getState()) // send a vote request from the removed follower to the Candidate follower From 8da6d8db8b6629e2357a92d35b93cd3028d2e00e Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Thu, 23 Sep 2021 15:28:57 -0400 Subject: [PATCH 04/31] remove test related to removed node voting --- raft_test.go | 76 ---------------------------------------------------- 1 file changed, 76 deletions(-) diff --git a/raft_test.go b/raft_test.go index 1fee1adb7..bf4bfcbd4 100644 --- a/raft_test.go +++ b/raft_test.go @@ -702,82 +702,6 @@ func TestRaft_RemoveFollower(t *testing.T) { require.Equal(t, Follower, follower.getState()) } -func TestRaft_RemovedFollower_Vote(t *testing.T) { - // Make a cluster - c := MakeCluster(3, t, nil) - defer c.Close() - - // Get the leader - leader := c.Leader() - - // Wait until we have 2 followers - limit := time.Now().Add(c.longstopTimeout) - var followers []*Raft - for time.Now().Before(limit) && len(followers) != 2 { - c.WaitEvent(nil, c.conf.CommitTimeout) - followers = c.GetInState(Follower) - } - if len(followers) != 2 { - t.Fatalf("expected two followers: %v", followers) - } - - // Remove a follower - followerRemoved := followers[0] - future := leader.RemoveServer(followerRemoved.localID, 0, 0) - if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) - } - - // Wait a while - time.Sleep(c.propagateTimeout) - - // Other nodes should have fewer peers - if configuration := c.getConfiguration(leader); len(configuration.Servers) != 2 { - t.Fatalf("too many peers") - } - if configuration := c.getConfiguration(followers[1]); len(configuration.Servers) != 2 { - t.Fatalf("too many peers") - } - // The removed node should be still in Follower state - require.Equal(t, Follower, followerRemoved.getState()) - - // Prepare a Vote request from the removed follower - follower := followers[1] - followerRemovedT := c.trans[c.IndexOf(followerRemoved)] - reqVote := RequestVoteRequest{ - RPCHeader: followerRemoved.getRPCHeader(), - Term: followerRemoved.getCurrentTerm() + 10, - Candidate: followerRemovedT.EncodePeer(followerRemoved.localID, followerRemoved.localAddr), - LastLogIndex: followerRemoved.LastIndex(), - LastLogTerm: 
followerRemoved.getCurrentTerm(), - LeadershipTransfer: false, - } - // a follower that thinks there's a leader should vote for that leader. - var resp RequestVoteResponse - - // partiton the leader to simulate an unstable cluster - c.Partition([]ServerAddress{leader.localAddr}) - time.Sleep(c.propagateTimeout) - - // wait for the remaining follower to trigger an election - count := 0 - for follower.getState() != Candidate && count < 1000 { - count++ - time.Sleep(1 * time.Millisecond) - } - require.Equal(t, Candidate, follower.getState()) - - // send a vote request from the removed follower to the Candidate follower - if err := followerRemovedT.RequestVote(follower.localID, follower.localAddr, &reqVote, &resp); err != nil { - t.Fatalf("RequestVote RPC failed %v", err) - } - - // the vote request should not be granted, because the voter is not part of the cluster anymore - if resp.Granted { - t.Fatalf("expected vote to not be granted, but it was %+v", resp) - } -} - func TestRaft_RemoveLeader(t *testing.T) { // Make a cluster c := MakeCluster(3, t, nil) From b4e22731eb5aaab5be60f9124e13fafd095b4537 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Thu, 23 Sep 2021 15:30:35 -0400 Subject: [PATCH 05/31] add test that check a removed node is not allowed to vote --- raft_test.go | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/raft_test.go b/raft_test.go index bf4bfcbd4..416f76f1d 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2345,3 +2345,79 @@ func TestRaft_InstallSnapshot_InvalidPeers(t *testing.T) { require.Error(t, resp.Error) require.Contains(t, resp.Error.Error(), "failed to decode peers") } + +func TestRaft_RemovedFollower_Vote(t *testing.T) { + // Make a cluster + c := MakeCluster(3, t, nil) + defer c.Close() + + // Get the leader + leader := c.Leader() + + // Wait until we have 2 followers + limit := time.Now().Add(c.longstopTimeout) + var followers []*Raft + for time.Now().Before(limit) && len(followers) != 2 { + c.WaitEvent(nil, c.conf.CommitTimeout) + followers = c.GetInState(Follower) + } + if len(followers) != 2 { + t.Fatalf("expected two followers: %v", followers) + } + + // Remove a follower + followerRemoved := followers[0] + future := leader.RemoveServer(followerRemoved.localID, 0, 0) + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait a while + time.Sleep(c.propagateTimeout) + + // Other nodes should have fewer peers + if configuration := c.getConfiguration(leader); len(configuration.Servers) != 2 { + t.Fatalf("too many peers") + } + if configuration := c.getConfiguration(followers[1]); len(configuration.Servers) != 2 { + t.Fatalf("too many peers") + } + // The removed node should be still in Follower state + require.Equal(t, Follower, followerRemoved.getState()) + + // Prepare a Vote request from the removed follower + follower := followers[1] + followerRemovedT := c.trans[c.IndexOf(followerRemoved)] + reqVote := RequestVoteRequest{ + RPCHeader: followerRemoved.getRPCHeader(), + Term: followerRemoved.getCurrentTerm() + 10, + Candidate: followerRemovedT.EncodePeer(followerRemoved.localID, followerRemoved.localAddr), + LastLogIndex: followerRemoved.LastIndex(), + LastLogTerm: followerRemoved.getCurrentTerm(), + LeadershipTransfer: false, + } + // a follower that thinks there's a leader should vote for that leader. 
+ var resp RequestVoteResponse + + // partiton the leader to simulate an unstable cluster + c.Partition([]ServerAddress{leader.localAddr}) + time.Sleep(c.propagateTimeout) + + // wait for the remaining follower to trigger an election + count := 0 + for follower.getState() != Candidate && count < 1000 { + count++ + time.Sleep(1 * time.Millisecond) + } + require.Equal(t, Candidate, follower.getState()) + + // send a vote request from the removed follower to the Candidate follower + if err := followerRemovedT.RequestVote(follower.localID, follower.localAddr, &reqVote, &resp); err != nil { + t.Fatalf("RequestVote RPC failed %v", err) + } + + // the vote request should not be granted, because the voter is not part of the cluster anymore + if resp.Granted { + t.Fatalf("expected vote to not be granted, but it was %+v", resp) + } +} From 47da40c69f630768f56c21a100c0c74c9e0f8921 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Thu, 23 Sep 2021 15:36:52 -0400 Subject: [PATCH 06/31] add wait loop to make test more robust --- raft_test.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/raft_test.go b/raft_test.go index 416f76f1d..20b4cd628 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2382,6 +2382,7 @@ func TestRaft_RemovedFollower_Vote(t *testing.T) { if configuration := c.getConfiguration(followers[1]); len(configuration.Servers) != 2 { t.Fatalf("too many peers") } + waitforState(followerRemoved, Follower) // The removed node should be still in Follower state require.Equal(t, Follower, followerRemoved.getState()) @@ -2404,11 +2405,7 @@ func TestRaft_RemovedFollower_Vote(t *testing.T) { time.Sleep(c.propagateTimeout) // wait for the remaining follower to trigger an election - count := 0 - for follower.getState() != Candidate && count < 1000 { - count++ - time.Sleep(1 * time.Millisecond) - } + waitforState(follower, Candidate) require.Equal(t, Candidate, follower.getState()) // send a vote request from the removed follower to the Candidate follower @@ -2421,3 +2418,11 @@ func TestRaft_RemovedFollower_Vote(t *testing.T) { t.Fatalf("expected vote to not be granted, but it was %+v", resp) } } + +func waitforState(follower *Raft, state RaftState) { + count := 0 + for follower.getState() != state && count < 1000 { + count++ + time.Sleep(1 * time.Millisecond) + } +} From 10c6f7cb775317da28ce267c96daca649629d33e Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Thu, 23 Sep 2021 15:40:02 -0400 Subject: [PATCH 07/31] increase timeout --- raft_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/raft_test.go b/raft_test.go index 20b4cd628..a1c931a8c 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2382,6 +2382,7 @@ func TestRaft_RemovedFollower_Vote(t *testing.T) { if configuration := c.getConfiguration(followers[1]); len(configuration.Servers) != 2 { t.Fatalf("too many peers") } + waitforState(followerRemoved, Follower) // The removed node should be still in Follower state require.Equal(t, Follower, followerRemoved.getState()) @@ -2421,8 +2422,8 @@ func TestRaft_RemovedFollower_Vote(t *testing.T) { func waitforState(follower *Raft, state RaftState) { count := 0 - for follower.getState() != state && count < 1000 { + for follower.getState() != state && count < 5000 { count++ - time.Sleep(1 * time.Millisecond) + time.Sleep(10 * time.Millisecond) } } From 4aa895c09d83df00a5433a479d3d37959b1f180d Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Thu, 23 Sep 2021 15:41:54 -0400 Subject: [PATCH 08/31] Revert "increase timeout" This reverts commit 
316c59ad1f9b993548b4778870c35673260f51a2. --- raft_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/raft_test.go b/raft_test.go index a1c931a8c..20b4cd628 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2382,7 +2382,6 @@ func TestRaft_RemovedFollower_Vote(t *testing.T) { if configuration := c.getConfiguration(followers[1]); len(configuration.Servers) != 2 { t.Fatalf("too many peers") } - waitforState(followerRemoved, Follower) // The removed node should be still in Follower state require.Equal(t, Follower, followerRemoved.getState()) @@ -2422,8 +2421,8 @@ func TestRaft_RemovedFollower_Vote(t *testing.T) { func waitforState(follower *Raft, state RaftState) { count := 0 - for follower.getState() != state && count < 5000 { + for follower.getState() != state && count < 1000 { count++ - time.Sleep(10 * time.Millisecond) + time.Sleep(1 * time.Millisecond) } } From f22dae0ac6aefe9c7f0f09f924a517d45f35bcf1 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 29 Sep 2021 10:32:42 -0400 Subject: [PATCH 09/31] add ID to `RequestVoteRequest` and check if the node is part of the cluster before granting a vote --- commands.go | 1 + config.go | 2 +- inmem_transport.go | 10 ++++++++++ net_transport.go | 10 ++++++++++ raft.go | 14 +++++++++++++ raft_test.go | 49 ++++++++++++++++++++++++++++++++++++++++++++++ transport.go | 6 ++++++ 7 files changed, 91 insertions(+), 1 deletion(-) diff --git a/commands.go b/commands.go index 3358a3284..07ba41212 100644 --- a/commands.go +++ b/commands.go @@ -73,6 +73,7 @@ type RequestVoteRequest struct { Term uint64 Candidate []byte + ID []byte // Used to ensure safety LastLogIndex uint64 LastLogTerm uint64 diff --git a/config.go b/config.go index 78dde9225..eda4e4525 100644 --- a/config.go +++ b/config.go @@ -91,7 +91,7 @@ const ( // ProtocolVersionMin is the minimum protocol version ProtocolVersionMin ProtocolVersion = 0 // ProtocolVersionMax is the maximum protocol version - ProtocolVersionMax = 3 + ProtocolVersionMax = 4 ) // SnapshotVersion is the version of snapshots that this server can understand. diff --git a/inmem_transport.go b/inmem_transport.go index b5bdecc73..7613cb630 100644 --- a/inmem_transport.go +++ b/inmem_transport.go @@ -194,6 +194,16 @@ func (i *InmemTransport) DecodePeer(buf []byte) ServerAddress { return ServerAddress(buf) } +// EncodePeer implements the Transport interface. +func (i *InmemTransport) EncodeID(id ServerID) []byte { + return []byte(id) +} + +// DecodePeer implements the Transport interface. +func (i *InmemTransport) DecodeID(buf []byte) ServerID { + return ServerID(buf) +} + // Connect is used to connect this transport to another transport for // a given peer name. This allows for local routing. func (i *InmemTransport) Connect(peer ServerAddress, t Transport) { diff --git a/net_transport.go b/net_transport.go index 3ac845290..b36fe8b11 100644 --- a/net_transport.go +++ b/net_transport.go @@ -466,6 +466,16 @@ func (n *NetworkTransport) DecodePeer(buf []byte) ServerAddress { return ServerAddress(buf) } +// EncodeID implements the Transport interface. +func (n *NetworkTransport) EncodeID(id ServerID) []byte { + return []byte(id) +} + +// DecodeID implements the Transport interface. +func (n *NetworkTransport) DecodeID(buf []byte) ServerID { + return ServerID(buf) +} + // TimeoutNow implements the Transport interface. 
func (n *NetworkTransport) TimeoutNow(id ServerID, target ServerAddress, args *TimeoutNowRequest, resp *TimeoutNowResponse) error { return n.genericRPC(id, target, rpcTimeoutNow, args, resp) diff --git a/raft.go b/raft.go index a53492bd4..2e0448cb3 100644 --- a/raft.go +++ b/raft.go @@ -1480,6 +1480,19 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { // there is a known leader. But if the leader initiated a leadership transfer, // vote! candidate := r.trans.DecodePeer(req.Candidate) + + // Prior to version 4 the peer ID is not part of `RequestVoteRequest`, + // We assume that the peer is part of the configuration and skip this check + if r.protocolVersion > 3 { + candidateID := r.trans.DecodeID(req.ID) + // if the Servers list is empty that mean the cluster is very likely trying to bootstrap, + // Grant the vote + if len(r.configurations.latest.Servers) > 0 && !inConfig(r.configurations.latest, candidateID) { + r.logger.Warn("rejecting vote request since voter not part of the configuration", + "from", candidate) + return + } + } if leader := r.Leader(); leader != "" && leader != candidate && !req.LeadershipTransfer { r.logger.Warn("rejecting vote request since we have a leader", "from", candidate, @@ -1712,6 +1725,7 @@ func (r *Raft) electSelf() <-chan *voteResult { RPCHeader: r.getRPCHeader(), Term: r.getCurrentTerm(), Candidate: r.trans.EncodePeer(r.localID, r.localAddr), + ID: r.trans.EncodeID(r.localID), LastLogIndex: lastIdx, LastLogTerm: lastTerm, LeadershipTransfer: r.candidateFromLeadershipTransfer, diff --git a/raft_test.go b/raft_test.go index 20b4cd628..842414f97 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1740,6 +1740,53 @@ func TestRaft_Voting(t *testing.T) { RPCHeader: ldr.getRPCHeader(), Term: ldr.getCurrentTerm() + 10, Candidate: ldrT.EncodePeer(ldr.localID, ldr.localAddr), + ID: ldrT.EncodeID(ldr.localID), + LastLogIndex: ldr.LastIndex(), + LastLogTerm: ldr.getCurrentTerm(), + LeadershipTransfer: false, + } + // a follower that thinks there's a leader should vote for that leader. 
+ var resp RequestVoteResponse + if err := ldrT.RequestVote(followers[0].localID, followers[0].localAddr, &reqVote, &resp); err != nil { + t.Fatalf("RequestVote RPC failed %v", err) + } + if !resp.Granted { + t.Fatalf("expected vote to be granted, but wasn't %+v", resp) + } + // a follower that thinks there's a leader shouldn't vote for a different candidate + reqVote.Candidate = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) + if err := ldrT.RequestVote(followers[1].localID, followers[1].localAddr, &reqVote, &resp); err != nil { + t.Fatalf("RequestVote RPC failed %v", err) + } + if resp.Granted { + t.Fatalf("expected vote not to be granted, but was %+v", resp) + } + // a follower that thinks there's a leader, but the request has the leadership transfer flag, should + // vote for a different candidate + reqVote.LeadershipTransfer = true + reqVote.Candidate = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) + if err := ldrT.RequestVote(followers[1].localID, followers[1].localAddr, &reqVote, &resp); err != nil { + t.Fatalf("RequestVote RPC failed %v", err) + } + if !resp.Granted { + t.Fatalf("expected vote to be granted, but wasn't %+v", resp) + } +} + +func TestRaft_Voting_portocolVersion3(t *testing.T) { + conf := inmemConfig(t) + conf.ProtocolVersion = 3 + c := MakeCluster(3, t, conf) + defer c.Close() + followers := c.Followers() + ldr := c.Leader() + ldrT := c.trans[c.IndexOf(ldr)] + + reqVote := RequestVoteRequest{ + RPCHeader: ldr.getRPCHeader(), + Term: ldr.getCurrentTerm() + 10, + Candidate: ldrT.EncodePeer(ldr.localID, ldr.localAddr), + ID: ldrT.EncodeID(ldr.localID), LastLogIndex: ldr.LastIndex(), LastLogTerm: ldr.getCurrentTerm(), LeadershipTransfer: false, @@ -2349,6 +2396,7 @@ func TestRaft_InstallSnapshot_InvalidPeers(t *testing.T) { func TestRaft_RemovedFollower_Vote(t *testing.T) { // Make a cluster c := MakeCluster(3, t, nil) + defer c.Close() // Get the leader @@ -2393,6 +2441,7 @@ func TestRaft_RemovedFollower_Vote(t *testing.T) { RPCHeader: followerRemoved.getRPCHeader(), Term: followerRemoved.getCurrentTerm() + 10, Candidate: followerRemovedT.EncodePeer(followerRemoved.localID, followerRemoved.localAddr), + ID: followerRemovedT.EncodeID(followerRemoved.localID), LastLogIndex: followerRemoved.LastIndex(), LastLogTerm: followerRemoved.getCurrentTerm(), LeadershipTransfer: false, diff --git a/transport.go b/transport.go index b18d24593..3b9fbd67d 100644 --- a/transport.go +++ b/transport.go @@ -53,6 +53,12 @@ type Transport interface { // DecodePeer is used to deserialize a peer's address. DecodePeer([]byte) ServerAddress + // EncodePeer is used to serialize a peer's address. + EncodeID(id ServerID) []byte + + // DecodePeer is used to deserialize a peer's address. + DecodeID([]byte) ServerID + // SetHeartbeatHandler is used to setup a heartbeat handler // as a fast-pass. This is to avoid head-of-line blocking from // disk IO. 
If a Transport does not support this, it can simply From 822cde271d22cac35867fb6d7830d68857d20229 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Thu, 30 Sep 2021 16:21:55 -0400 Subject: [PATCH 10/31] use request protocol version to ensure we have the right version --- raft.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft.go b/raft.go index 2e0448cb3..048959915 100644 --- a/raft.go +++ b/raft.go @@ -1483,7 +1483,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { // Prior to version 4 the peer ID is not part of `RequestVoteRequest`, // We assume that the peer is part of the configuration and skip this check - if r.protocolVersion > 3 { + if req.ProtocolVersion > 3 { candidateID := r.trans.DecodeID(req.ID) // if the Servers list is empty that mean the cluster is very likely trying to bootstrap, // Grant the vote From e7a1af303ee76451627233d6a5daea440c103d6e Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Tue, 5 Oct 2021 15:42:04 -0400 Subject: [PATCH 11/31] add `ID` and `Addr` as part of `RPCHeader` and do not fill `Candidate` and `Leader` in version > 3 --- commands.go | 3 ++- fuzzy/verifier.go | 8 +++++++- net_transport.go | 16 ++++++++++++---- net_transport_test.go | 21 ++++++++++++--------- raft.go | 42 +++++++++++++++++++++++++++++++++--------- raft_test.go | 15 ++++++--------- replication.go | 13 ++++++++++--- transport_test.go | 8 ++++---- 8 files changed, 86 insertions(+), 40 deletions(-) diff --git a/commands.go b/commands.go index 07ba41212..9aaa55cb1 100644 --- a/commands.go +++ b/commands.go @@ -8,6 +8,8 @@ type RPCHeader struct { // ProtocolVersion is the version of the protocol the sender is // speaking. ProtocolVersion ProtocolVersion + ID []byte + Addr []byte } // WithRPCHeader is an interface that exposes the RPC header. 
@@ -73,7 +75,6 @@ type RequestVoteRequest struct { Term uint64 Candidate []byte - ID []byte // Used to ensure safety LastLogIndex uint64 LastLogTerm uint64 diff --git a/fuzzy/verifier.go b/fuzzy/verifier.go index 44b5ad5b5..e8b7e9d3a 100644 --- a/fuzzy/verifier.go +++ b/fuzzy/verifier.go @@ -45,7 +45,13 @@ func (v *appendEntriesVerifier) PreRequestVote(src, target string, rv *raft.Requ func (v *appendEntriesVerifier) PreAppendEntries(src, target string, req *raft.AppendEntriesRequest) (*raft.AppendEntriesResponse, error) { term := req.Term - ldr := string(req.Leader) + var ldr string + if req.ProtocolVersion > 3 { + ldr = string(req.Addr) + } else { + ldr = string(req.Leader) + } + if ldr != src { v.Lock() defer v.Unlock() diff --git a/net_transport.go b/net_transport.go index b36fe8b11..f359c4f9e 100644 --- a/net_transport.go +++ b/net_transport.go @@ -578,10 +578,18 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en rpc.Command = &req // Check if this is a heartbeat - if req.Term != 0 && req.Leader != nil && - req.PrevLogEntry == 0 && req.PrevLogTerm == 0 && - len(req.Entries) == 0 && req.LeaderCommitIndex == 0 { - isHeartbeat = true + if req.ProtocolVersion > 3 { + if req.Term != 0 && req.Addr != nil && + req.PrevLogEntry == 0 && req.PrevLogTerm == 0 && + len(req.Entries) == 0 && req.LeaderCommitIndex == 0 { + isHeartbeat = true + } + } else { + if req.Term != 0 && req.Leader != nil && + req.PrevLogEntry == 0 && req.PrevLogTerm == 0 && + len(req.Entries) == 0 && req.LeaderCommitIndex == 0 { + isHeartbeat = true + } } case rpcRequestVote: diff --git a/net_transport_test.go b/net_transport_test.go index d42d469db..32e1ba749 100644 --- a/net_transport_test.go +++ b/net_transport_test.go @@ -35,7 +35,6 @@ func TestNetworkTransport_CloseStreams(t *testing.T) { // Make the RPC request args := AppendEntriesRequest{ Term: 10, - Leader: []byte("cartman"), PrevLogEntry: 100, PrevLogTerm: 4, Entries: []*Log{ @@ -47,6 +46,8 @@ func TestNetworkTransport_CloseStreams(t *testing.T) { }, LeaderCommitIndex: 90, } + args.Addr = []byte("cartman") + resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -138,9 +139,11 @@ func TestNetworkTransport_Heartbeat_FastPath(t *testing.T) { // Make the RPC request args := AppendEntriesRequest{ - Term: 10, - Leader: []byte("cartman"), + Term: 10, } + args.ProtocolVersion = ProtocolVersionMax + args.Addr = []byte("cartman") + args.Leader = []byte("cartman") resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -197,7 +200,6 @@ func TestNetworkTransport_AppendEntries(t *testing.T) { // Make the RPC request args := AppendEntriesRequest{ Term: 10, - Leader: []byte("cartman"), PrevLogEntry: 100, PrevLogTerm: 4, Entries: []*Log{ @@ -209,6 +211,7 @@ func TestNetworkTransport_AppendEntries(t *testing.T) { }, LeaderCommitIndex: 90, } + args.Addr = []byte("cartman") resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -267,7 +270,6 @@ func TestNetworkTransport_AppendEntriesPipeline(t *testing.T) { // Make the RPC request args := AppendEntriesRequest{ Term: 10, - Leader: []byte("cartman"), PrevLogEntry: 100, PrevLogTerm: 4, Entries: []*Log{ @@ -279,6 +281,7 @@ func TestNetworkTransport_AppendEntriesPipeline(t *testing.T) { }, LeaderCommitIndex: 90, } + args.Addr = []byte("cartman") resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -352,7 +355,6 @@ func TestNetworkTransport_AppendEntriesPipeline_CloseStreams(t *testing.T) { // Make the RPC request args := AppendEntriesRequest{ Term: 10, - Leader: []byte("cartman"), PrevLogEntry: 100, 
PrevLogTerm: 4, Entries: []*Log{ @@ -364,6 +366,7 @@ func TestNetworkTransport_AppendEntriesPipeline_CloseStreams(t *testing.T) { }, LeaderCommitIndex: 90, } + args.Addr = []byte("cartman") resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -462,10 +465,10 @@ func TestNetworkTransport_RequestVote(t *testing.T) { // Make the RPC request args := RequestVoteRequest{ Term: 20, - Candidate: []byte("butters"), LastLogIndex: 100, LastLogTerm: 19, } + args.Addr = []byte("butters") resp := RequestVoteResponse{ Term: 100, Granted: false, @@ -523,12 +526,12 @@ func TestNetworkTransport_InstallSnapshot(t *testing.T) { // Make the RPC request args := InstallSnapshotRequest{ Term: 10, - Leader: []byte("kyle"), LastLogIndex: 100, LastLogTerm: 9, Peers: []byte("blah blah"), Size: 10, } + args.Addr = []byte("kyle") resp := InstallSnapshotResponse{ Term: 10, Success: true, @@ -631,7 +634,6 @@ func TestNetworkTransport_PooledConn(t *testing.T) { // Make the RPC request args := AppendEntriesRequest{ Term: 10, - Leader: []byte("cartman"), PrevLogEntry: 100, PrevLogTerm: 4, Entries: []*Log{ @@ -643,6 +645,7 @@ func TestNetworkTransport_PooledConn(t *testing.T) { }, LeaderCommitIndex: 90, } + args.Addr = []byte("cartman") resp := AppendEntriesResponse{ Term: 4, LastLog: 90, diff --git a/raft.go b/raft.go index 048959915..b1d46b108 100644 --- a/raft.go +++ b/raft.go @@ -31,6 +31,8 @@ var ( func (r *Raft) getRPCHeader() RPCHeader { return RPCHeader{ ProtocolVersion: r.config().ProtocolVersion, + ID: r.trans.EncodeID(r.config().LocalID), + Addr: r.trans.EncodePeer(r.config().LocalID, r.localAddr), } } @@ -1320,8 +1322,11 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { } // Save the current leader - r.setLeader(r.trans.DecodePeer(a.Leader)) - + if a.ProtocolVersion > 3 { + r.setLeader(r.trans.DecodePeer(a.Addr)) + } else { + r.setLeader(r.trans.DecodePeer(a.Leader)) + } // Verify the last log entry if a.PrevLogEntry > 0 { lastIdx, lastTerm := r.getLastEntry() @@ -1479,7 +1484,15 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { // check the LeadershipTransfer flag is set. Usually votes are rejected if // there is a known leader. But if the leader initiated a leadership transfer, // vote! 
- candidate := r.trans.DecodePeer(req.Candidate) + var candidate ServerAddress + var candidateBytes []byte + if req.ProtocolVersion > 3 { + candidate = r.trans.DecodePeer(req.Addr) + candidateBytes = req.Addr + } else { + candidate = r.trans.DecodePeer(req.Candidate) + candidateBytes = req.Candidate + } // Prior to version 4 the peer ID is not part of `RequestVoteRequest`, // We assume that the peer is part of the configuration and skip this check @@ -1529,7 +1542,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { // Check if we've voted in this election before if lastVoteTerm == req.Term && lastVoteCandBytes != nil { r.logger.Info("duplicate requestVote for same term", "term", req.Term) - if bytes.Compare(lastVoteCandBytes, req.Candidate) == 0 { + if bytes.Compare(lastVoteCandBytes, candidateBytes) == 0 { r.logger.Warn("duplicate requestVote from", "candidate", candidate) resp.Granted = true } @@ -1555,7 +1568,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { } // Persist a vote for safety - if err := r.persistVote(req.Term, req.Candidate); err != nil { + if err := r.persistVote(req.Term, candidateBytes); err != nil { r.logger.Error("failed to persist vote", "error", err) return } @@ -1606,7 +1619,11 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { } // Save the current leader - r.setLeader(r.trans.DecodePeer(req.Leader)) + if req.ProtocolVersion > 3 { + r.setLeader(r.trans.DecodePeer(req.Addr)) + } else { + r.setLeader(r.trans.DecodePeer(req.Leader)) + } // Create a new snapshot var reqConfiguration Configuration @@ -1724,12 +1741,13 @@ func (r *Raft) electSelf() <-chan *voteResult { req := &RequestVoteRequest{ RPCHeader: r.getRPCHeader(), Term: r.getCurrentTerm(), - Candidate: r.trans.EncodePeer(r.localID, r.localAddr), - ID: r.trans.EncodeID(r.localID), LastLogIndex: lastIdx, LastLogTerm: lastTerm, LeadershipTransfer: r.candidateFromLeadershipTransfer, } + if req.ProtocolVersion <= 3 { + req.Candidate = r.trans.EncodePeer(r.localID, r.localAddr) + } // Construct a function to ask for a vote askPeer := func(peer Server) { @@ -1753,7 +1771,13 @@ func (r *Raft) electSelf() <-chan *voteResult { if server.Suffrage == Voter { if server.ID == r.localID { // Persist a vote for ourselves - if err := r.persistVote(req.Term, req.Candidate); err != nil { + var candidate []byte + if req.ProtocolVersion > 3 { + candidate = req.Addr + } else { + candidate = req.Candidate + } + if err := r.persistVote(req.Term, candidate); err != nil { r.logger.Error("failed to persist vote", "error", err) return nil } diff --git a/raft_test.go b/raft_test.go index 842414f97..2fa0a88be 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1739,8 +1739,6 @@ func TestRaft_Voting(t *testing.T) { reqVote := RequestVoteRequest{ RPCHeader: ldr.getRPCHeader(), Term: ldr.getCurrentTerm() + 10, - Candidate: ldrT.EncodePeer(ldr.localID, ldr.localAddr), - ID: ldrT.EncodeID(ldr.localID), LastLogIndex: ldr.LastIndex(), LastLogTerm: ldr.getCurrentTerm(), LeadershipTransfer: false, @@ -1754,7 +1752,7 @@ func TestRaft_Voting(t *testing.T) { t.Fatalf("expected vote to be granted, but wasn't %+v", resp) } // a follower that thinks there's a leader shouldn't vote for a different candidate - reqVote.Candidate = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) + reqVote.Addr = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) if err := ldrT.RequestVote(followers[1].localID, followers[1].localAddr, &reqVote, &resp); err != nil { t.Fatalf("RequestVote RPC failed 
%v", err) } @@ -1764,7 +1762,7 @@ func TestRaft_Voting(t *testing.T) { // a follower that thinks there's a leader, but the request has the leadership transfer flag, should // vote for a different candidate reqVote.LeadershipTransfer = true - reqVote.Candidate = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) + reqVote.Addr = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) if err := ldrT.RequestVote(followers[1].localID, followers[1].localAddr, &reqVote, &resp); err != nil { t.Fatalf("RequestVote RPC failed %v", err) } @@ -1785,9 +1783,8 @@ func TestRaft_Voting_portocolVersion3(t *testing.T) { reqVote := RequestVoteRequest{ RPCHeader: ldr.getRPCHeader(), Term: ldr.getCurrentTerm() + 10, - Candidate: ldrT.EncodePeer(ldr.localID, ldr.localAddr), - ID: ldrT.EncodeID(ldr.localID), LastLogIndex: ldr.LastIndex(), + Candidate: ldrT.EncodePeer(ldr.localID, ldr.localAddr), LastLogTerm: ldr.getCurrentTerm(), LeadershipTransfer: false, } @@ -1800,6 +1797,7 @@ func TestRaft_Voting_portocolVersion3(t *testing.T) { t.Fatalf("expected vote to be granted, but wasn't %+v", resp) } // a follower that thinks there's a leader shouldn't vote for a different candidate + reqVote.Addr = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) reqVote.Candidate = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) if err := ldrT.RequestVote(followers[1].localID, followers[1].localAddr, &reqVote, &resp); err != nil { t.Fatalf("RequestVote RPC failed %v", err) @@ -1810,6 +1808,7 @@ func TestRaft_Voting_portocolVersion3(t *testing.T) { // a follower that thinks there's a leader, but the request has the leadership transfer flag, should // vote for a different candidate reqVote.LeadershipTransfer = true + reqVote.Addr = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) reqVote.Candidate = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) if err := ldrT.RequestVote(followers[1].localID, followers[1].localAddr, &reqVote, &resp); err != nil { t.Fatalf("RequestVote RPC failed %v", err) @@ -1829,9 +1828,9 @@ func TestRaft_ProtocolVersion_RejectRPC(t *testing.T) { reqVote := RequestVoteRequest{ RPCHeader: RPCHeader{ ProtocolVersion: ProtocolVersionMax + 1, + Addr: ldrT.EncodePeer(ldr.localID, ldr.localAddr), }, Term: ldr.getCurrentTerm() + 10, - Candidate: ldrT.EncodePeer(ldr.localID, ldr.localAddr), LastLogIndex: ldr.LastIndex(), LastLogTerm: ldr.getCurrentTerm(), } @@ -2440,8 +2439,6 @@ func TestRaft_RemovedFollower_Vote(t *testing.T) { reqVote := RequestVoteRequest{ RPCHeader: followerRemoved.getRPCHeader(), Term: followerRemoved.getCurrentTerm() + 10, - Candidate: followerRemovedT.EncodePeer(followerRemoved.localID, followerRemoved.localAddr), - ID: followerRemovedT.EncodeID(followerRemoved.localID), LastLogIndex: followerRemoved.LastIndex(), LastLogTerm: followerRemoved.getCurrentTerm(), LeadershipTransfer: false, diff --git a/replication.go b/replication.go index f5e81924b..8222baa89 100644 --- a/replication.go +++ b/replication.go @@ -320,7 +320,6 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { RPCHeader: r.getRPCHeader(), SnapshotVersion: meta.Version, Term: s.currentTerm, - Leader: r.trans.EncodePeer(r.localID, r.localAddr), LastLogIndex: meta.Index, LastLogTerm: meta.Term, Peers: meta.Peers, @@ -328,6 +327,9 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { Configuration: EncodeConfiguration(meta.Configuration), ConfigurationIndex: meta.ConfigurationIndex, } + if req.ProtocolVersion <= 3 { + 
req.Leader = r.trans.EncodePeer(r.localID, r.localAddr) + } s.peerLock.RLock() peer := s.peer @@ -381,7 +383,9 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { req := AppendEntriesRequest{ RPCHeader: r.getRPCHeader(), Term: s.currentTerm, - Leader: r.trans.EncodePeer(r.localID, r.localAddr), + } + if req.ProtocolVersion <= 3 { + req.Leader = r.trans.EncodePeer(r.localID, r.localAddr) } var resp AppendEntriesResponse for { @@ -552,7 +556,10 @@ func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh, func (r *Raft) setupAppendEntries(s *followerReplication, req *AppendEntriesRequest, nextIndex, lastIndex uint64) error { req.RPCHeader = r.getRPCHeader() req.Term = s.currentTerm - req.Leader = r.trans.EncodePeer(r.localID, r.localAddr) + if req.ProtocolVersion <= 3 { + req.Leader = r.trans.EncodePeer(r.localID, r.localAddr) + } + req.LeaderCommitIndex = r.getCommitIndex() if err := r.setPreviousLog(req, nextIndex); err != nil { return err diff --git a/transport_test.go b/transport_test.go index 5a59253df..17213337d 100644 --- a/transport_test.go +++ b/transport_test.go @@ -41,7 +41,6 @@ func TestTransport_AppendEntries(t *testing.T) { // Make the RPC request args := AppendEntriesRequest{ Term: 10, - Leader: []byte("cartman"), PrevLogEntry: 100, PrevLogTerm: 4, Entries: []*Log{ @@ -53,6 +52,7 @@ func TestTransport_AppendEntries(t *testing.T) { }, LeaderCommitIndex: 90, } + args.Addr = []byte("cartman") resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -104,7 +104,6 @@ func TestTransport_AppendEntriesPipeline(t *testing.T) { // Make the RPC request args := AppendEntriesRequest{ Term: 10, - Leader: []byte("cartman"), PrevLogEntry: 100, PrevLogTerm: 4, Entries: []*Log{ @@ -116,6 +115,7 @@ func TestTransport_AppendEntriesPipeline(t *testing.T) { }, LeaderCommitIndex: 90, } + args.Addr = []byte("cartman") resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -185,10 +185,10 @@ func TestTransport_RequestVote(t *testing.T) { // Make the RPC request args := RequestVoteRequest{ Term: 20, - Candidate: []byte("butters"), LastLogIndex: 100, LastLogTerm: 19, } + args.Addr = []byte("butters") resp := RequestVoteResponse{ Term: 100, Granted: false, @@ -240,12 +240,12 @@ func TestTransport_InstallSnapshot(t *testing.T) { // Make the RPC request args := InstallSnapshotRequest{ Term: 10, - Leader: []byte("kyle"), LastLogIndex: 100, LastLogTerm: 9, Peers: []byte("blah blah"), Size: 10, } + args.Addr = []byte("kyle") resp := InstallSnapshotResponse{ Term: 10, Success: true, From 512d3386d25ffdb0bc3c3039fdf45b0a88c463da Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Thu, 7 Oct 2021 11:22:40 -0400 Subject: [PATCH 12/31] return `LeaderID` as part of the `Leader()` api --- api.go | 13 ++++++++----- observer.go | 5 +++-- raft.go | 45 ++++++++++++++++++++++++--------------------- raft_test.go | 6 +++--- testing.go | 13 +++++++------ 5 files changed, 45 insertions(+), 37 deletions(-) diff --git a/api.go b/api.go index 9152cf620..6440471b8 100644 --- a/api.go +++ b/api.go @@ -111,8 +111,10 @@ type Raft struct { lastContact time.Time lastContactLock sync.RWMutex - // Leader is the current cluster leader - leader ServerAddress + // leaderAddr is the current cluster leader Address + leaderAddr ServerAddress + // LeaderID is the current cluster leader ID + leaderID ServerID leaderLock sync.RWMutex // leaderCh is used to notify of leadership changes @@ -716,11 +718,12 @@ func (r *Raft) BootstrapCluster(configuration Configuration) Future { // Leader is used to return 
the current leader of the cluster. // It may return empty string if there is no current leader // or the leader is unknown. -func (r *Raft) Leader() ServerAddress { +func (r *Raft) Leader() (ServerAddress, ServerID) { r.leaderLock.RLock() - leader := r.leader + leaderAddr := r.leaderAddr + leaderID := r.leaderID r.leaderLock.RUnlock() - return leader + return leaderAddr, leaderID } // Apply is used to apply a command to the FSM in a highly consistent diff --git a/observer.go b/observer.go index 29f2d5802..57b3930e2 100644 --- a/observer.go +++ b/observer.go @@ -19,7 +19,8 @@ type Observation struct { // LeaderObservation is used for the data when leadership changes. type LeaderObservation struct { - Leader ServerAddress + LeaderAddr ServerAddress + LeaderID ServerID } // PeerObservation is sent to observers when peers change. @@ -28,7 +29,7 @@ type PeerObservation struct { Peer Server } -// FailedHeartbeatObservation is sent when a node fails to heartbeat with the leader +// FailedHeartbeatObservation is sent when a node fails to heartbeat with the leader Address and ID type FailedHeartbeatObservation struct { PeerID ServerID LastContact time.Time diff --git a/raft.go b/raft.go index b1d46b108..11bf6afd6 100644 --- a/raft.go +++ b/raft.go @@ -92,14 +92,16 @@ type leaderState struct { stepDown chan struct{} } -// setLeader is used to modify the current leader of the cluster -func (r *Raft) setLeader(leader ServerAddress) { +// setLeader is used to modify the current leader Address and ID of the cluster +func (r *Raft) setLeader(leaderAddr ServerAddress, leaderID ServerID) { r.leaderLock.Lock() - oldLeader := r.leader - r.leader = leader + oldLeaderAddr := r.leaderAddr + r.leaderAddr = leaderAddr + oldLeaderID := r.leaderID + r.leaderID = leaderID r.leaderLock.Unlock() - if oldLeader != leader { - r.observe(LeaderObservation{Leader: leader}) + if oldLeaderAddr != leaderAddr || oldLeaderID != leaderID { + r.observe(LeaderObservation{LeaderAddr: leaderAddr, LeaderID: leaderID}) } } @@ -132,7 +134,7 @@ func (r *Raft) run() { select { case <-r.shutdownCh: // Clear the leader to prevent forwarding - r.setLeader("") + r.setLeader("", "") return default: } @@ -152,7 +154,8 @@ func (r *Raft) run() { // runFollower runs the FSM for a follower. func (r *Raft) runFollower() { didWarn := false - r.logger.Info("entering follower state", "follower", r, "leader", r.Leader()) + leaderAddr, _ := r.Leader() + r.logger.Info("entering follower state", "follower", r, "leader", leaderAddr) metrics.IncrCounter([]string{"raft", "state", "follower"}, 1) heartbeatTimer := randomTimeout(r.config().HeartbeatTimeout) @@ -200,8 +203,8 @@ func (r *Raft) runFollower() { } // Heartbeat failed! Transition to the candidate state - lastLeader := r.Leader() - r.setLeader("") + lastLeader, _ := r.Leader() + r.setLeader("", "") if r.configurations.latestIndex == 0 { if !didWarn { @@ -302,7 +305,7 @@ func (r *Raft) runCandidate() { if grantedVotes >= votesNeeded { r.logger.Info("election won", "tally", grantedVotes) r.setState(Leader) - r.setLeader(r.localAddr) + r.setLeader(r.localAddr, r.localID) return } @@ -437,8 +440,8 @@ func (r *Raft) runLeader() { // We may have stepped down due to an RPC call, which would // provide the leader, so we cannot always blank this out. 
r.leaderLock.Lock() - if r.leader == r.localAddr { - r.leader = "" + if r.leaderAddr == r.localAddr { + r.leaderAddr = "" } r.leaderLock.Unlock() @@ -1323,9 +1326,9 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { // Save the current leader if a.ProtocolVersion > 3 { - r.setLeader(r.trans.DecodePeer(a.Addr)) + r.setLeader(r.trans.DecodePeer(a.Addr), r.trans.DecodeID(a.ID)) } else { - r.setLeader(r.trans.DecodePeer(a.Leader)) + r.setLeader(r.trans.DecodePeer(a.Leader), "") } // Verify the last log entry if a.PrevLogEntry > 0 { @@ -1506,10 +1509,10 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { return } } - if leader := r.Leader(); leader != "" && leader != candidate && !req.LeadershipTransfer { + if leaderAddr, _ := r.Leader(); leaderAddr != "" && leaderAddr != candidate && !req.LeadershipTransfer { r.logger.Warn("rejecting vote request since we have a leader", "from", candidate, - "leader", leader) + "leaderAddr", leaderAddr) return } @@ -1620,9 +1623,9 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { // Save the current leader if req.ProtocolVersion > 3 { - r.setLeader(r.trans.DecodePeer(req.Addr)) + r.setLeader(r.trans.DecodePeer(req.Addr), r.trans.DecodeID(req.ID)) } else { - r.setLeader(r.trans.DecodePeer(req.Leader)) + r.setLeader(r.trans.DecodePeer(req.Leader), "") } // Create a new snapshot @@ -1823,7 +1826,7 @@ func (r *Raft) setCurrentTerm(t uint64) { // transition causes the known leader to be cleared. This means // that leader should be set only after updating the state. func (r *Raft) setState(state RaftState) { - r.setLeader("") + r.setLeader("", "") oldState := r.raftState.getState() r.raftState.setState(state) if oldState != state { @@ -1880,7 +1883,7 @@ func (r *Raft) initiateLeadershipTransfer(id *ServerID, address *ServerAddress) // timeoutNow is what happens when a server receives a TimeoutNowRequest. 
func (r *Raft) timeoutNow(rpc RPC, req *TimeoutNowRequest) { - r.setLeader("") + r.setLeader("", "") r.setState(Candidate) r.candidateFromLeadershipTransfer = true rpc.Respond(&TimeoutNowResponse{}, nil) diff --git a/raft_test.go b/raft_test.go index 2fa0a88be..d725d385f 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1554,10 +1554,10 @@ LOOP: } // Ensure both have cleared their leader - if l := leader.Leader(); l != "" { + if l, id := leader.Leader(); l != "" && id != "" { t.Fatalf("bad: %v", l) } - if l := follower.Leader(); l != "" { + if l, id := follower.Leader(); l != "" && id != "" { t.Fatalf("bad: %v", l) } } @@ -1659,7 +1659,7 @@ func TestRaft_VerifyLeader_Fail(t *testing.T) { } // Ensure the known leader is cleared - if l := leader.Leader(); l != "" { + if l, _ := leader.Leader(); l != "" { t.Fatalf("bad: %v", l) } } diff --git a/testing.go b/testing.go index 1dd61b94c..22026b297 100644 --- a/testing.go +++ b/testing.go @@ -529,15 +529,16 @@ func (c *cluster) EnsureLeader(t *testing.T, expect ServerAddress) { // think the leader is correct fail := false for _, r := range c.rafts { - leader := ServerAddress(r.Leader()) - if leader != expect { - if leader == "" { - leader = "[none]" + leaderAddr, _ := r.Leader() + + if leaderAddr != expect { + if leaderAddr == "" { + leaderAddr = "[none]" } if expect == "" { - c.logger.Error("peer sees incorrect leader", "peer", r, "leader", leader, "expected-leader", "[none]") + c.logger.Error("peer sees incorrect leader", "peer", r, "leader", leaderAddr, "expected-leader", "[none]") } else { - c.logger.Error("peer sees incorrect leader", "peer", r, "leader", leader, "expected-leader", expect) + c.logger.Error("peer sees incorrect leader", "peer", r, "leader", leaderAddr, "expected-leader", expect) } fail = true } From 9e1e9c2c0dfbe3b1a65be779121ef19682e7f045 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Mon, 25 Oct 2021 22:04:13 -0400 Subject: [PATCH 13/31] fix docstring --- transport.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/transport.go b/transport.go index 3b9fbd67d..ef658dfb3 100644 --- a/transport.go +++ b/transport.go @@ -53,10 +53,10 @@ type Transport interface { // DecodePeer is used to deserialize a peer's address. DecodePeer([]byte) ServerAddress - // EncodePeer is used to serialize a peer's address. + // EncodeID is used to serialize a server ID. EncodeID(id ServerID) []byte - // DecodePeer is used to deserialize a peer's address. + // DecodeID is used to deserialize a server ID. DecodeID([]byte) ServerID // SetHeartbeatHandler is used to setup a heartbeat handler From 9e751dbe77e38aa81374ac4801cf657cf30e07d2 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Tue, 26 Oct 2021 12:32:40 -0400 Subject: [PATCH 14/31] fix retro compatibility with version 3 --- raft.go | 14 +++++++------- replication.go | 17 ++++++++--------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/raft.go b/raft.go index 11bf6afd6..ab8500e22 100644 --- a/raft.go +++ b/raft.go @@ -154,8 +154,8 @@ func (r *Raft) run() { // runFollower runs the FSM for a follower. 
func (r *Raft) runFollower() { didWarn := false - leaderAddr, _ := r.Leader() - r.logger.Info("entering follower state", "follower", r, "leader", leaderAddr) + leaderAddr, leaderID := r.Leader() + r.logger.Info("entering follower state", "follower", r, "leader (", leaderAddr, leaderID, ")") metrics.IncrCounter([]string{"raft", "state", "follower"}, 1) heartbeatTimer := randomTimeout(r.config().HeartbeatTimeout) @@ -1509,10 +1509,10 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { return } } - if leaderAddr, _ := r.Leader(); leaderAddr != "" && leaderAddr != candidate && !req.LeadershipTransfer { + if leaderAddr, leaderID := r.Leader(); leaderAddr != "" && leaderAddr != candidate && !req.LeadershipTransfer { r.logger.Warn("rejecting vote request since we have a leader", "from", candidate, - "leaderAddr", leaderAddr) + "leader (", leaderAddr, leaderID, ")") return } @@ -1748,9 +1748,9 @@ func (r *Raft) electSelf() <-chan *voteResult { LastLogTerm: lastTerm, LeadershipTransfer: r.candidateFromLeadershipTransfer, } - if req.ProtocolVersion <= 3 { - req.Candidate = r.trans.EncodePeer(r.localID, r.localAddr) - } + + // this is needed for retro compatibility with protocolVersion = 3 + req.Candidate = r.trans.EncodePeer(r.localID, r.localAddr) // Construct a function to ask for a vote askPeer := func(peer Server) { diff --git a/replication.go b/replication.go index 8222baa89..34db561bb 100644 --- a/replication.go +++ b/replication.go @@ -327,9 +327,8 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { Configuration: EncodeConfiguration(meta.Configuration), ConfigurationIndex: meta.ConfigurationIndex, } - if req.ProtocolVersion <= 3 { - req.Leader = r.trans.EncodePeer(r.localID, r.localAddr) - } + // this is needed for retro compatibility with protocolVersion = 3 + req.Leader = r.trans.EncodePeer(r.localID, r.localAddr) s.peerLock.RLock() peer := s.peer @@ -384,9 +383,10 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { RPCHeader: r.getRPCHeader(), Term: s.currentTerm, } - if req.ProtocolVersion <= 3 { - req.Leader = r.trans.EncodePeer(r.localID, r.localAddr) - } + + // this is needed for retro compatibility with protocolVersion = 3 + req.Leader = r.trans.EncodePeer(r.localID, r.localAddr) + var resp AppendEntriesResponse for { // Wait for the next heartbeat interval or forced notify @@ -556,9 +556,8 @@ func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh, func (r *Raft) setupAppendEntries(s *followerReplication, req *AppendEntriesRequest, nextIndex, lastIndex uint64) error { req.RPCHeader = r.getRPCHeader() req.Term = s.currentTerm - if req.ProtocolVersion <= 3 { - req.Leader = r.trans.EncodePeer(r.localID, r.localAddr) - } + // this is needed for retro compatibility with protocolVersion = 3 + req.Leader = r.trans.EncodePeer(r.localID, r.localAddr) req.LeaderCommitIndex = r.getCommitIndex() if err := r.setPreviousLog(req, nextIndex); err != nil { From 8074fe51dbf83f5a075ce195d230cb75862e869c Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Tue, 26 Oct 2021 12:43:31 -0400 Subject: [PATCH 15/31] fix string casting --- raft.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/raft.go b/raft.go index ab8500e22..1e1a56713 100644 --- a/raft.go +++ b/raft.go @@ -155,7 +155,7 @@ func (r *Raft) run() { func (r *Raft) runFollower() { didWarn := false leaderAddr, leaderID := r.Leader() - r.logger.Info("entering follower state", "follower", r, "leader (", leaderAddr, leaderID, ")") + 
r.logger.Info("entering follower state", "follower", r, "leader (", leaderAddr, string(leaderID), ")") metrics.IncrCounter([]string{"raft", "state", "follower"}, 1) heartbeatTimer := randomTimeout(r.config().HeartbeatTimeout) @@ -1512,7 +1512,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { if leaderAddr, leaderID := r.Leader(); leaderAddr != "" && leaderAddr != candidate && !req.LeadershipTransfer { r.logger.Warn("rejecting vote request since we have a leader", "from", candidate, - "leader (", leaderAddr, leaderID, ")") + "leader (", leaderAddr, string(leaderID), ")") return } From 2768524cc418a0403a9b34676a463fe79ffc9e2f Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 27 Oct 2021 20:50:39 -0400 Subject: [PATCH 16/31] remove `EncodeID` and `DecodeID` from `transport` interface --- inmem_transport.go | 10 ---------- net_transport.go | 10 ---------- raft.go | 8 ++++---- transport.go | 6 ------ 4 files changed, 4 insertions(+), 30 deletions(-) diff --git a/inmem_transport.go b/inmem_transport.go index 7613cb630..b5bdecc73 100644 --- a/inmem_transport.go +++ b/inmem_transport.go @@ -194,16 +194,6 @@ func (i *InmemTransport) DecodePeer(buf []byte) ServerAddress { return ServerAddress(buf) } -// EncodePeer implements the Transport interface. -func (i *InmemTransport) EncodeID(id ServerID) []byte { - return []byte(id) -} - -// DecodePeer implements the Transport interface. -func (i *InmemTransport) DecodeID(buf []byte) ServerID { - return ServerID(buf) -} - // Connect is used to connect this transport to another transport for // a given peer name. This allows for local routing. func (i *InmemTransport) Connect(peer ServerAddress, t Transport) { diff --git a/net_transport.go b/net_transport.go index f359c4f9e..02cd2759d 100644 --- a/net_transport.go +++ b/net_transport.go @@ -466,16 +466,6 @@ func (n *NetworkTransport) DecodePeer(buf []byte) ServerAddress { return ServerAddress(buf) } -// EncodeID implements the Transport interface. -func (n *NetworkTransport) EncodeID(id ServerID) []byte { - return []byte(id) -} - -// DecodeID implements the Transport interface. -func (n *NetworkTransport) DecodeID(buf []byte) ServerID { - return ServerID(buf) -} - // TimeoutNow implements the Transport interface. 
func (n *NetworkTransport) TimeoutNow(id ServerID, target ServerAddress, args *TimeoutNowRequest, resp *TimeoutNowResponse) error { return n.genericRPC(id, target, rpcTimeoutNow, args, resp) diff --git a/raft.go b/raft.go index 1e1a56713..f784a09c9 100644 --- a/raft.go +++ b/raft.go @@ -31,7 +31,7 @@ var ( func (r *Raft) getRPCHeader() RPCHeader { return RPCHeader{ ProtocolVersion: r.config().ProtocolVersion, - ID: r.trans.EncodeID(r.config().LocalID), + ID: []byte(r.config().LocalID), Addr: r.trans.EncodePeer(r.config().LocalID, r.localAddr), } } @@ -1326,7 +1326,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { // Save the current leader if a.ProtocolVersion > 3 { - r.setLeader(r.trans.DecodePeer(a.Addr), r.trans.DecodeID(a.ID)) + r.setLeader(r.trans.DecodePeer(a.Addr), ServerID(a.ID)) } else { r.setLeader(r.trans.DecodePeer(a.Leader), "") } @@ -1500,7 +1500,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { // Prior to version 4 the peer ID is not part of `RequestVoteRequest`, // We assume that the peer is part of the configuration and skip this check if req.ProtocolVersion > 3 { - candidateID := r.trans.DecodeID(req.ID) + candidateID := ServerID(req.ID) // if the Servers list is empty that mean the cluster is very likely trying to bootstrap, // Grant the vote if len(r.configurations.latest.Servers) > 0 && !inConfig(r.configurations.latest, candidateID) { @@ -1623,7 +1623,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { // Save the current leader if req.ProtocolVersion > 3 { - r.setLeader(r.trans.DecodePeer(req.Addr), r.trans.DecodeID(req.ID)) + r.setLeader(r.trans.DecodePeer(req.Addr), ServerID(req.ID)) } else { r.setLeader(r.trans.DecodePeer(req.Leader), "") } diff --git a/transport.go b/transport.go index ef658dfb3..b18d24593 100644 --- a/transport.go +++ b/transport.go @@ -53,12 +53,6 @@ type Transport interface { // DecodePeer is used to deserialize a peer's address. DecodePeer([]byte) ServerAddress - // EncodeID is used to serialize a server ID. - EncodeID(id ServerID) []byte - - // DecodeID is used to deserialize a server ID. - DecodeID([]byte) ServerID - // SetHeartbeatHandler is used to setup a heartbeat handler // as a fast-pass. This is to avoid head-of-line blocking from // disk IO. 
If a Transport does not support this, it can simply From 6ea0aa402cb62db355bf49e181223e16772f9aec Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 27 Oct 2021 22:34:58 -0400 Subject: [PATCH 17/31] add missing `leaderID` initialization --- raft.go | 1 + 1 file changed, 1 insertion(+) diff --git a/raft.go b/raft.go index f784a09c9..66c0f701b 100644 --- a/raft.go +++ b/raft.go @@ -442,6 +442,7 @@ func (r *Raft) runLeader() { r.leaderLock.Lock() if r.leaderAddr == r.localAddr { r.leaderAddr = "" + r.leaderID = "" } r.leaderLock.Unlock() From ecd60c275b9aae41e05580a8c26ad3aac7717014 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 27 Oct 2021 22:35:30 -0400 Subject: [PATCH 18/31] add protocol version 3 to version 4 upgrade test --- raft_test.go | 122 +++++++++++++++++++++++++++++++++++++++++++++++++-- testing.go | 9 ++++ 2 files changed, 128 insertions(+), 3 deletions(-) diff --git a/raft_test.go b/raft_test.go index d725d385f..73e16b8e7 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2,6 +2,7 @@ package raft import ( "bytes" + "errors" "fmt" "io/ioutil" "os" @@ -1927,6 +1928,95 @@ func TestRaft_ProtocolVersion_Upgrade_2_3(t *testing.T) { } } +func TestRaft_ProtocolVersion_Upgrade_3_4(t *testing.T) { + // Make a cluster back on protocol version 2. + conf := inmemConfig(t) + conf.ProtocolVersion = 3 + c := MakeCluster(2, t, conf) + defer c.Close() + oldFollower := c.Followers()[0] + + // Set up another server speaking protocol version 3. + conf = inmemConfig(t) + conf.ProtocolVersion = 4 + c1 := MakeClusterNoBootstrap(1, t, conf) + newFollower := c1.rafts[0] + + // Merge clusters. + c.Merge(c1) + c.FullyConnect() + + // Use the new ID-based API to add the server with its ID. + future := c.Leader().AddVoter(c1.rafts[0].localID, c1.rafts[0].localAddr, 0, 1*time.Second) + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + // Sanity check the cluster. + c.EnsureSame(t) + c.EnsureSamePeers(t) + c.EnsureLeader(t, c.Leader().localAddr) + + addr, id := oldFollower.Leader() + require.Empty(t, id) + require.NotEmpty(t, addr) + + addr, id = newFollower.Leader() + require.Empty(t, id) + require.NotEmpty(t, addr) + //replace all protocol 3 nodes with protocol 4 (rolling upgrade) + followers := c.Followers() + for _, n := range followers { + if n.protocolVersion == 3 { + conf = inmemConfig(t) + conf.ProtocolVersion = 4 + c1 := MakeClusterNoBootstrap(1, t, conf) + // Merge clusters. + c.Merge(c1) + c.FullyConnect() + future := c.Leader().AddVoter(c1.rafts[0].localID, c1.rafts[0].localAddr, 0, 1*time.Second) + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + c.Leader().RemoveServer(id, 0, 300*time.Millisecond) + c.RemoveServer(n.localID) + } + } + err := waitForLeader(c) + require.NoError(t, err) + v3Leader := c.Leader() + { + c1 := MakeClusterNoBootstrap(1, t, conf) + // Merge clusters. 
+ c.Merge(c1) + c.FullyConnect() + future := c.Leader().AddVoter(c1.rafts[0].localID, c1.rafts[0].localAddr, 0, 1*time.Second) + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + } + v3Leader.RemoveServer(v3Leader.leaderID, 0, 0) + c.RemoveServer(v3Leader.localID) + err = waitForNewLeader(c, v3Leader.localID) + require.NoError(t, err) + // Commit some logs + + for i := 0; i < 5; i++ { + future := c.Leader().Apply([]byte(fmt.Sprintf("test%d", i)), 0) + if err := future.Error(); err != nil { + t.Fatalf("[ERR] err: %v", err) + } + } + + for _, n := range c.rafts { + require.Equal(t, ProtocolVersion(4), n.protocolVersion) + addr, id = n.Leader() + require.NotEmpty(t, id) + require.NotEmpty(t, addr) + } + +} + func TestRaft_LeadershipTransferInProgress(t *testing.T) { r := &Raft{leaderState: leaderState{}} r.setupLeaderState() @@ -2429,7 +2519,7 @@ func TestRaft_RemovedFollower_Vote(t *testing.T) { if configuration := c.getConfiguration(followers[1]); len(configuration.Servers) != 2 { t.Fatalf("too many peers") } - waitforState(followerRemoved, Follower) + waitForState(followerRemoved, Follower) // The removed node should be still in Follower state require.Equal(t, Follower, followerRemoved.getState()) @@ -2451,7 +2541,7 @@ func TestRaft_RemovedFollower_Vote(t *testing.T) { time.Sleep(c.propagateTimeout) // wait for the remaining follower to trigger an election - waitforState(follower, Candidate) + waitForState(follower, Candidate) require.Equal(t, Candidate, follower.getState()) // send a vote request from the removed follower to the Candidate follower @@ -2465,10 +2555,36 @@ func TestRaft_RemovedFollower_Vote(t *testing.T) { } } -func waitforState(follower *Raft, state RaftState) { +func waitForState(follower *Raft, state RaftState) { count := 0 for follower.getState() != state && count < 1000 { count++ time.Sleep(1 * time.Millisecond) } } + +func waitForLeader(c *cluster) error { + count := 0 + for count < 100 { + r := c.GetInState(Leader) + if len(r) >= 1 { + return nil + } + count++ + time.Sleep(50 * time.Millisecond) + } + return errors.New("no leader elected") +} + +func waitForNewLeader(c *cluster, id ServerID) error { + count := 0 + for count < 100 { + r := c.GetInState(Leader) + if len(r) >= 1 && r[0].localID != id { + return nil + } + count++ + time.Sleep(50 * time.Millisecond) + } + return errors.New("no leader elected") +} diff --git a/testing.go b/testing.go index 22026b297..36fad5f5c 100644 --- a/testing.go +++ b/testing.go @@ -203,6 +203,15 @@ func (c *cluster) Merge(other *cluster) { c.rafts = append(c.rafts, other.rafts...) } +func (c *cluster) RemoveServer(id ServerID) { + for i, n := range c.rafts { + if n.localID == id { + c.rafts = append(c.rafts[:i], c.rafts[i+1:]...) + return + } + } +} + // notifyFailed will close the failed channel which can signal the goroutine // running the test that another goroutine has detected a failure in order to // terminate the test. 
From bfc9e23528b545f9d5fbe8061fa694a7ffe78ba2 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 27 Oct 2021 22:42:20 -0400 Subject: [PATCH 19/31] increase test timeout --- raft_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft_test.go b/raft_test.go index 73e16b8e7..b1b093697 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2584,7 +2584,7 @@ func waitForNewLeader(c *cluster, id ServerID) error { return nil } count++ - time.Sleep(50 * time.Millisecond) + time.Sleep(100 * time.Millisecond) } return errors.New("no leader elected") } From b3507f12ec91d0cfd751ef514e974560fc72463b Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Thu, 28 Oct 2021 10:55:01 -0400 Subject: [PATCH 20/31] split test, and clean code --- observer.go | 2 +- raft.go | 27 ++++------- raft_test.go | 124 ++++++++++++++++++++++++++++--------------------- replication.go | 16 +++---- 4 files changed, 90 insertions(+), 79 deletions(-) diff --git a/observer.go b/observer.go index 57b3930e2..c45e7f632 100644 --- a/observer.go +++ b/observer.go @@ -29,7 +29,7 @@ type PeerObservation struct { Peer Server } -// FailedHeartbeatObservation is sent when a node fails to heartbeat with the leader Address and ID +// FailedHeartbeatObservation is sent when a node fails to heartbeat with the leader type FailedHeartbeatObservation struct { PeerID ServerID LastContact time.Time diff --git a/raft.go b/raft.go index 66c0f701b..68364b5cd 100644 --- a/raft.go +++ b/raft.go @@ -203,7 +203,7 @@ func (r *Raft) runFollower() { } // Heartbeat failed! Transition to the candidate state - lastLeader, _ := r.Leader() + lastLeaderAddr, lastLeaderID := r.Leader() r.setLeader("", "") if r.configurations.latestIndex == 0 { @@ -220,7 +220,7 @@ func (r *Raft) runFollower() { } else { metrics.IncrCounter([]string{"raft", "transition", "heartbeat_timeout"}, 1) if inConfig(r.configurations.latest, r.localID) { - r.logger.Warn("heartbeat timeout reached, starting election", "last-leader", lastLeader) + r.logger.Warn("heartbeat timeout reached, starting election", "last-leader (", lastLeaderAddr, ",", lastLeaderID, ")") r.setState(Candidate) return } else { @@ -440,7 +440,7 @@ func (r *Raft) runLeader() { // We may have stepped down due to an RPC call, which would // provide the leader, so we cannot always blank this out. 
r.leaderLock.Lock() - if r.leaderAddr == r.localAddr { + if r.leaderAddr == r.localAddr && r.leaderID == r.localID { r.leaderAddr = "" r.leaderID = "" } @@ -1329,7 +1329,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { if a.ProtocolVersion > 3 { r.setLeader(r.trans.DecodePeer(a.Addr), ServerID(a.ID)) } else { - r.setLeader(r.trans.DecodePeer(a.Leader), "") + r.setLeader(r.trans.DecodePeer(a.Leader), ServerID(a.ID)) } // Verify the last log entry if a.PrevLogEntry > 0 { @@ -1626,7 +1626,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { if req.ProtocolVersion > 3 { r.setLeader(r.trans.DecodePeer(req.Addr), ServerID(req.ID)) } else { - r.setLeader(r.trans.DecodePeer(req.Leader), "") + r.setLeader(r.trans.DecodePeer(req.Leader), ServerID(req.ID)) } // Create a new snapshot @@ -1743,16 +1743,15 @@ func (r *Raft) electSelf() <-chan *voteResult { // Construct the request lastIdx, lastTerm := r.getLastEntry() req := &RequestVoteRequest{ - RPCHeader: r.getRPCHeader(), - Term: r.getCurrentTerm(), + RPCHeader: r.getRPCHeader(), + Term: r.getCurrentTerm(), + // this is needed for retro compatibility with protocolVersion = 3 + Candidate: r.trans.EncodePeer(r.localID, r.localAddr), LastLogIndex: lastIdx, LastLogTerm: lastTerm, LeadershipTransfer: r.candidateFromLeadershipTransfer, } - // this is needed for retro compatibility with protocolVersion = 3 - req.Candidate = r.trans.EncodePeer(r.localID, r.localAddr) - // Construct a function to ask for a vote askPeer := func(peer Server) { r.goFunc(func() { @@ -1775,13 +1774,7 @@ func (r *Raft) electSelf() <-chan *voteResult { if server.Suffrage == Voter { if server.ID == r.localID { // Persist a vote for ourselves - var candidate []byte - if req.ProtocolVersion > 3 { - candidate = req.Addr - } else { - candidate = req.Candidate - } - if err := r.persistVote(req.Term, candidate); err != nil { + if err := r.persistVote(req.Term, req.Addr); err != nil { r.logger.Error("failed to persist vote", "error", err) return nil } diff --git a/raft_test.go b/raft_test.go index b1b093697..1d7e5c67e 100644 --- a/raft_test.go +++ b/raft_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io/ioutil" + "log" "os" "path/filepath" "reflect" @@ -1928,93 +1929,106 @@ func TestRaft_ProtocolVersion_Upgrade_2_3(t *testing.T) { } } -func TestRaft_ProtocolVersion_Upgrade_3_4(t *testing.T) { +func TestRaft_LeaderID_Propagated(t *testing.T) { // Make a cluster back on protocol version 2. conf := inmemConfig(t) + c := MakeCluster(3, t, conf) + defer c.Close() + err := waitForLeader(c) + require.NoError(t, err) + + for _, n := range c.rafts { + require.Equal(t, ProtocolVersion(4), n.protocolVersion) + addr, id := n.Leader() + require.NotEmpty(t, id) + require.NotEmpty(t, addr) + } + for i := 0; i < 5; i++ { + future := c.Leader().Apply([]byte(fmt.Sprintf("test%d", i)), 0) + if err := future.Error(); err != nil { + t.Fatalf("[ERR] err: %v", err) + } + } + // Wait a while + time.Sleep(c.propagateTimeout) + + // Sanity check the cluster. + c.EnsureSame(t) + c.EnsureSamePeers(t) + c.EnsureLeader(t, c.Leader().localAddr) +} + +func TestRaft_MixedCluster_Stable_v3Leader(t *testing.T) { + // Make a cluster back on protocol version 1. + conf := inmemConfig(t) conf.ProtocolVersion = 3 c := MakeCluster(2, t, conf) defer c.Close() - oldFollower := c.Followers()[0] - // Set up another server speaking protocol version 3. + // Set up another server speaking protocol version 2. 
conf = inmemConfig(t) conf.ProtocolVersion = 4 c1 := MakeClusterNoBootstrap(1, t, conf) - newFollower := c1.rafts[0] // Merge clusters. c.Merge(c1) c.FullyConnect() - // Use the new ID-based API to add the server with its ID. future := c.Leader().AddVoter(c1.rafts[0].localID, c1.rafts[0].localAddr, 0, 1*time.Second) if err := future.Error(); err != nil { t.Fatalf("err: %v", err) } + err := waitForLeader(c) + require.NoError(t, err) + for i := 0; i < 5; i++ { + future := c.Leader().Apply([]byte(fmt.Sprintf("test%d", i)), 0) + if err := future.Error(); err != nil { + t.Fatalf("[ERR] err: %v", err) + } + } + // Wait a while + time.Sleep(c.propagateTimeout) // Sanity check the cluster. c.EnsureSame(t) c.EnsureSamePeers(t) c.EnsureLeader(t, c.Leader().localAddr) +} - addr, id := oldFollower.Leader() - require.Empty(t, id) - require.NotEmpty(t, addr) +func TestRaft_MixedCluster_Stable_v4Leader(t *testing.T) { + // Make a cluster back on protocol version 1. + conf := inmemConfig(t) + conf.ProtocolVersion = 4 + c := MakeCluster(2, t, conf) + defer c.Close() - addr, id = newFollower.Leader() - require.Empty(t, id) - require.NotEmpty(t, addr) - //replace all protocol 3 nodes with protocol 4 (rolling upgrade) - followers := c.Followers() - for _, n := range followers { - if n.protocolVersion == 3 { - conf = inmemConfig(t) - conf.ProtocolVersion = 4 - c1 := MakeClusterNoBootstrap(1, t, conf) - // Merge clusters. - c.Merge(c1) - c.FullyConnect() - future := c.Leader().AddVoter(c1.rafts[0].localID, c1.rafts[0].localAddr, 0, 1*time.Second) - if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) - } - c.Leader().RemoveServer(id, 0, 300*time.Millisecond) - c.RemoveServer(n.localID) - } + // Set up another server speaking protocol version 2. + conf = inmemConfig(t) + conf.ProtocolVersion = 3 + c1 := MakeClusterNoBootstrap(1, t, conf) + + // Merge clusters. + c.Merge(c1) + c.FullyConnect() + future := c.Leader().AddVoter(c1.rafts[0].localID, c1.rafts[0].localAddr, 0, 1*time.Second) + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) } err := waitForLeader(c) require.NoError(t, err) - v3Leader := c.Leader() - { - c1 := MakeClusterNoBootstrap(1, t, conf) - // Merge clusters. - c.Merge(c1) - c.FullyConnect() - future := c.Leader().AddVoter(c1.rafts[0].localID, c1.rafts[0].localAddr, 0, 1*time.Second) - if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) - } - } - v3Leader.RemoveServer(v3Leader.leaderID, 0, 0) - c.RemoveServer(v3Leader.localID) - err = waitForNewLeader(c, v3Leader.localID) - require.NoError(t, err) - // Commit some logs - for i := 0; i < 5; i++ { future := c.Leader().Apply([]byte(fmt.Sprintf("test%d", i)), 0) if err := future.Error(); err != nil { t.Fatalf("[ERR] err: %v", err) } } + // Wait a while + c.WaitForReplication(5) - for _, n := range c.rafts { - require.Equal(t, ProtocolVersion(4), n.protocolVersion) - addr, id = n.Leader() - require.NotEmpty(t, id) - require.NotEmpty(t, addr) - } - + // Sanity check the cluster. 
+ c.EnsureSame(t) + c.EnsureSamePeers(t) + c.EnsureLeader(t, c.Leader().localAddr) } func TestRaft_LeadershipTransferInProgress(t *testing.T) { @@ -2582,6 +2596,12 @@ func waitForNewLeader(c *cluster, id ServerID) error { r := c.GetInState(Leader) if len(r) >= 1 && r[0].localID != id { return nil + } else { + if len(r) == 0 { + log.Println("no leader yet") + } else { + log.Printf("leader still %s\n", id) + } } count++ time.Sleep(100 * time.Millisecond) diff --git a/replication.go b/replication.go index 34db561bb..6266a8f80 100644 --- a/replication.go +++ b/replication.go @@ -317,9 +317,11 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { // Setup the request req := InstallSnapshotRequest{ - RPCHeader: r.getRPCHeader(), - SnapshotVersion: meta.Version, - Term: s.currentTerm, + RPCHeader: r.getRPCHeader(), + SnapshotVersion: meta.Version, + Term: s.currentTerm, + // this is needed for retro compatibility with protocolVersion = 3 + Leader: r.trans.EncodePeer(r.localID, r.localAddr), LastLogIndex: meta.Index, LastLogTerm: meta.Term, Peers: meta.Peers, @@ -327,8 +329,6 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { Configuration: EncodeConfiguration(meta.Configuration), ConfigurationIndex: meta.ConfigurationIndex, } - // this is needed for retro compatibility with protocolVersion = 3 - req.Leader = r.trans.EncodePeer(r.localID, r.localAddr) s.peerLock.RLock() peer := s.peer @@ -382,11 +382,10 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { req := AppendEntriesRequest{ RPCHeader: r.getRPCHeader(), Term: s.currentTerm, + // this is needed for retro compatibility with protocolVersion = 3 + Leader: r.trans.EncodePeer(r.localID, r.localAddr), } - // this is needed for retro compatibility with protocolVersion = 3 - req.Leader = r.trans.EncodePeer(r.localID, r.localAddr) - var resp AppendEntriesResponse for { // Wait for the next heartbeat interval or forced notify @@ -558,7 +557,6 @@ func (r *Raft) setupAppendEntries(s *followerReplication, req *AppendEntriesRequ req.Term = s.currentTerm // this is needed for retro compatibility with protocolVersion = 3 req.Leader = r.trans.EncodePeer(r.localID, r.localAddr) - req.LeaderCommitIndex = r.getCommitIndex() if err := r.setPreviousLog(req, nextIndex); err != nil { return err From aea65ca1c67fc7374491998d87986bf6cb0715de Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Thu, 28 Oct 2021 11:03:42 -0400 Subject: [PATCH 21/31] add docstrings to `RPCHeader` --- commands.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/commands.go b/commands.go index 9aaa55cb1..835293298 100644 --- a/commands.go +++ b/commands.go @@ -8,8 +8,10 @@ type RPCHeader struct { // ProtocolVersion is the version of the protocol the sender is // speaking. ProtocolVersion ProtocolVersion - ID []byte - Addr []byte + // ID is the ServerID of the node sending the RPC Request or Response + ID []byte + // Addr is the ServerAddr of the node sending the RPC Request or Response + Addr []byte } // WithRPCHeader is an interface that exposes the RPC header. 
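With the ID and Addr docstrings in place, a fully populated header looks roughly like the sketch below. It is built the same way getRPCHeader does in raft.go; the concrete ID and address here are illustrative, not taken from the patch:

    package main

    import (
        "fmt"

        "github.com/hashicorp/raft"
    )

    func main() {
        header := raft.RPCHeader{
            ProtocolVersion: raft.ProtocolVersionMax,
            ID:              []byte("node-1"),         // ServerID of the sending node
            Addr:            []byte("127.0.0.1:8300"), // ServerAddress of the sending node
        }
        fmt.Printf("sender id=%s addr=%s\n", header.ID, header.Addr)
    }
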
From 4620c719718c7f22d5fb83a62b2841426c7de2f5 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 24 Nov 2021 14:42:27 -0500 Subject: [PATCH 22/31] Apply suggestions from code review Co-authored-by: Matt Keeler --- raft.go | 4 ++-- raft_test.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/raft.go b/raft.go index 68364b5cd..d1a408f25 100644 --- a/raft.go +++ b/raft.go @@ -155,7 +155,7 @@ func (r *Raft) run() { func (r *Raft) runFollower() { didWarn := false leaderAddr, leaderID := r.Leader() - r.logger.Info("entering follower state", "follower", r, "leader (", leaderAddr, string(leaderID), ")") + r.logger.Info("entering follower state", "follower", r, "leader-address", leaderAddr, "leader-id", leaderID) metrics.IncrCounter([]string{"raft", "state", "follower"}, 1) heartbeatTimer := randomTimeout(r.config().HeartbeatTimeout) @@ -220,7 +220,7 @@ func (r *Raft) runFollower() { } else { metrics.IncrCounter([]string{"raft", "transition", "heartbeat_timeout"}, 1) if inConfig(r.configurations.latest, r.localID) { - r.logger.Warn("heartbeat timeout reached, starting election", "last-leader (", lastLeaderAddr, ",", lastLeaderID, ")") + r.logger.Warn("heartbeat timeout reached, starting election", "last-leader-addr", lastLeaderAddr, "last-leader-id", lastLeaderID) r.setState(Candidate) return } else { diff --git a/raft_test.go b/raft_test.go index 1d7e5c67e..1fa1453c7 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1930,7 +1930,7 @@ func TestRaft_ProtocolVersion_Upgrade_2_3(t *testing.T) { } func TestRaft_LeaderID_Propagated(t *testing.T) { - // Make a cluster back on protocol version 2. + // Make a cluster on protocol version 3. conf := inmemConfig(t) c := MakeCluster(3, t, conf) defer c.Close() @@ -1965,7 +1965,7 @@ func TestRaft_MixedCluster_Stable_v3Leader(t *testing.T) { c := MakeCluster(2, t, conf) defer c.Close() - // Set up another server speaking protocol version 2. + // Set up another server speaking protocol version 4. conf = inmemConfig(t) conf.ProtocolVersion = 4 c1 := MakeClusterNoBootstrap(1, t, conf) @@ -2002,7 +2002,7 @@ func TestRaft_MixedCluster_Stable_v4Leader(t *testing.T) { c := MakeCluster(2, t, conf) defer c.Close() - // Set up another server speaking protocol version 2. + // Set up another server speaking protocol version 3. conf = inmemConfig(t) conf.ProtocolVersion = 3 c1 := MakeClusterNoBootstrap(1, t, conf) From 86d22fa40d7e59df5381379c6313b30a9f9f2496 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 24 Nov 2021 14:44:33 -0500 Subject: [PATCH 23/31] Fix comment Co-authored-by: Matt Keeler --- raft_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft_test.go b/raft_test.go index 1fa1453c7..c0b8d80b1 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1996,7 +1996,7 @@ func TestRaft_MixedCluster_Stable_v3Leader(t *testing.T) { } func TestRaft_MixedCluster_Stable_v4Leader(t *testing.T) { - // Make a cluster back on protocol version 1. + // Make a cluster on protocol version 4. 
conf := inmemConfig(t) conf.ProtocolVersion = 4 c := MakeCluster(2, t, conf) From 362140118cba09757110b0fbc8ac8b403c9e3216 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Fri, 26 Nov 2021 15:51:29 -0500 Subject: [PATCH 24/31] fix review comments --- fuzzy/verifier.go | 2 +- net_transport.go | 2 +- net_transport_test.go | 28 +++++++++++++++++----------- raft.go | 11 ++++++----- transport_test.go | 11 +++++++---- 5 files changed, 32 insertions(+), 22 deletions(-) diff --git a/fuzzy/verifier.go b/fuzzy/verifier.go index e8b7e9d3a..d35c5e4ae 100644 --- a/fuzzy/verifier.go +++ b/fuzzy/verifier.go @@ -47,7 +47,7 @@ func (v *appendEntriesVerifier) PreAppendEntries(src, target string, req *raft.A term := req.Term var ldr string if req.ProtocolVersion > 3 { - ldr = string(req.Addr) + ldr = string(req.RPCHeader.Addr) } else { ldr = string(req.Leader) } diff --git a/net_transport.go b/net_transport.go index 02cd2759d..4fae4cfe4 100644 --- a/net_transport.go +++ b/net_transport.go @@ -569,7 +569,7 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en // Check if this is a heartbeat if req.ProtocolVersion > 3 { - if req.Term != 0 && req.Addr != nil && + if req.Term != 0 && req.RPCHeader.Addr != nil && req.PrevLogEntry == 0 && req.PrevLogTerm == 0 && len(req.Entries) == 0 && req.LeaderCommitIndex == 0 { isHeartbeat = true diff --git a/net_transport_test.go b/net_transport_test.go index 32e1ba749..07d4e7511 100644 --- a/net_transport_test.go +++ b/net_transport_test.go @@ -45,8 +45,8 @@ func TestNetworkTransport_CloseStreams(t *testing.T) { }, }, LeaderCommitIndex: 90, + RPCHeader: RPCHeader{Addr: []byte("cartman")}, } - args.Addr = []byte("cartman") resp := AppendEntriesResponse{ Term: 4, @@ -139,11 +139,11 @@ func TestNetworkTransport_Heartbeat_FastPath(t *testing.T) { // Make the RPC request args := AppendEntriesRequest{ - Term: 10, + Term: 10, + RPCHeader: RPCHeader{ProtocolVersion: ProtocolVersionMax, Addr: []byte("cartman")}, + Leader: []byte("cartman"), } - args.ProtocolVersion = ProtocolVersionMax - args.Addr = []byte("cartman") - args.Leader = []byte("cartman") + resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -210,8 +210,9 @@ func TestNetworkTransport_AppendEntries(t *testing.T) { }, }, LeaderCommitIndex: 90, + RPCHeader: RPCHeader{Addr: []byte("cartman")}, } - args.Addr = []byte("cartman") + resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -280,8 +281,9 @@ func TestNetworkTransport_AppendEntriesPipeline(t *testing.T) { }, }, LeaderCommitIndex: 90, + RPCHeader: RPCHeader{Addr: []byte("cartman")}, } - args.Addr = []byte("cartman") + resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -365,8 +367,9 @@ func TestNetworkTransport_AppendEntriesPipeline_CloseStreams(t *testing.T) { }, }, LeaderCommitIndex: 90, + RPCHeader: RPCHeader{Addr: []byte("cartman")}, } - args.Addr = []byte("cartman") + resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -467,8 +470,9 @@ func TestNetworkTransport_RequestVote(t *testing.T) { Term: 20, LastLogIndex: 100, LastLogTerm: 19, + RPCHeader: RPCHeader{Addr: []byte("butters")}, } - args.Addr = []byte("butters") + resp := RequestVoteResponse{ Term: 100, Granted: false, @@ -530,8 +534,9 @@ func TestNetworkTransport_InstallSnapshot(t *testing.T) { LastLogTerm: 9, Peers: []byte("blah blah"), Size: 10, + RPCHeader: RPCHeader{Addr: []byte("kyle")}, } - args.Addr = []byte("kyle") + resp := InstallSnapshotResponse{ Term: 10, Success: true, @@ -644,8 +649,9 @@ func TestNetworkTransport_PooledConn(t *testing.T) { }, }, 
LeaderCommitIndex: 90, + RPCHeader: RPCHeader{Addr: []byte("cartman")}, } - args.Addr = []byte("cartman") + resp := AppendEntriesResponse{ Term: 4, LastLog: 90, diff --git a/raft.go b/raft.go index d1a408f25..0fad38ac3 100644 --- a/raft.go +++ b/raft.go @@ -1491,8 +1491,8 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { var candidate ServerAddress var candidateBytes []byte if req.ProtocolVersion > 3 { - candidate = r.trans.DecodePeer(req.Addr) - candidateBytes = req.Addr + candidate = r.trans.DecodePeer(req.RPCHeader.Addr) + candidateBytes = req.RPCHeader.Addr } else { candidate = r.trans.DecodePeer(req.Candidate) candidateBytes = req.Candidate @@ -1513,7 +1513,8 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { if leaderAddr, leaderID := r.Leader(); leaderAddr != "" && leaderAddr != candidate && !req.LeadershipTransfer { r.logger.Warn("rejecting vote request since we have a leader", "from", candidate, - "leader (", leaderAddr, string(leaderID), ")") + "leader", leaderAddr, + "leader-id", string(leaderID)) return } @@ -1624,7 +1625,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { // Save the current leader if req.ProtocolVersion > 3 { - r.setLeader(r.trans.DecodePeer(req.Addr), ServerID(req.ID)) + r.setLeader(r.trans.DecodePeer(req.RPCHeader.Addr), ServerID(req.ID)) } else { r.setLeader(r.trans.DecodePeer(req.Leader), ServerID(req.ID)) } @@ -1774,7 +1775,7 @@ func (r *Raft) electSelf() <-chan *voteResult { if server.Suffrage == Voter { if server.ID == r.localID { // Persist a vote for ourselves - if err := r.persistVote(req.Term, req.Addr); err != nil { + if err := r.persistVote(req.Term, req.RPCHeader.Addr); err != nil { r.logger.Error("failed to persist vote", "error", err) return nil } diff --git a/transport_test.go b/transport_test.go index 17213337d..255a6be79 100644 --- a/transport_test.go +++ b/transport_test.go @@ -51,8 +51,9 @@ func TestTransport_AppendEntries(t *testing.T) { }, }, LeaderCommitIndex: 90, + RPCHeader: RPCHeader{Addr: []byte("cartman")}, } - args.Addr = []byte("cartman") + resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -114,8 +115,9 @@ func TestTransport_AppendEntriesPipeline(t *testing.T) { }, }, LeaderCommitIndex: 90, + RPCHeader: RPCHeader{Addr: []byte("cartman")}, } - args.Addr = []byte("cartman") + resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -187,8 +189,8 @@ func TestTransport_RequestVote(t *testing.T) { Term: 20, LastLogIndex: 100, LastLogTerm: 19, + RPCHeader: RPCHeader{Addr: []byte("butters")}, } - args.Addr = []byte("butters") resp := RequestVoteResponse{ Term: 100, Granted: false, @@ -244,8 +246,9 @@ func TestTransport_InstallSnapshot(t *testing.T) { LastLogTerm: 9, Peers: []byte("blah blah"), Size: 10, + RPCHeader: RPCHeader{Addr: []byte("kyle")}, } - args.Addr = []byte("kyle") + resp := InstallSnapshotResponse{ Term: 10, Success: true, From 8631c97d0fa2aeb8efbd6632fd16ecec11644693 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Mon, 29 Nov 2021 12:31:15 -0500 Subject: [PATCH 25/31] do not increment protocolVersion to 4 and rely on Addr/ID field being nil to fallback on the old fields. 
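Instead of bumping ProtocolVersionMax to 4, every receiver now prefers the new RPCHeader fields when they are populated and falls back to the deprecated per-message fields (Leader, Candidate) when they are empty, so pre-upgrade and post-upgrade nodes interoperate without a protocol version change. A minimal sketch of the pattern, mirroring the checks in net_transport.go and raft.go (the helper name and addresses are illustrative, not part of the diff):

    package main

    import (
        "fmt"

        "github.com/hashicorp/raft"
    )

    // leaderAddr prefers the new header field and falls back to the
    // deprecated Leader field set by pre-upgrade senders.
    func leaderAddr(req *raft.AppendEntriesRequest) []byte {
        if len(req.RPCHeader.Addr) > 0 {
            return req.RPCHeader.Addr
        }
        return req.Leader
    }

    func main() {
        upgraded := &raft.AppendEntriesRequest{
            RPCHeader: raft.RPCHeader{ID: []byte("node-1"), Addr: []byte("10.0.0.1:8300")},
        }
        legacy := &raft.AppendEntriesRequest{Leader: []byte("10.0.0.2:8300")}

        fmt.Println(string(leaderAddr(upgraded))) // 10.0.0.1:8300
        fmt.Println(string(leaderAddr(legacy)))   // 10.0.0.2:8300
    }
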
--- config.go | 2 +- fuzzy/verifier.go | 2 +- net_transport.go | 21 +++--- raft.go | 10 +-- raft_test.go | 188 ++++++++++++++++++++++++++++------------------ testing.go | 1 + 6 files changed, 131 insertions(+), 93 deletions(-) diff --git a/config.go b/config.go index eda4e4525..78dde9225 100644 --- a/config.go +++ b/config.go @@ -91,7 +91,7 @@ const ( // ProtocolVersionMin is the minimum protocol version ProtocolVersionMin ProtocolVersion = 0 // ProtocolVersionMax is the maximum protocol version - ProtocolVersionMax = 4 + ProtocolVersionMax = 3 ) // SnapshotVersion is the version of snapshots that this server can understand. diff --git a/fuzzy/verifier.go b/fuzzy/verifier.go index d35c5e4ae..a51efece4 100644 --- a/fuzzy/verifier.go +++ b/fuzzy/verifier.go @@ -46,7 +46,7 @@ func (v *appendEntriesVerifier) PreRequestVote(src, target string, rv *raft.Requ func (v *appendEntriesVerifier) PreAppendEntries(src, target string, req *raft.AppendEntriesRequest) (*raft.AppendEntriesResponse, error) { term := req.Term var ldr string - if req.ProtocolVersion > 3 { + if len(req.RPCHeader.Addr) > 0 { ldr = string(req.RPCHeader.Addr) } else { ldr = string(req.Leader) diff --git a/net_transport.go b/net_transport.go index 4fae4cfe4..cd909ab8d 100644 --- a/net_transport.go +++ b/net_transport.go @@ -567,19 +567,16 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en } rpc.Command = &req + leaderAddr := req.RPCHeader.Addr + if len(leaderAddr) == 0 { + leaderAddr = req.Leader + } + // Check if this is a heartbeat - if req.ProtocolVersion > 3 { - if req.Term != 0 && req.RPCHeader.Addr != nil && - req.PrevLogEntry == 0 && req.PrevLogTerm == 0 && - len(req.Entries) == 0 && req.LeaderCommitIndex == 0 { - isHeartbeat = true - } - } else { - if req.Term != 0 && req.Leader != nil && - req.PrevLogEntry == 0 && req.PrevLogTerm == 0 && - len(req.Entries) == 0 && req.LeaderCommitIndex == 0 { - isHeartbeat = true - } + if req.Term != 0 && leaderAddr != nil && + req.PrevLogEntry == 0 && req.PrevLogTerm == 0 && + len(req.Entries) == 0 && req.LeaderCommitIndex == 0 { + isHeartbeat = true } case rpcRequestVote: diff --git a/raft.go b/raft.go index 0fad38ac3..59434d977 100644 --- a/raft.go +++ b/raft.go @@ -1326,7 +1326,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { } // Save the current leader - if a.ProtocolVersion > 3 { + if len(a.Addr) > 0 { r.setLeader(r.trans.DecodePeer(a.Addr), ServerID(a.ID)) } else { r.setLeader(r.trans.DecodePeer(a.Leader), ServerID(a.ID)) @@ -1490,7 +1490,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { // vote! 
var candidate ServerAddress var candidateBytes []byte - if req.ProtocolVersion > 3 { + if len(req.RPCHeader.Addr) > 0 { candidate = r.trans.DecodePeer(req.RPCHeader.Addr) candidateBytes = req.RPCHeader.Addr } else { @@ -1498,9 +1498,9 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { candidateBytes = req.Candidate } - // Prior to version 4 the peer ID is not part of `RequestVoteRequest`, + // For older raft version ID is not part of the packed message // We assume that the peer is part of the configuration and skip this check - if req.ProtocolVersion > 3 { + if len(req.ID) > 0 { candidateID := ServerID(req.ID) // if the Servers list is empty that mean the cluster is very likely trying to bootstrap, // Grant the vote @@ -1624,7 +1624,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { } // Save the current leader - if req.ProtocolVersion > 3 { + if len(req.ID) > 0 { r.setLeader(r.trans.DecodePeer(req.RPCHeader.Addr), ServerID(req.ID)) } else { r.setLeader(r.trans.DecodePeer(req.Leader), ServerID(req.ID)) diff --git a/raft_test.go b/raft_test.go index c0b8d80b1..4eb015ae2 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1773,6 +1773,65 @@ func TestRaft_Voting(t *testing.T) { } } +func TestRaft_AppendEntry(t *testing.T) { + c := MakeCluster(3, t, nil) + defer c.Close() + followers := c.Followers() + ldr := c.Leader() + ldrT := c.trans[c.IndexOf(ldr)] + + reqAppendEntries := AppendEntriesRequest{ + RPCHeader: ldr.getRPCHeader(), + Term: ldr.getCurrentTerm() + 1, + PrevLogEntry: 0, + PrevLogTerm: ldr.getCurrentTerm(), + Leader: nil, + Entries: []*Log{ + { + Index: 1, + Term: ldr.getCurrentTerm() + 1, + Type: LogCommand, + Data: []byte("log 1"), + }, + }, + LeaderCommitIndex: 90, + } + // a follower that thinks there's a leader should vote for that leader. + var resp AppendEntriesResponse + if err := ldrT.AppendEntries(followers[0].localID, followers[0].localAddr, &reqAppendEntries, &resp); err != nil { + t.Fatalf("RequestVote RPC failed %v", err) + } + + require.True(t, resp.Success) + + headers := ldr.getRPCHeader() + headers.ID = nil + headers.Addr = nil + reqAppendEntries = AppendEntriesRequest{ + RPCHeader: headers, + Term: ldr.getCurrentTerm() + 1, + PrevLogEntry: 0, + PrevLogTerm: ldr.getCurrentTerm(), + Leader: ldr.trans.EncodePeer(ldr.config().LocalID, ldr.localAddr), + Entries: []*Log{ + { + Index: 1, + Term: ldr.getCurrentTerm() + 1, + Type: LogCommand, + Data: []byte("log 1"), + }, + }, + LeaderCommitIndex: 90, + } + // a follower that thinks there's a leader should vote for that leader. + var resp2 AppendEntriesResponse + if err := ldrT.AppendEntries(followers[0].localID, followers[0].localAddr, &reqAppendEntries, &resp2); err != nil { + t.Fatalf("RequestVote RPC failed %v", err) + } + + require.True(t, resp2.Success) +} + func TestRaft_Voting_portocolVersion3(t *testing.T) { conf := inmemConfig(t) conf.ProtocolVersion = 3 @@ -1938,7 +1997,7 @@ func TestRaft_LeaderID_Propagated(t *testing.T) { require.NoError(t, err) for _, n := range c.rafts { - require.Equal(t, ProtocolVersion(4), n.protocolVersion) + require.Equal(t, ProtocolVersion(3), n.protocolVersion) addr, id := n.Leader() require.NotEmpty(t, id) require.NotEmpty(t, addr) @@ -1958,79 +2017,6 @@ func TestRaft_LeaderID_Propagated(t *testing.T) { c.EnsureLeader(t, c.Leader().localAddr) } -func TestRaft_MixedCluster_Stable_v3Leader(t *testing.T) { - // Make a cluster back on protocol version 1. 
- conf := inmemConfig(t) - conf.ProtocolVersion = 3 - c := MakeCluster(2, t, conf) - defer c.Close() - - // Set up another server speaking protocol version 4. - conf = inmemConfig(t) - conf.ProtocolVersion = 4 - c1 := MakeClusterNoBootstrap(1, t, conf) - - // Merge clusters. - c.Merge(c1) - c.FullyConnect() - - future := c.Leader().AddVoter(c1.rafts[0].localID, c1.rafts[0].localAddr, 0, 1*time.Second) - if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) - } - err := waitForLeader(c) - require.NoError(t, err) - for i := 0; i < 5; i++ { - future := c.Leader().Apply([]byte(fmt.Sprintf("test%d", i)), 0) - if err := future.Error(); err != nil { - t.Fatalf("[ERR] err: %v", err) - } - } - // Wait a while - time.Sleep(c.propagateTimeout) - - // Sanity check the cluster. - c.EnsureSame(t) - c.EnsureSamePeers(t) - c.EnsureLeader(t, c.Leader().localAddr) -} - -func TestRaft_MixedCluster_Stable_v4Leader(t *testing.T) { - // Make a cluster on protocol version 4. - conf := inmemConfig(t) - conf.ProtocolVersion = 4 - c := MakeCluster(2, t, conf) - defer c.Close() - - // Set up another server speaking protocol version 3. - conf = inmemConfig(t) - conf.ProtocolVersion = 3 - c1 := MakeClusterNoBootstrap(1, t, conf) - - // Merge clusters. - c.Merge(c1) - c.FullyConnect() - future := c.Leader().AddVoter(c1.rafts[0].localID, c1.rafts[0].localAddr, 0, 1*time.Second) - if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) - } - err := waitForLeader(c) - require.NoError(t, err) - for i := 0; i < 5; i++ { - future := c.Leader().Apply([]byte(fmt.Sprintf("test%d", i)), 0) - if err := future.Error(); err != nil { - t.Fatalf("[ERR] err: %v", err) - } - } - // Wait a while - c.WaitForReplication(5) - - // Sanity check the cluster. - c.EnsureSame(t) - c.EnsureSamePeers(t) - c.EnsureLeader(t, c.Leader().localAddr) -} - func TestRaft_LeadershipTransferInProgress(t *testing.T) { r := &Raft{leaderState: leaderState{}} r.setupLeaderState() @@ -2569,6 +2555,60 @@ func TestRaft_RemovedFollower_Vote(t *testing.T) { } } +func TestRaft_VoteWithNoIDNoAddr(t *testing.T) { + // Make a cluster + c := MakeCluster(3, t, nil) + + defer c.Close() + waitForLeader(c) + + leader := c.Leader() + + // Wait until we have 2 followers + limit := time.Now().Add(c.longstopTimeout) + var followers []*Raft + for time.Now().Before(limit) && len(followers) != 2 { + c.WaitEvent(nil, c.conf.CommitTimeout) + followers = c.GetInState(Follower) + } + if len(followers) != 2 { + t.Fatalf("expected two followers: %v", followers) + } + + follower := followers[0] + + headers := follower.getRPCHeader() + headers.ID = nil + headers.Addr = nil + reqVote := RequestVoteRequest{ + RPCHeader: headers, + Term: follower.getCurrentTerm() + 10, + LastLogIndex: follower.LastIndex(), + LastLogTerm: follower.getCurrentTerm(), + Candidate: follower.trans.EncodePeer(follower.config().LocalID, follower.localAddr), + LeadershipTransfer: false, + } + // a follower that thinks there's a leader should vote for that leader. 
+ var resp RequestVoteResponse + followerT := c.trans[c.IndexOf(followers[1])] + c.Partition([]ServerAddress{leader.localAddr}) + time.Sleep(c.propagateTimeout) + + // wait for the remaining follower to trigger an election + waitForState(follower, Candidate) + require.Equal(t, Candidate, follower.getState()) + // send a vote request from the removed follower to the Candidate follower + + if err := followerT.RequestVote(follower.localID, follower.localAddr, &reqVote, &resp); err != nil { + t.Fatalf("RequestVote RPC failed %v", err) + } + + // the vote request should not be granted, because the voter is not part of the cluster anymore + if !resp.Granted { + t.Fatalf("expected vote to not be granted, but it was %+v", resp) + } +} + func waitForState(follower *Raft, state RaftState) { count := 0 for follower.getState() != state && count < 1000 { diff --git a/testing.go b/testing.go index 36fad5f5c..13f1ccd28 100644 --- a/testing.go +++ b/testing.go @@ -334,6 +334,7 @@ CHECK: num := len(fsm.logs) fsm.Unlock() if num != fsmLength { + fmt.Printf("fsm len : %d\n", num) continue CHECK } } From 634a6022c6fcf9d5669fb87d9472c9c438d4ca7e Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Tue, 30 Nov 2021 15:20:28 -0500 Subject: [PATCH 26/31] deprecate `Leader` in favor of `LeaderWithID` --- api.go | 13 ++++++++++++- raft.go | 6 +++--- raft_test.go | 8 ++++---- testing.go | 2 +- 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/api.go b/api.go index 6440471b8..40a07242f 100644 --- a/api.go +++ b/api.go @@ -718,7 +718,18 @@ func (r *Raft) BootstrapCluster(configuration Configuration) Future { // Leader is used to return the current leader of the cluster. // It may return empty string if there is no current leader // or the leader is unknown. -func (r *Raft) Leader() (ServerAddress, ServerID) { +// Deprecated: Leader is deprecated, use LeaderWithID instead. +func (r *Raft) Leader() ServerAddress { + r.leaderLock.RLock() + leaderAddr := r.leaderAddr + r.leaderLock.RUnlock() + return leaderAddr +} + +// LeaderWithID is used to return the current leader address and ID of the cluster. +// It may return empty strings if there is no current leader +// or the leader is unknown. +func (r *Raft) LeaderWithID() (ServerAddress, ServerID) { r.leaderLock.RLock() leaderAddr := r.leaderAddr leaderID := r.leaderID diff --git a/raft.go b/raft.go index 59434d977..50b965321 100644 --- a/raft.go +++ b/raft.go @@ -154,7 +154,7 @@ func (r *Raft) run() { // runFollower runs the FSM for a follower. func (r *Raft) runFollower() { didWarn := false - leaderAddr, leaderID := r.Leader() + leaderAddr, leaderID := r.LeaderWithID() r.logger.Info("entering follower state", "follower", r, "leader-address", leaderAddr, "leader-id", leaderID) metrics.IncrCounter([]string{"raft", "state", "follower"}, 1) heartbeatTimer := randomTimeout(r.config().HeartbeatTimeout) @@ -203,7 +203,7 @@ func (r *Raft) runFollower() { } // Heartbeat failed! 
Transition to the candidate state - lastLeaderAddr, lastLeaderID := r.Leader() + lastLeaderAddr, lastLeaderID := r.LeaderWithID() r.setLeader("", "") if r.configurations.latestIndex == 0 { @@ -1510,7 +1510,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { return } } - if leaderAddr, leaderID := r.Leader(); leaderAddr != "" && leaderAddr != candidate && !req.LeadershipTransfer { + if leaderAddr, leaderID := r.LeaderWithID(); leaderAddr != "" && leaderAddr != candidate && !req.LeadershipTransfer { r.logger.Warn("rejecting vote request since we have a leader", "from", candidate, "leader", leaderAddr, diff --git a/raft_test.go b/raft_test.go index 4eb015ae2..393365217 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1556,10 +1556,10 @@ LOOP: } // Ensure both have cleared their leader - if l, id := leader.Leader(); l != "" && id != "" { + if l, id := leader.LeaderWithID(); l != "" && id != "" { t.Fatalf("bad: %v", l) } - if l, id := follower.Leader(); l != "" && id != "" { + if l, id := follower.LeaderWithID(); l != "" && id != "" { t.Fatalf("bad: %v", l) } } @@ -1661,7 +1661,7 @@ func TestRaft_VerifyLeader_Fail(t *testing.T) { } // Ensure the known leader is cleared - if l, _ := leader.Leader(); l != "" { + if l, _ := leader.LeaderWithID(); l != "" { t.Fatalf("bad: %v", l) } } @@ -1998,7 +1998,7 @@ func TestRaft_LeaderID_Propagated(t *testing.T) { for _, n := range c.rafts { require.Equal(t, ProtocolVersion(3), n.protocolVersion) - addr, id := n.Leader() + addr, id := n.LeaderWithID() require.NotEmpty(t, id) require.NotEmpty(t, addr) } diff --git a/testing.go b/testing.go index 13f1ccd28..9ee0dac85 100644 --- a/testing.go +++ b/testing.go @@ -539,7 +539,7 @@ func (c *cluster) EnsureLeader(t *testing.T, expect ServerAddress) { // think the leader is correct fail := false for _, r := range c.rafts { - leaderAddr, _ := r.Leader() + leaderAddr, _ := r.LeaderWithID() if leaderAddr != expect { if leaderAddr == "" { From a8af0db5a41c2aa21c330775add0615332cad67b Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 1 Dec 2021 09:26:38 -0500 Subject: [PATCH 27/31] remove duplicate test and code clean up --- raft.go | 2 +- raft_test.go | 46 ++-------------------------------------------- replication.go | 6 +++--- testing.go | 1 - 4 files changed, 6 insertions(+), 49 deletions(-) diff --git a/raft.go b/raft.go index 50b965321..b5a53eb61 100644 --- a/raft.go +++ b/raft.go @@ -1746,7 +1746,7 @@ func (r *Raft) electSelf() <-chan *voteResult { req := &RequestVoteRequest{ RPCHeader: r.getRPCHeader(), Term: r.getCurrentTerm(), - // this is needed for retro compatibility with protocolVersion = 3 + // this is needed for retro compatibility, before RPCHeader.Addr was added Candidate: r.trans.EncodePeer(r.localID, r.localAddr), LastLogIndex: lastIdx, LastLogTerm: lastTerm, diff --git a/raft_test.go b/raft_test.go index 393365217..1a5355394 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1731,48 +1731,6 @@ func TestRaft_NotifyCh(t *testing.T) { } } -func TestRaft_Voting(t *testing.T) { - c := MakeCluster(3, t, nil) - defer c.Close() - followers := c.Followers() - ldr := c.Leader() - ldrT := c.trans[c.IndexOf(ldr)] - - reqVote := RequestVoteRequest{ - RPCHeader: ldr.getRPCHeader(), - Term: ldr.getCurrentTerm() + 10, - LastLogIndex: ldr.LastIndex(), - LastLogTerm: ldr.getCurrentTerm(), - LeadershipTransfer: false, - } - // a follower that thinks there's a leader should vote for that leader. 
- var resp RequestVoteResponse - if err := ldrT.RequestVote(followers[0].localID, followers[0].localAddr, &reqVote, &resp); err != nil { - t.Fatalf("RequestVote RPC failed %v", err) - } - if !resp.Granted { - t.Fatalf("expected vote to be granted, but wasn't %+v", resp) - } - // a follower that thinks there's a leader shouldn't vote for a different candidate - reqVote.Addr = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) - if err := ldrT.RequestVote(followers[1].localID, followers[1].localAddr, &reqVote, &resp); err != nil { - t.Fatalf("RequestVote RPC failed %v", err) - } - if resp.Granted { - t.Fatalf("expected vote not to be granted, but was %+v", resp) - } - // a follower that thinks there's a leader, but the request has the leadership transfer flag, should - // vote for a different candidate - reqVote.LeadershipTransfer = true - reqVote.Addr = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) - if err := ldrT.RequestVote(followers[1].localID, followers[1].localAddr, &reqVote, &resp); err != nil { - t.Fatalf("RequestVote RPC failed %v", err) - } - if !resp.Granted { - t.Fatalf("expected vote to be granted, but wasn't %+v", resp) - } -} - func TestRaft_AppendEntry(t *testing.T) { c := MakeCluster(3, t, nil) defer c.Close() @@ -1832,7 +1790,7 @@ func TestRaft_AppendEntry(t *testing.T) { require.True(t, resp2.Success) } -func TestRaft_Voting_portocolVersion3(t *testing.T) { +func TestRaft_VotingGrant_WhenLeaderAvailable(t *testing.T) { conf := inmemConfig(t) conf.ProtocolVersion = 3 c := MakeCluster(3, t, conf) @@ -2482,7 +2440,7 @@ func TestRaft_InstallSnapshot_InvalidPeers(t *testing.T) { require.Contains(t, resp.Error.Error(), "failed to decode peers") } -func TestRaft_RemovedFollower_Vote(t *testing.T) { +func TestRaft_VoteNotGranted_WhenNodeNotInCluster(t *testing.T) { // Make a cluster c := MakeCluster(3, t, nil) diff --git a/replication.go b/replication.go index 6266a8f80..b3301b5c5 100644 --- a/replication.go +++ b/replication.go @@ -320,7 +320,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { RPCHeader: r.getRPCHeader(), SnapshotVersion: meta.Version, Term: s.currentTerm, - // this is needed for retro compatibility with protocolVersion = 3 + // this is needed for retro compatibility, before RPCHeader.Addr was added Leader: r.trans.EncodePeer(r.localID, r.localAddr), LastLogIndex: meta.Index, LastLogTerm: meta.Term, @@ -382,7 +382,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { req := AppendEntriesRequest{ RPCHeader: r.getRPCHeader(), Term: s.currentTerm, - // this is needed for retro compatibility with protocolVersion = 3 + // this is needed for retro compatibility, before RPCHeader.Addr was added Leader: r.trans.EncodePeer(r.localID, r.localAddr), } @@ -555,7 +555,7 @@ func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh, func (r *Raft) setupAppendEntries(s *followerReplication, req *AppendEntriesRequest, nextIndex, lastIndex uint64) error { req.RPCHeader = r.getRPCHeader() req.Term = s.currentTerm - // this is needed for retro compatibility with protocolVersion = 3 + // this is needed for retro compatibility, before RPCHeader.Addr was added req.Leader = r.trans.EncodePeer(r.localID, r.localAddr) req.LeaderCommitIndex = r.getCommitIndex() if err := r.setPreviousLog(req, nextIndex); err != nil { diff --git a/testing.go b/testing.go index 9ee0dac85..3c5d42e69 100644 --- a/testing.go +++ b/testing.go @@ -334,7 +334,6 @@ CHECK: num := len(fsm.logs) fsm.Unlock() if num != 
fsmLength { - fmt.Printf("fsm len : %d\n", num) continue CHECK } } From d4932d0211cf1ee6f0660e2d9dcdd9aefb2b7dcb Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 1 Dec 2021 09:32:13 -0500 Subject: [PATCH 28/31] add deprecation for `Leader` and `Candidate` fields --- commands.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/commands.go b/commands.go index 835293298..637e19d13 100644 --- a/commands.go +++ b/commands.go @@ -25,7 +25,9 @@ type AppendEntriesRequest struct { RPCHeader // Provide the current term and leader - Term uint64 + Term uint64 + + //Deprecated, use RPCHeader.Addr instead Leader []byte // Provide the previous entries for integrity checking @@ -74,7 +76,9 @@ type RequestVoteRequest struct { RPCHeader // Provide the term and our id - Term uint64 + Term uint64 + + //Deprecated, use RPCHeader.Addr instead Candidate []byte // Used to ensure safety From 86c4d964f95777771a6ffb66a6eb950b4c650b8d Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Thu, 2 Dec 2021 10:48:19 -0500 Subject: [PATCH 29/31] fix deprecation comments --- commands.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/commands.go b/commands.go index 637e19d13..2ddd418d2 100644 --- a/commands.go +++ b/commands.go @@ -27,7 +27,7 @@ type AppendEntriesRequest struct { // Provide the current term and leader Term uint64 - //Deprecated, use RPCHeader.Addr instead + // Deprecated: use RPCHeader.Addr instead Leader []byte // Provide the previous entries for integrity checking @@ -78,7 +78,7 @@ type RequestVoteRequest struct { // Provide the term and our id Term uint64 - //Deprecated, use RPCHeader.Addr instead + // Deprecated: use RPCHeader.Addr instead Candidate []byte // Used to ensure safety @@ -130,9 +130,10 @@ type InstallSnapshotRequest struct { LastLogIndex uint64 LastLogTerm uint64 - // Peer Set in the snapshot. This is deprecated in favor of Configuration + // Peer Set in the snapshot. // but remains here in case we receive an InstallSnapshot from a leader // that's running old code. + // Deprecated: This is deprecated in favor of Configuration Peers []byte // Cluster membership. From 4fa2eb0178e3d744ca43fed4f0379fd588a033aa Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 5 Jan 2022 16:01:33 -0500 Subject: [PATCH 30/31] check if node is voter instead if it's part of the cluster, nonVoter should not have votes granted. --- raft.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/raft.go b/raft.go index 91e18ea56..b6d9b67f9 100644 --- a/raft.go +++ b/raft.go @@ -1502,8 +1502,8 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { candidateID := ServerID(req.ID) // if the Servers list is empty that mean the cluster is very likely trying to bootstrap, // Grant the vote - if len(r.configurations.latest.Servers) > 0 && !inConfig(r.configurations.latest, candidateID) { - r.logger.Warn("rejecting vote request since voter not part of the configuration", + if len(r.configurations.latest.Servers) > 0 && !hasVote(r.configurations.latest, candidateID) { + r.logger.Warn("rejecting vote request since node is not a voter", "from", candidate) return } From 0e724f0025c25785cb263105f6edc68c94a37239 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Mon, 7 Mar 2022 21:24:53 +0100 Subject: [PATCH 31/31] add deprecation notice for Leader() API. 
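For callers, moving off the deprecated accessor is a one-line change; a minimal sketch, assuming r is an existing *raft.Raft node:

    // Deprecated accessor: address only.
    addr := r.Leader()

    // Preferred accessor: address and ID of the current leader; both are
    // empty if there is no known leader.
    addr, id := r.LeaderWithID()
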
--- api.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/api.go b/api.go index 40a07242f..83be5a046 100644 --- a/api.go +++ b/api.go @@ -716,9 +716,10 @@ func (r *Raft) BootstrapCluster(configuration Configuration) Future { } // Leader is used to return the current leader of the cluster. +// Deprecated: use LeaderWithID instead // It may return empty string if there is no current leader // or the leader is unknown. -// Deprecated: Leader is deprecated, use LeaderWithID instead. +// Deprecated: use LeaderWithID instead. func (r *Raft) Leader() ServerAddress { r.leaderLock.RLock() leaderAddr := r.leaderAddr