From 3907e87543aca4e5e6721e4712b9a0df93ff8c97 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 29 Sep 2021 10:32:42 -0400 Subject: [PATCH] add ID to `RequestVoteRequest` and check if the node is part of the cluster before granting a vote --- commands.go | 1 + config.go | 2 +- inmem_transport.go | 10 ++++++++++ net_transport.go | 10 ++++++++++ raft.go | 14 +++++++++++++ raft_test.go | 49 ++++++++++++++++++++++++++++++++++++++++++++++ transport.go | 6 ++++++ 7 files changed, 91 insertions(+), 1 deletion(-) diff --git a/commands.go b/commands.go index 3358a3284..07ba41212 100644 --- a/commands.go +++ b/commands.go @@ -73,6 +73,7 @@ type RequestVoteRequest struct { Term uint64 Candidate []byte + ID []byte // Used to ensure safety LastLogIndex uint64 LastLogTerm uint64 diff --git a/config.go b/config.go index 78dde9225..eda4e4525 100644 --- a/config.go +++ b/config.go @@ -91,7 +91,7 @@ const ( // ProtocolVersionMin is the minimum protocol version ProtocolVersionMin ProtocolVersion = 0 // ProtocolVersionMax is the maximum protocol version - ProtocolVersionMax = 3 + ProtocolVersionMax = 4 ) // SnapshotVersion is the version of snapshots that this server can understand. diff --git a/inmem_transport.go b/inmem_transport.go index b5bdecc73..7613cb630 100644 --- a/inmem_transport.go +++ b/inmem_transport.go @@ -194,6 +194,16 @@ func (i *InmemTransport) DecodePeer(buf []byte) ServerAddress { return ServerAddress(buf) } +// EncodePeer implements the Transport interface. +func (i *InmemTransport) EncodeID(id ServerID) []byte { + return []byte(id) +} + +// DecodePeer implements the Transport interface. +func (i *InmemTransport) DecodeID(buf []byte) ServerID { + return ServerID(buf) +} + // Connect is used to connect this transport to another transport for // a given peer name. This allows for local routing. func (i *InmemTransport) Connect(peer ServerAddress, t Transport) { diff --git a/net_transport.go b/net_transport.go index 3ac845290..b36fe8b11 100644 --- a/net_transport.go +++ b/net_transport.go @@ -466,6 +466,16 @@ func (n *NetworkTransport) DecodePeer(buf []byte) ServerAddress { return ServerAddress(buf) } +// EncodeID implements the Transport interface. +func (n *NetworkTransport) EncodeID(id ServerID) []byte { + return []byte(id) +} + +// DecodeID implements the Transport interface. +func (n *NetworkTransport) DecodeID(buf []byte) ServerID { + return ServerID(buf) +} + // TimeoutNow implements the Transport interface. func (n *NetworkTransport) TimeoutNow(id ServerID, target ServerAddress, args *TimeoutNowRequest, resp *TimeoutNowResponse) error { return n.genericRPC(id, target, rpcTimeoutNow, args, resp) diff --git a/raft.go b/raft.go index 155a89df4..03ac74600 100644 --- a/raft.go +++ b/raft.go @@ -1480,6 +1480,19 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { // there is a known leader. But if the leader initiated a leadership transfer, // vote! candidate := r.trans.DecodePeer(req.Candidate) + + // Prior to version 4 the peer ID is not part of `RequestVoteRequest`, + // We assume that the peer is part of the configuration and skip this check + if r.protocolVersion > 3 { + candidateID := r.trans.DecodeID(req.ID) + // if the Servers list is empty that mean the cluster is very likely trying to bootstrap, + // Grant the vote + if len(r.configurations.latest.Servers) > 0 && !inConfig(r.configurations.latest, candidateID) { + r.logger.Warn("rejecting vote request since voter not part of the configuration", + "from", candidate) + return + } + } if leader := r.Leader(); leader != "" && leader != candidate && !req.LeadershipTransfer { r.logger.Warn("rejecting vote request since we have a leader", "from", candidate, @@ -1712,6 +1725,7 @@ func (r *Raft) electSelf() <-chan *voteResult { RPCHeader: r.getRPCHeader(), Term: r.getCurrentTerm(), Candidate: r.trans.EncodePeer(r.localID, r.localAddr), + ID: r.trans.EncodeID(r.localID), LastLogIndex: lastIdx, LastLogTerm: lastTerm, LeadershipTransfer: r.candidateFromLeadershipTransfer, diff --git a/raft_test.go b/raft_test.go index 20b4cd628..842414f97 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1740,6 +1740,53 @@ func TestRaft_Voting(t *testing.T) { RPCHeader: ldr.getRPCHeader(), Term: ldr.getCurrentTerm() + 10, Candidate: ldrT.EncodePeer(ldr.localID, ldr.localAddr), + ID: ldrT.EncodeID(ldr.localID), + LastLogIndex: ldr.LastIndex(), + LastLogTerm: ldr.getCurrentTerm(), + LeadershipTransfer: false, + } + // a follower that thinks there's a leader should vote for that leader. + var resp RequestVoteResponse + if err := ldrT.RequestVote(followers[0].localID, followers[0].localAddr, &reqVote, &resp); err != nil { + t.Fatalf("RequestVote RPC failed %v", err) + } + if !resp.Granted { + t.Fatalf("expected vote to be granted, but wasn't %+v", resp) + } + // a follower that thinks there's a leader shouldn't vote for a different candidate + reqVote.Candidate = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) + if err := ldrT.RequestVote(followers[1].localID, followers[1].localAddr, &reqVote, &resp); err != nil { + t.Fatalf("RequestVote RPC failed %v", err) + } + if resp.Granted { + t.Fatalf("expected vote not to be granted, but was %+v", resp) + } + // a follower that thinks there's a leader, but the request has the leadership transfer flag, should + // vote for a different candidate + reqVote.LeadershipTransfer = true + reqVote.Candidate = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) + if err := ldrT.RequestVote(followers[1].localID, followers[1].localAddr, &reqVote, &resp); err != nil { + t.Fatalf("RequestVote RPC failed %v", err) + } + if !resp.Granted { + t.Fatalf("expected vote to be granted, but wasn't %+v", resp) + } +} + +func TestRaft_Voting_portocolVersion3(t *testing.T) { + conf := inmemConfig(t) + conf.ProtocolVersion = 3 + c := MakeCluster(3, t, conf) + defer c.Close() + followers := c.Followers() + ldr := c.Leader() + ldrT := c.trans[c.IndexOf(ldr)] + + reqVote := RequestVoteRequest{ + RPCHeader: ldr.getRPCHeader(), + Term: ldr.getCurrentTerm() + 10, + Candidate: ldrT.EncodePeer(ldr.localID, ldr.localAddr), + ID: ldrT.EncodeID(ldr.localID), LastLogIndex: ldr.LastIndex(), LastLogTerm: ldr.getCurrentTerm(), LeadershipTransfer: false, @@ -2349,6 +2396,7 @@ func TestRaft_InstallSnapshot_InvalidPeers(t *testing.T) { func TestRaft_RemovedFollower_Vote(t *testing.T) { // Make a cluster c := MakeCluster(3, t, nil) + defer c.Close() // Get the leader @@ -2393,6 +2441,7 @@ func TestRaft_RemovedFollower_Vote(t *testing.T) { RPCHeader: followerRemoved.getRPCHeader(), Term: followerRemoved.getCurrentTerm() + 10, Candidate: followerRemovedT.EncodePeer(followerRemoved.localID, followerRemoved.localAddr), + ID: followerRemovedT.EncodeID(followerRemoved.localID), LastLogIndex: followerRemoved.LastIndex(), LastLogTerm: followerRemoved.getCurrentTerm(), LeadershipTransfer: false, diff --git a/transport.go b/transport.go index b18d24593..3b9fbd67d 100644 --- a/transport.go +++ b/transport.go @@ -53,6 +53,12 @@ type Transport interface { // DecodePeer is used to deserialize a peer's address. DecodePeer([]byte) ServerAddress + // EncodePeer is used to serialize a peer's address. + EncodeID(id ServerID) []byte + + // DecodePeer is used to deserialize a peer's address. + DecodeID([]byte) ServerID + // SetHeartbeatHandler is used to setup a heartbeat handler // as a fast-pass. This is to avoid head-of-line blocking from // disk IO. If a Transport does not support this, it can simply