From 38cb18695192d910bd774e2442d198fdc2c691a0 Mon Sep 17 00:00:00 2001
From: Dhia Ayachi
Date: Mon, 7 Mar 2022 21:29:24 +0100
Subject: [PATCH] node not part of the cluster is not allowed to vote (#477)

* add test to verify that a removed node is not able to vote
* add some comments to the tests
* add wait loop to test
* remove test related to removed node voting
* add test that checks a removed node is not allowed to vote
* add wait loop to make the test more robust
* increase timeout
* Revert "increase timeout"
  This reverts commit 316c59ad1f9b993548b4778870c35673260f51a2.
* add ID to `RequestVoteRequest` and check if the node is part of the cluster before granting a vote
* use the request protocol version to ensure we have the right version
* add `ID` and `Addr` as part of `RPCHeader` and do not fill `Candidate` and `Leader` in version > 3
* return `LeaderID` as part of the `Leader()` API
* fix docstring
* fix backward compatibility with version 3
* fix string casting
* remove `EncodeID` and `DecodeID` from the `transport` interface
* add missing `leaderID` initialization
* add protocol version 3 to version 4 upgrade test
* increase test timeout
* split test and clean up code
* add docstrings to `RPCHeader`
* Apply suggestions from code review
  Co-authored-by: Matt Keeler
* Fix comment
  Co-authored-by: Matt Keeler
* fix review comments
* do not increment protocolVersion to 4; rely on the Addr/ID fields being nil to fall back on the old fields
* deprecate `Leader` in favor of `LeaderWithID`
* remove duplicate test and clean up code
* add deprecation notices for the `Leader` and `Candidate` fields
* fix deprecation comments
* check if the node is a voter instead of only checking that it is part of the cluster; non-voters should not be granted votes
* add deprecation notice for the Leader() API

Co-authored-by: Matt Keeler
---
 api.go                |  23 +++-
 commands.go           |  15 ++-
 fuzzy/verifier.go     |   8 +-
 net_transport.go      |   7 +-
 net_transport_test.go |  27 +++--
 observer.go           |   3 +-
 raft.go               |  90 +++++++----
 raft_test.go          | 273 +++++++++++++++++++++++++++++++++++++++++-
 replication.go        |  12 +-
 testing.go            |  22 +++-
 transport_test.go     |  11 +-
 11 files changed, 425 insertions(+), 66 deletions(-)

diff --git a/api.go b/api.go
index c056ec8b9..82c279609 100644
--- a/api.go
+++ b/api.go
@@ -115,8 +115,10 @@ type Raft struct {
 	lastContact     time.Time
 	lastContactLock sync.RWMutex
 
-	// Leader is the current cluster leader
-	leader ServerAddress
+	// leaderAddr is the current cluster leader address
+	leaderAddr ServerAddress
+	// leaderID is the current cluster leader ID
+	leaderID ServerID
 	leaderLock sync.RWMutex
 
 	// leaderCh is used to notify of leadership changes
@@ -736,13 +738,26 @@ func (r *Raft) BootstrapCluster(configuration Configuration) Future {
 }
 
 // Leader is used to return the current leader of the cluster.
 // It may return empty string if there is no current leader
 // or the leader is unknown.
+//
+// Deprecated: use LeaderWithID instead.
 func (r *Raft) Leader() ServerAddress {
 	r.leaderLock.RLock()
-	leader := r.leader
+	leaderAddr := r.leaderAddr
 	r.leaderLock.RUnlock()
-	return leader
+	return leaderAddr
+}
+
+// LeaderWithID is used to return the current leader address and ID of the cluster.
+// It may return empty strings if there is no current leader
+// or the leader is unknown.
+func (r *Raft) LeaderWithID() (ServerAddress, ServerID) { + r.leaderLock.RLock() + leaderAddr := r.leaderAddr + leaderID := r.leaderID + r.leaderLock.RUnlock() + return leaderAddr, leaderID } // Apply is used to apply a command to the FSM in a highly consistent diff --git a/commands.go b/commands.go index 3358a3284..2ddd418d2 100644 --- a/commands.go +++ b/commands.go @@ -8,6 +8,10 @@ type RPCHeader struct { // ProtocolVersion is the version of the protocol the sender is // speaking. ProtocolVersion ProtocolVersion + // ID is the ServerID of the node sending the RPC Request or Response + ID []byte + // Addr is the ServerAddr of the node sending the RPC Request or Response + Addr []byte } // WithRPCHeader is an interface that exposes the RPC header. @@ -21,7 +25,9 @@ type AppendEntriesRequest struct { RPCHeader // Provide the current term and leader - Term uint64 + Term uint64 + + // Deprecated: use RPCHeader.Addr instead Leader []byte // Provide the previous entries for integrity checking @@ -70,7 +76,9 @@ type RequestVoteRequest struct { RPCHeader // Provide the term and our id - Term uint64 + Term uint64 + + // Deprecated: use RPCHeader.Addr instead Candidate []byte // Used to ensure safety @@ -122,9 +130,10 @@ type InstallSnapshotRequest struct { LastLogIndex uint64 LastLogTerm uint64 - // Peer Set in the snapshot. This is deprecated in favor of Configuration + // Peer Set in the snapshot. // but remains here in case we receive an InstallSnapshot from a leader // that's running old code. + // Deprecated: This is deprecated in favor of Configuration Peers []byte // Cluster membership. diff --git a/fuzzy/verifier.go b/fuzzy/verifier.go index 44b5ad5b5..a51efece4 100644 --- a/fuzzy/verifier.go +++ b/fuzzy/verifier.go @@ -45,7 +45,13 @@ func (v *appendEntriesVerifier) PreRequestVote(src, target string, rv *raft.Requ func (v *appendEntriesVerifier) PreAppendEntries(src, target string, req *raft.AppendEntriesRequest) (*raft.AppendEntriesResponse, error) { term := req.Term - ldr := string(req.Leader) + var ldr string + if len(req.RPCHeader.Addr) > 0 { + ldr = string(req.RPCHeader.Addr) + } else { + ldr = string(req.Leader) + } + if ldr != src { v.Lock() defer v.Unlock() diff --git a/net_transport.go b/net_transport.go index 307dcab6f..2e98bd36d 100644 --- a/net_transport.go +++ b/net_transport.go @@ -584,8 +584,13 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en } rpc.Command = &req + leaderAddr := req.RPCHeader.Addr + if len(leaderAddr) == 0 { + leaderAddr = req.Leader + } + // Check if this is a heartbeat - if req.Term != 0 && req.Leader != nil && + if req.Term != 0 && leaderAddr != nil && req.PrevLogEntry == 0 && req.PrevLogTerm == 0 && len(req.Entries) == 0 && req.LeaderCommitIndex == 0 { isHeartbeat = true diff --git a/net_transport_test.go b/net_transport_test.go index d42d469db..07d4e7511 100644 --- a/net_transport_test.go +++ b/net_transport_test.go @@ -35,7 +35,6 @@ func TestNetworkTransport_CloseStreams(t *testing.T) { // Make the RPC request args := AppendEntriesRequest{ Term: 10, - Leader: []byte("cartman"), PrevLogEntry: 100, PrevLogTerm: 4, Entries: []*Log{ @@ -46,7 +45,9 @@ func TestNetworkTransport_CloseStreams(t *testing.T) { }, }, LeaderCommitIndex: 90, + RPCHeader: RPCHeader{Addr: []byte("cartman")}, } + resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -138,9 +139,11 @@ func TestNetworkTransport_Heartbeat_FastPath(t *testing.T) { // Make the RPC request args := AppendEntriesRequest{ - Term: 10, - Leader: []byte("cartman"), + Term: 
10, + RPCHeader: RPCHeader{ProtocolVersion: ProtocolVersionMax, Addr: []byte("cartman")}, + Leader: []byte("cartman"), } + resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -197,7 +200,6 @@ func TestNetworkTransport_AppendEntries(t *testing.T) { // Make the RPC request args := AppendEntriesRequest{ Term: 10, - Leader: []byte("cartman"), PrevLogEntry: 100, PrevLogTerm: 4, Entries: []*Log{ @@ -208,7 +210,9 @@ func TestNetworkTransport_AppendEntries(t *testing.T) { }, }, LeaderCommitIndex: 90, + RPCHeader: RPCHeader{Addr: []byte("cartman")}, } + resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -267,7 +271,6 @@ func TestNetworkTransport_AppendEntriesPipeline(t *testing.T) { // Make the RPC request args := AppendEntriesRequest{ Term: 10, - Leader: []byte("cartman"), PrevLogEntry: 100, PrevLogTerm: 4, Entries: []*Log{ @@ -278,7 +281,9 @@ func TestNetworkTransport_AppendEntriesPipeline(t *testing.T) { }, }, LeaderCommitIndex: 90, + RPCHeader: RPCHeader{Addr: []byte("cartman")}, } + resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -352,7 +357,6 @@ func TestNetworkTransport_AppendEntriesPipeline_CloseStreams(t *testing.T) { // Make the RPC request args := AppendEntriesRequest{ Term: 10, - Leader: []byte("cartman"), PrevLogEntry: 100, PrevLogTerm: 4, Entries: []*Log{ @@ -363,7 +367,9 @@ func TestNetworkTransport_AppendEntriesPipeline_CloseStreams(t *testing.T) { }, }, LeaderCommitIndex: 90, + RPCHeader: RPCHeader{Addr: []byte("cartman")}, } + resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -462,10 +468,11 @@ func TestNetworkTransport_RequestVote(t *testing.T) { // Make the RPC request args := RequestVoteRequest{ Term: 20, - Candidate: []byte("butters"), LastLogIndex: 100, LastLogTerm: 19, + RPCHeader: RPCHeader{Addr: []byte("butters")}, } + resp := RequestVoteResponse{ Term: 100, Granted: false, @@ -523,12 +530,13 @@ func TestNetworkTransport_InstallSnapshot(t *testing.T) { // Make the RPC request args := InstallSnapshotRequest{ Term: 10, - Leader: []byte("kyle"), LastLogIndex: 100, LastLogTerm: 9, Peers: []byte("blah blah"), Size: 10, + RPCHeader: RPCHeader{Addr: []byte("kyle")}, } + resp := InstallSnapshotResponse{ Term: 10, Success: true, @@ -631,7 +639,6 @@ func TestNetworkTransport_PooledConn(t *testing.T) { // Make the RPC request args := AppendEntriesRequest{ Term: 10, - Leader: []byte("cartman"), PrevLogEntry: 100, PrevLogTerm: 4, Entries: []*Log{ @@ -642,7 +649,9 @@ func TestNetworkTransport_PooledConn(t *testing.T) { }, }, LeaderCommitIndex: 90, + RPCHeader: RPCHeader{Addr: []byte("cartman")}, } + resp := AppendEntriesResponse{ Term: 4, LastLog: 90, diff --git a/observer.go b/observer.go index 29f2d5802..c45e7f632 100644 --- a/observer.go +++ b/observer.go @@ -19,7 +19,8 @@ type Observation struct { // LeaderObservation is used for the data when leadership changes. type LeaderObservation struct { - Leader ServerAddress + LeaderAddr ServerAddress + LeaderID ServerID } // PeerObservation is sent to observers when peers change. 
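The api.go and observer.go hunks above deprecate Leader() in favor of LeaderWithID() and rename the LeaderObservation field to LeaderAddr alongside the new LeaderID. A minimal sketch of how downstream code might migrate, assuming a *raft.Raft value and an observation channel wired up elsewhere (hypothetical names, not part of this patch):

package example

import (
	"fmt"

	"github.com/hashicorp/raft"
)

// printLeadership shows the migration: LeaderWithID replaces the deprecated
// Leader(), and LeaderObservation now exposes LeaderAddr and LeaderID.
func printLeadership(r *raft.Raft, obsCh <-chan raft.Observation) {
	// Before this change only the address was available: addr := r.Leader()
	addr, id := r.LeaderWithID()
	fmt.Printf("current leader: %s (id %s)\n", addr, id)

	// Observers read the renamed fields from LeaderObservation.
	for o := range obsCh {
		if lo, ok := o.Data.(raft.LeaderObservation); ok {
			fmt.Printf("leadership changed: %s (id %s)\n", lo.LeaderAddr, lo.LeaderID)
		}
	}
}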
diff --git a/raft.go b/raft.go index 2cbeb778d..eebc2e113 100644 --- a/raft.go +++ b/raft.go @@ -31,6 +31,8 @@ var ( func (r *Raft) getRPCHeader() RPCHeader { return RPCHeader{ ProtocolVersion: r.config().ProtocolVersion, + ID: []byte(r.config().LocalID), + Addr: r.trans.EncodePeer(r.config().LocalID, r.localAddr), } } @@ -90,14 +92,16 @@ type leaderState struct { stepDown chan struct{} } -// setLeader is used to modify the current leader of the cluster -func (r *Raft) setLeader(leader ServerAddress) { +// setLeader is used to modify the current leader Address and ID of the cluster +func (r *Raft) setLeader(leaderAddr ServerAddress, leaderID ServerID) { r.leaderLock.Lock() - oldLeader := r.leader - r.leader = leader + oldLeaderAddr := r.leaderAddr + r.leaderAddr = leaderAddr + oldLeaderID := r.leaderID + r.leaderID = leaderID r.leaderLock.Unlock() - if oldLeader != leader { - r.observe(LeaderObservation{Leader: leader}) + if oldLeaderAddr != leaderAddr || oldLeaderID != leaderID { + r.observe(LeaderObservation{LeaderAddr: leaderAddr, LeaderID: leaderID}) } } @@ -130,7 +134,7 @@ func (r *Raft) run() { select { case <-r.shutdownCh: // Clear the leader to prevent forwarding - r.setLeader("") + r.setLeader("", "") return default: } @@ -149,7 +153,8 @@ func (r *Raft) run() { // runFollower runs the main loop while in the follower state. func (r *Raft) runFollower() { didWarn := false - r.logger.Info("entering follower state", "follower", r, "leader", r.Leader()) + leaderAddr, leaderID := r.LeaderWithID() + r.logger.Info("entering follower state", "follower", r, "leader-address", leaderAddr, "leader-id", leaderID) metrics.IncrCounter([]string{"raft", "state", "follower"}, 1) heartbeatTimer := randomTimeout(r.config().HeartbeatTimeout) @@ -197,8 +202,8 @@ func (r *Raft) runFollower() { } // Heartbeat failed! Transition to the candidate state - lastLeader := r.Leader() - r.setLeader("") + lastLeaderAddr, lastLeaderID := r.LeaderWithID() + r.setLeader("", "") if r.configurations.latestIndex == 0 { if !didWarn { @@ -214,7 +219,7 @@ func (r *Raft) runFollower() { } else { metrics.IncrCounter([]string{"raft", "transition", "heartbeat_timeout"}, 1) if hasVote(r.configurations.latest, r.localID) { - r.logger.Warn("heartbeat timeout reached, starting election", "last-leader", lastLeader) + r.logger.Warn("heartbeat timeout reached, starting election", "last-leader-addr", lastLeaderAddr, "last-leader-id", lastLeaderID) r.setState(Candidate) return } else if !didWarn { @@ -301,7 +306,7 @@ func (r *Raft) runCandidate() { if grantedVotes >= votesNeeded { r.logger.Info("election won", "tally", grantedVotes) r.setState(Leader) - r.setLeader(r.localAddr) + r.setLeader(r.localAddr, r.localID) return } @@ -436,8 +441,9 @@ func (r *Raft) runLeader() { // We may have stepped down due to an RPC call, which would // provide the leader, so we cannot always blank this out. 
r.leaderLock.Lock() - if r.leader == r.localAddr { - r.leader = "" + if r.leaderAddr == r.localAddr && r.leaderID == r.localID { + r.leaderAddr = "" + r.leaderID = "" } r.leaderLock.Unlock() @@ -1317,8 +1323,11 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { } // Save the current leader - r.setLeader(r.trans.DecodePeer(a.Leader)) - + if len(a.Addr) > 0 { + r.setLeader(r.trans.DecodePeer(a.Addr), ServerID(a.ID)) + } else { + r.setLeader(r.trans.DecodePeer(a.Leader), ServerID(a.ID)) + } // Verify the last log entry if a.PrevLogEntry > 0 { lastIdx, lastTerm := r.getLastEntry() @@ -1474,11 +1483,33 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { // check the LeadershipTransfer flag is set. Usually votes are rejected if // there is a known leader. But if the leader initiated a leadership transfer, // vote! - candidate := r.trans.DecodePeer(req.Candidate) - if leader := r.Leader(); leader != "" && leader != candidate && !req.LeadershipTransfer { + var candidate ServerAddress + var candidateBytes []byte + if len(req.RPCHeader.Addr) > 0 { + candidate = r.trans.DecodePeer(req.RPCHeader.Addr) + candidateBytes = req.RPCHeader.Addr + } else { + candidate = r.trans.DecodePeer(req.Candidate) + candidateBytes = req.Candidate + } + + // For older raft version ID is not part of the packed message + // We assume that the peer is part of the configuration and skip this check + if len(req.ID) > 0 { + candidateID := ServerID(req.ID) + // if the Servers list is empty that mean the cluster is very likely trying to bootstrap, + // Grant the vote + if len(r.configurations.latest.Servers) > 0 && !hasVote(r.configurations.latest, candidateID) { + r.logger.Warn("rejecting vote request since node is not a voter", + "from", candidate) + return + } + } + if leaderAddr, leaderID := r.LeaderWithID(); leaderAddr != "" && leaderAddr != candidate && !req.LeadershipTransfer { r.logger.Warn("rejecting vote request since we have a leader", "from", candidate, - "leader", leader) + "leader", leaderAddr, + "leader-id", string(leaderID)) return } @@ -1511,7 +1542,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { // Check if we've voted in this election before if lastVoteTerm == req.Term && lastVoteCandBytes != nil { r.logger.Info("duplicate requestVote for same term", "term", req.Term) - if bytes.Compare(lastVoteCandBytes, req.Candidate) == 0 { + if bytes.Compare(lastVoteCandBytes, candidateBytes) == 0 { r.logger.Warn("duplicate requestVote from", "candidate", candidate) resp.Granted = true } @@ -1537,7 +1568,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { } // Persist a vote for safety - if err := r.persistVote(req.Term, req.Candidate); err != nil { + if err := r.persistVote(req.Term, candidateBytes); err != nil { r.logger.Error("failed to persist vote", "error", err) return } @@ -1588,7 +1619,11 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { } // Save the current leader - r.setLeader(r.trans.DecodePeer(req.Leader)) + if len(req.ID) > 0 { + r.setLeader(r.trans.DecodePeer(req.RPCHeader.Addr), ServerID(req.ID)) + } else { + r.setLeader(r.trans.DecodePeer(req.Leader), ServerID(req.ID)) + } // Create a new snapshot var reqConfiguration Configuration @@ -1710,8 +1745,9 @@ func (r *Raft) electSelf() <-chan *voteResult { // Construct the request lastIdx, lastTerm := r.getLastEntry() req := &RequestVoteRequest{ - RPCHeader: r.getRPCHeader(), - Term: r.getCurrentTerm(), + RPCHeader: r.getRPCHeader(), + Term: r.getCurrentTerm(), + // this 
is needed for retro compatibility, before RPCHeader.Addr was added Candidate: r.trans.EncodePeer(r.localID, r.localAddr), LastLogIndex: lastIdx, LastLogTerm: lastTerm, @@ -1740,7 +1776,7 @@ func (r *Raft) electSelf() <-chan *voteResult { if server.Suffrage == Voter { if server.ID == r.localID { // Persist a vote for ourselves - if err := r.persistVote(req.Term, req.Candidate); err != nil { + if err := r.persistVote(req.Term, req.RPCHeader.Addr); err != nil { r.logger.Error("failed to persist vote", "error", err) return nil } @@ -1786,7 +1822,7 @@ func (r *Raft) setCurrentTerm(t uint64) { // transition causes the known leader to be cleared. This means // that leader should be set only after updating the state. func (r *Raft) setState(state RaftState) { - r.setLeader("") + r.setLeader("", "") oldState := r.raftState.getState() r.raftState.setState(state) if oldState != state { @@ -1843,7 +1879,7 @@ func (r *Raft) initiateLeadershipTransfer(id *ServerID, address *ServerAddress) // timeoutNow is what happens when a server receives a TimeoutNowRequest. func (r *Raft) timeoutNow(rpc RPC, req *TimeoutNowRequest) { - r.setLeader("") + r.setLeader("", "") r.setState(Candidate) r.candidateFromLeadershipTransfer = true rpc.Respond(&TimeoutNowResponse{}, nil) diff --git a/raft_test.go b/raft_test.go index 26ef4ea55..6af480eb1 100644 --- a/raft_test.go +++ b/raft_test.go @@ -3,8 +3,10 @@ package raft import ( "bufio" "bytes" + "errors" "fmt" "io/ioutil" + "log" "os" "path/filepath" "reflect" @@ -1680,10 +1682,10 @@ LOOP: } // Ensure both have cleared their leader - if l := leader.Leader(); l != "" { + if l, id := leader.LeaderWithID(); l != "" && id != "" { t.Fatalf("bad: %v", l) } - if l := follower.Leader(); l != "" { + if l, id := follower.LeaderWithID(); l != "" && id != "" { t.Fatalf("bad: %v", l) } } @@ -1785,7 +1787,7 @@ func TestRaft_VerifyLeader_Fail(t *testing.T) { } // Ensure the known leader is cleared - if l := leader.Leader(); l != "" { + if l, _ := leader.LeaderWithID(); l != "" { t.Fatalf("bad: %v", l) } } @@ -1855,18 +1857,79 @@ func TestRaft_NotifyCh(t *testing.T) { } } -func TestRaft_Voting(t *testing.T) { +func TestRaft_AppendEntry(t *testing.T) { c := MakeCluster(3, t, nil) defer c.Close() followers := c.Followers() ldr := c.Leader() ldrT := c.trans[c.IndexOf(ldr)] + reqAppendEntries := AppendEntriesRequest{ + RPCHeader: ldr.getRPCHeader(), + Term: ldr.getCurrentTerm() + 1, + PrevLogEntry: 0, + PrevLogTerm: ldr.getCurrentTerm(), + Leader: nil, + Entries: []*Log{ + { + Index: 1, + Term: ldr.getCurrentTerm() + 1, + Type: LogCommand, + Data: []byte("log 1"), + }, + }, + LeaderCommitIndex: 90, + } + // a follower that thinks there's a leader should vote for that leader. + var resp AppendEntriesResponse + if err := ldrT.AppendEntries(followers[0].localID, followers[0].localAddr, &reqAppendEntries, &resp); err != nil { + t.Fatalf("RequestVote RPC failed %v", err) + } + + require.True(t, resp.Success) + + headers := ldr.getRPCHeader() + headers.ID = nil + headers.Addr = nil + reqAppendEntries = AppendEntriesRequest{ + RPCHeader: headers, + Term: ldr.getCurrentTerm() + 1, + PrevLogEntry: 0, + PrevLogTerm: ldr.getCurrentTerm(), + Leader: ldr.trans.EncodePeer(ldr.config().LocalID, ldr.localAddr), + Entries: []*Log{ + { + Index: 1, + Term: ldr.getCurrentTerm() + 1, + Type: LogCommand, + Data: []byte("log 1"), + }, + }, + LeaderCommitIndex: 90, + } + // a follower that thinks there's a leader should vote for that leader. 
+ var resp2 AppendEntriesResponse + if err := ldrT.AppendEntries(followers[0].localID, followers[0].localAddr, &reqAppendEntries, &resp2); err != nil { + t.Fatalf("RequestVote RPC failed %v", err) + } + + require.True(t, resp2.Success) +} + +func TestRaft_VotingGrant_WhenLeaderAvailable(t *testing.T) { + conf := inmemConfig(t) + conf.ProtocolVersion = 3 + c := MakeCluster(3, t, conf) + defer c.Close() + followers := c.Followers() + ldr := c.Leader() + ldrT := c.trans[c.IndexOf(ldr)] + reqVote := RequestVoteRequest{ RPCHeader: ldr.getRPCHeader(), Term: ldr.getCurrentTerm() + 10, - Candidate: ldrT.EncodePeer(ldr.localID, ldr.localAddr), LastLogIndex: ldr.LastIndex(), + Candidate: ldrT.EncodePeer(ldr.localID, ldr.localAddr), LastLogTerm: ldr.getCurrentTerm(), LeadershipTransfer: false, } @@ -1879,6 +1942,7 @@ func TestRaft_Voting(t *testing.T) { t.Fatalf("expected vote to be granted, but wasn't %+v", resp) } // a follower that thinks there's a leader shouldn't vote for a different candidate + reqVote.Addr = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) reqVote.Candidate = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) if err := ldrT.RequestVote(followers[1].localID, followers[1].localAddr, &reqVote, &resp); err != nil { t.Fatalf("RequestVote RPC failed %v", err) @@ -1889,6 +1953,7 @@ func TestRaft_Voting(t *testing.T) { // a follower that thinks there's a leader, but the request has the leadership transfer flag, should // vote for a different candidate reqVote.LeadershipTransfer = true + reqVote.Addr = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) reqVote.Candidate = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) if err := ldrT.RequestVote(followers[1].localID, followers[1].localAddr, &reqVote, &resp); err != nil { t.Fatalf("RequestVote RPC failed %v", err) @@ -1908,9 +1973,9 @@ func TestRaft_ProtocolVersion_RejectRPC(t *testing.T) { reqVote := RequestVoteRequest{ RPCHeader: RPCHeader{ ProtocolVersion: ProtocolVersionMax + 1, + Addr: ldrT.EncodePeer(ldr.localID, ldr.localAddr), }, Term: ldr.getCurrentTerm() + 10, - Candidate: ldrT.EncodePeer(ldr.localID, ldr.localAddr), LastLogIndex: ldr.LastIndex(), LastLogTerm: ldr.getCurrentTerm(), } @@ -2007,6 +2072,35 @@ func TestRaft_ProtocolVersion_Upgrade_2_3(t *testing.T) { } } +func TestRaft_LeaderID_Propagated(t *testing.T) { + // Make a cluster on protocol version 3. + conf := inmemConfig(t) + c := MakeCluster(3, t, conf) + defer c.Close() + err := waitForLeader(c) + require.NoError(t, err) + + for _, n := range c.rafts { + require.Equal(t, ProtocolVersion(3), n.protocolVersion) + addr, id := n.LeaderWithID() + require.NotEmpty(t, id) + require.NotEmpty(t, addr) + } + for i := 0; i < 5; i++ { + future := c.Leader().Apply([]byte(fmt.Sprintf("test%d", i)), 0) + if err := future.Error(); err != nil { + t.Fatalf("[ERR] err: %v", err) + } + } + // Wait a while + time.Sleep(c.propagateTimeout) + + // Sanity check the cluster. 
+ c.EnsureSame(t) + c.EnsureSamePeers(t) + c.EnsureLeader(t, c.Leader().localAddr) +} + func TestRaft_LeadershipTransferInProgress(t *testing.T) { r := &Raft{leaderState: leaderState{}} r.setupLeaderState() @@ -2472,6 +2566,173 @@ func TestRaft_InstallSnapshot_InvalidPeers(t *testing.T) { require.Contains(t, resp.Error.Error(), "failed to decode peers") } +func TestRaft_VoteNotGranted_WhenNodeNotInCluster(t *testing.T) { + // Make a cluster + c := MakeCluster(3, t, nil) + + defer c.Close() + + // Get the leader + leader := c.Leader() + + // Wait until we have 2 followers + limit := time.Now().Add(c.longstopTimeout) + var followers []*Raft + for time.Now().Before(limit) && len(followers) != 2 { + c.WaitEvent(nil, c.conf.CommitTimeout) + followers = c.GetInState(Follower) + } + if len(followers) != 2 { + t.Fatalf("expected two followers: %v", followers) + } + + // Remove a follower + followerRemoved := followers[0] + future := leader.RemoveServer(followerRemoved.localID, 0, 0) + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait a while + time.Sleep(c.propagateTimeout) + + // Other nodes should have fewer peers + if configuration := c.getConfiguration(leader); len(configuration.Servers) != 2 { + t.Fatalf("too many peers") + } + if configuration := c.getConfiguration(followers[1]); len(configuration.Servers) != 2 { + t.Fatalf("too many peers") + } + waitForState(followerRemoved, Follower) + // The removed node should be still in Follower state + require.Equal(t, Follower, followerRemoved.getState()) + + // Prepare a Vote request from the removed follower + follower := followers[1] + followerRemovedT := c.trans[c.IndexOf(followerRemoved)] + reqVote := RequestVoteRequest{ + RPCHeader: followerRemoved.getRPCHeader(), + Term: followerRemoved.getCurrentTerm() + 10, + LastLogIndex: followerRemoved.LastIndex(), + LastLogTerm: followerRemoved.getCurrentTerm(), + LeadershipTransfer: false, + } + // a follower that thinks there's a leader should vote for that leader. 
+	var resp RequestVoteResponse
+
+	// partition the leader to simulate an unstable cluster
+	c.Partition([]ServerAddress{leader.localAddr})
+	time.Sleep(c.propagateTimeout)
+
+	// wait for the remaining follower to trigger an election
+	waitForState(follower, Candidate)
+	require.Equal(t, Candidate, follower.getState())
+
+	// send a vote request from the removed follower to the candidate follower
+	if err := followerRemovedT.RequestVote(follower.localID, follower.localAddr, &reqVote, &resp); err != nil {
+		t.Fatalf("RequestVote RPC failed %v", err)
+	}
+
+	// the vote request should not be granted, because the voter is not part of the cluster anymore
+	if resp.Granted {
+		t.Fatalf("expected vote to not be granted, but it was %+v", resp)
+	}
+}
+
+func TestRaft_VoteWithNoIDNoAddr(t *testing.T) {
+	// Make a cluster
+	c := MakeCluster(3, t, nil)
+
+	defer c.Close()
+	waitForLeader(c)
+
+	leader := c.Leader()
+
+	// Wait until we have 2 followers
+	limit := time.Now().Add(c.longstopTimeout)
+	var followers []*Raft
+	for time.Now().Before(limit) && len(followers) != 2 {
+		c.WaitEvent(nil, c.conf.CommitTimeout)
+		followers = c.GetInState(Follower)
+	}
+	if len(followers) != 2 {
+		t.Fatalf("expected two followers: %v", followers)
+	}
+
+	follower := followers[0]
+
+	headers := follower.getRPCHeader()
+	headers.ID = nil
+	headers.Addr = nil
+	reqVote := RequestVoteRequest{
+		RPCHeader:          headers,
+		Term:               follower.getCurrentTerm() + 10,
+		LastLogIndex:       follower.LastIndex(),
+		LastLogTerm:        follower.getCurrentTerm(),
+		Candidate:          follower.trans.EncodePeer(follower.config().LocalID, follower.localAddr),
+		LeadershipTransfer: false,
+	}
+	// the header intentionally omits ID and Addr, emulating a peer running an older protocol version
+	var resp RequestVoteResponse
+	followerT := c.trans[c.IndexOf(followers[1])]
+	c.Partition([]ServerAddress{leader.localAddr})
+	time.Sleep(c.propagateTimeout)
+
+	// wait for the remaining follower to trigger an election
+	waitForState(follower, Candidate)
+	require.Equal(t, Candidate, follower.getState())
+	// send a vote request without ID or Addr in the header to the candidate follower
+
+	if err := followerT.RequestVote(follower.localID, follower.localAddr, &reqVote, &resp); err != nil {
+		t.Fatalf("RequestVote RPC failed %v", err)
+	}
+
+	// the vote request should be granted: when the header carries no ID, the voter-membership check is skipped
+	if !resp.Granted {
+		t.Fatalf("expected vote to be granted, but it wasn't %+v", resp)
+	}
+}
+
+func waitForState(follower *Raft, state RaftState) {
+	count := 0
+	for follower.getState() != state && count < 1000 {
+		count++
+		time.Sleep(1 * time.Millisecond)
+	}
+}
+
+func waitForLeader(c *cluster) error {
+	count := 0
+	for count < 100 {
+		r := c.GetInState(Leader)
+		if len(r) >= 1 {
+			return nil
+		}
+		count++
+		time.Sleep(50 * time.Millisecond)
+	}
+	return errors.New("no leader elected")
+}
+
+func waitForNewLeader(c *cluster, id ServerID) error {
+	count := 0
+	for count < 100 {
+		r := c.GetInState(Leader)
+		if len(r) >= 1 && r[0].localID != id {
+			return nil
+		} else {
+			if len(r) == 0 {
+				log.Println("no leader yet")
+			} else {
+				log.Printf("leader still %s\n", id)
+			}
+		}
+		count++
+		time.Sleep(100 * time.Millisecond)
+	}
+	return errors.New("no leader elected")
+}
+
 func TestRaft_runFollower_State_Transition(t *testing.T) {
 	type fields struct {
 		conf *Config
diff --git a/replication.go b/replication.go
index f5e81924b..b3301b5c5 100644
--- a/replication.go
+++ b/replication.go
@@ -317,9 +317,10 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) 
(bool, error) { // Setup the request req := InstallSnapshotRequest{ - RPCHeader: r.getRPCHeader(), - SnapshotVersion: meta.Version, - Term: s.currentTerm, + RPCHeader: r.getRPCHeader(), + SnapshotVersion: meta.Version, + Term: s.currentTerm, + // this is needed for retro compatibility, before RPCHeader.Addr was added Leader: r.trans.EncodePeer(r.localID, r.localAddr), LastLogIndex: meta.Index, LastLogTerm: meta.Term, @@ -381,8 +382,10 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { req := AppendEntriesRequest{ RPCHeader: r.getRPCHeader(), Term: s.currentTerm, - Leader: r.trans.EncodePeer(r.localID, r.localAddr), + // this is needed for retro compatibility, before RPCHeader.Addr was added + Leader: r.trans.EncodePeer(r.localID, r.localAddr), } + var resp AppendEntriesResponse for { // Wait for the next heartbeat interval or forced notify @@ -552,6 +555,7 @@ func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh, func (r *Raft) setupAppendEntries(s *followerReplication, req *AppendEntriesRequest, nextIndex, lastIndex uint64) error { req.RPCHeader = r.getRPCHeader() req.Term = s.currentTerm + // this is needed for retro compatibility, before RPCHeader.Addr was added req.Leader = r.trans.EncodePeer(r.localID, r.localAddr) req.LeaderCommitIndex = r.getCommitIndex() if err := r.setPreviousLog(req, nextIndex); err != nil { diff --git a/testing.go b/testing.go index 1dd61b94c..3c5d42e69 100644 --- a/testing.go +++ b/testing.go @@ -203,6 +203,15 @@ func (c *cluster) Merge(other *cluster) { c.rafts = append(c.rafts, other.rafts...) } +func (c *cluster) RemoveServer(id ServerID) { + for i, n := range c.rafts { + if n.localID == id { + c.rafts = append(c.rafts[:i], c.rafts[i+1:]...) + return + } + } +} + // notifyFailed will close the failed channel which can signal the goroutine // running the test that another goroutine has detected a failure in order to // terminate the test. 
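The replication.go hunks above show the sender-side half of the compatibility story: the leader fills the new RPCHeader.ID/Addr fields via getRPCHeader() while still populating the deprecated Leader field so peers that predate the header fields keep working. A small sketch of that dual-population pattern against the public types (buildHeartbeat is a hypothetical helper, not code from this patch):

package example

import "github.com/hashicorp/raft"

// buildHeartbeat fills both the new header fields and the deprecated Leader
// field, mirroring what heartbeat/setupAppendEntries do in the patch.
func buildHeartbeat(id raft.ServerID, addr raft.ServerAddress, term uint64) *raft.AppendEntriesRequest {
	return &raft.AppendEntriesRequest{
		RPCHeader: raft.RPCHeader{
			ProtocolVersion: raft.ProtocolVersionMax,
			ID:              []byte(id),   // new field
			Addr:            []byte(addr), // new field
		},
		Term: term,
		// Deprecated field, still filled so peers running older code can read the leader address.
		Leader: []byte(addr),
	}
}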
@@ -529,15 +538,16 @@ func (c *cluster) EnsureLeader(t *testing.T, expect ServerAddress) { // think the leader is correct fail := false for _, r := range c.rafts { - leader := ServerAddress(r.Leader()) - if leader != expect { - if leader == "" { - leader = "[none]" + leaderAddr, _ := r.LeaderWithID() + + if leaderAddr != expect { + if leaderAddr == "" { + leaderAddr = "[none]" } if expect == "" { - c.logger.Error("peer sees incorrect leader", "peer", r, "leader", leader, "expected-leader", "[none]") + c.logger.Error("peer sees incorrect leader", "peer", r, "leader", leaderAddr, "expected-leader", "[none]") } else { - c.logger.Error("peer sees incorrect leader", "peer", r, "leader", leader, "expected-leader", expect) + c.logger.Error("peer sees incorrect leader", "peer", r, "leader", leaderAddr, "expected-leader", expect) } fail = true } diff --git a/transport_test.go b/transport_test.go index 5a59253df..255a6be79 100644 --- a/transport_test.go +++ b/transport_test.go @@ -41,7 +41,6 @@ func TestTransport_AppendEntries(t *testing.T) { // Make the RPC request args := AppendEntriesRequest{ Term: 10, - Leader: []byte("cartman"), PrevLogEntry: 100, PrevLogTerm: 4, Entries: []*Log{ @@ -52,7 +51,9 @@ func TestTransport_AppendEntries(t *testing.T) { }, }, LeaderCommitIndex: 90, + RPCHeader: RPCHeader{Addr: []byte("cartman")}, } + resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -104,7 +105,6 @@ func TestTransport_AppendEntriesPipeline(t *testing.T) { // Make the RPC request args := AppendEntriesRequest{ Term: 10, - Leader: []byte("cartman"), PrevLogEntry: 100, PrevLogTerm: 4, Entries: []*Log{ @@ -115,7 +115,9 @@ func TestTransport_AppendEntriesPipeline(t *testing.T) { }, }, LeaderCommitIndex: 90, + RPCHeader: RPCHeader{Addr: []byte("cartman")}, } + resp := AppendEntriesResponse{ Term: 4, LastLog: 90, @@ -185,9 +187,9 @@ func TestTransport_RequestVote(t *testing.T) { // Make the RPC request args := RequestVoteRequest{ Term: 20, - Candidate: []byte("butters"), LastLogIndex: 100, LastLogTerm: 19, + RPCHeader: RPCHeader{Addr: []byte("butters")}, } resp := RequestVoteResponse{ Term: 100, @@ -240,12 +242,13 @@ func TestTransport_InstallSnapshot(t *testing.T) { // Make the RPC request args := InstallSnapshotRequest{ Term: 10, - Leader: []byte("kyle"), LastLogIndex: 100, LastLogTerm: 9, Peers: []byte("blah blah"), Size: 10, + RPCHeader: RPCHeader{Addr: []byte("kyle")}, } + resp := InstallSnapshotResponse{ Term: 10, Success: true,
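Taken together, the receiver-side changes in raft.go reduce to two steps: resolve the candidate from RPCHeader.Addr, falling back to the deprecated Candidate field for older senders, and reject the vote when the candidate's ID is present but not a voter in the latest configuration. A simplified sketch of that decision, written against stand-in types rather than the library's internals:

package example

// Minimal stand-ins for the library types, only to illustrate the decision.
type server struct {
	id    string
	voter bool
}

type voteRequest struct {
	// New header fields (may be empty when the sender runs older code).
	headerID   []byte
	headerAddr []byte
	// Deprecated field kept for backward compatibility.
	candidate []byte
}

// shouldConsiderVote mirrors the two checks added in requestVote: pick the
// candidate bytes (new field first, old field as fallback) and reject
// candidates that are known but are not voters in the latest configuration.
func shouldConsiderVote(req voteRequest, latest []server) (candidateBytes []byte, ok bool) {
	candidateBytes = req.candidate
	if len(req.headerAddr) > 0 {
		candidateBytes = req.headerAddr
	}

	// Older senders omit the ID; in that case the membership check is skipped.
	if len(req.headerID) == 0 {
		return candidateBytes, true
	}
	// An empty configuration usually means the cluster is still bootstrapping.
	if len(latest) == 0 {
		return candidateBytes, true
	}
	for _, s := range latest {
		if s.id == string(req.headerID) && s.voter {
			return candidateBytes, true
		}
	}
	return candidateBytes, false
}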