Skip to content

Commit

Permalink
add ID to RequestVoteRequest and check if the node is part of the c…
Browse files Browse the repository at this point in the history
…luster before granting a vote
  • Loading branch information
dhiaayachi committed Sep 29, 2021
1 parent 8e0a9c3 commit 3907e87
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 1 deletion.
1 change: 1 addition & 0 deletions commands.go
Expand Up @@ -73,6 +73,7 @@ type RequestVoteRequest struct {
Term uint64
Candidate []byte

ID []byte
// Used to ensure safety
LastLogIndex uint64
LastLogTerm uint64
Expand Down
2 changes: 1 addition & 1 deletion config.go
Expand Up @@ -91,7 +91,7 @@ const (
// ProtocolVersionMin is the minimum protocol version
ProtocolVersionMin ProtocolVersion = 0
// ProtocolVersionMax is the maximum protocol version
ProtocolVersionMax = 3
ProtocolVersionMax = 4
)

// SnapshotVersion is the version of snapshots that this server can understand.
Expand Down
10 changes: 10 additions & 0 deletions inmem_transport.go
Expand Up @@ -194,6 +194,16 @@ func (i *InmemTransport) DecodePeer(buf []byte) ServerAddress {
return ServerAddress(buf)
}

// EncodePeer implements the Transport interface.
func (i *InmemTransport) EncodeID(id ServerID) []byte {
return []byte(id)
}

// DecodePeer implements the Transport interface.
func (i *InmemTransport) DecodeID(buf []byte) ServerID {
return ServerID(buf)
}

// Connect is used to connect this transport to another transport for
// a given peer name. This allows for local routing.
func (i *InmemTransport) Connect(peer ServerAddress, t Transport) {
Expand Down
10 changes: 10 additions & 0 deletions net_transport.go
Expand Up @@ -466,6 +466,16 @@ func (n *NetworkTransport) DecodePeer(buf []byte) ServerAddress {
return ServerAddress(buf)
}

// EncodeID implements the Transport interface.
func (n *NetworkTransport) EncodeID(id ServerID) []byte {
return []byte(id)
}

// DecodeID implements the Transport interface.
func (n *NetworkTransport) DecodeID(buf []byte) ServerID {
return ServerID(buf)
}

// TimeoutNow implements the Transport interface.
func (n *NetworkTransport) TimeoutNow(id ServerID, target ServerAddress, args *TimeoutNowRequest, resp *TimeoutNowResponse) error {
return n.genericRPC(id, target, rpcTimeoutNow, args, resp)
Expand Down
14 changes: 14 additions & 0 deletions raft.go
Expand Up @@ -1480,6 +1480,19 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
// there is a known leader. But if the leader initiated a leadership transfer,
// vote!
candidate := r.trans.DecodePeer(req.Candidate)

// Prior to version 4 the peer ID is not part of `RequestVoteRequest`,
// We assume that the peer is part of the configuration and skip this check
if r.protocolVersion > 3 {
candidateID := r.trans.DecodeID(req.ID)
// if the Servers list is empty that mean the cluster is very likely trying to bootstrap,
// Grant the vote
if len(r.configurations.latest.Servers) > 0 && !inConfig(r.configurations.latest, candidateID) {
r.logger.Warn("rejecting vote request since voter not part of the configuration",
"from", candidate)
return
}
}
if leader := r.Leader(); leader != "" && leader != candidate && !req.LeadershipTransfer {
r.logger.Warn("rejecting vote request since we have a leader",
"from", candidate,
Expand Down Expand Up @@ -1712,6 +1725,7 @@ func (r *Raft) electSelf() <-chan *voteResult {
RPCHeader: r.getRPCHeader(),
Term: r.getCurrentTerm(),
Candidate: r.trans.EncodePeer(r.localID, r.localAddr),
ID: r.trans.EncodeID(r.localID),
LastLogIndex: lastIdx,
LastLogTerm: lastTerm,
LeadershipTransfer: r.candidateFromLeadershipTransfer,
Expand Down
49 changes: 49 additions & 0 deletions raft_test.go
Expand Up @@ -1740,6 +1740,53 @@ func TestRaft_Voting(t *testing.T) {
RPCHeader: ldr.getRPCHeader(),
Term: ldr.getCurrentTerm() + 10,
Candidate: ldrT.EncodePeer(ldr.localID, ldr.localAddr),
ID: ldrT.EncodeID(ldr.localID),
LastLogIndex: ldr.LastIndex(),
LastLogTerm: ldr.getCurrentTerm(),
LeadershipTransfer: false,
}
// a follower that thinks there's a leader should vote for that leader.
var resp RequestVoteResponse
if err := ldrT.RequestVote(followers[0].localID, followers[0].localAddr, &reqVote, &resp); err != nil {
t.Fatalf("RequestVote RPC failed %v", err)
}
if !resp.Granted {
t.Fatalf("expected vote to be granted, but wasn't %+v", resp)
}
// a follower that thinks there's a leader shouldn't vote for a different candidate
reqVote.Candidate = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr)
if err := ldrT.RequestVote(followers[1].localID, followers[1].localAddr, &reqVote, &resp); err != nil {
t.Fatalf("RequestVote RPC failed %v", err)
}
if resp.Granted {
t.Fatalf("expected vote not to be granted, but was %+v", resp)
}
// a follower that thinks there's a leader, but the request has the leadership transfer flag, should
// vote for a different candidate
reqVote.LeadershipTransfer = true
reqVote.Candidate = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr)
if err := ldrT.RequestVote(followers[1].localID, followers[1].localAddr, &reqVote, &resp); err != nil {
t.Fatalf("RequestVote RPC failed %v", err)
}
if !resp.Granted {
t.Fatalf("expected vote to be granted, but wasn't %+v", resp)
}
}

func TestRaft_Voting_portocolVersion3(t *testing.T) {
conf := inmemConfig(t)
conf.ProtocolVersion = 3
c := MakeCluster(3, t, conf)
defer c.Close()
followers := c.Followers()
ldr := c.Leader()
ldrT := c.trans[c.IndexOf(ldr)]

reqVote := RequestVoteRequest{
RPCHeader: ldr.getRPCHeader(),
Term: ldr.getCurrentTerm() + 10,
Candidate: ldrT.EncodePeer(ldr.localID, ldr.localAddr),
ID: ldrT.EncodeID(ldr.localID),
LastLogIndex: ldr.LastIndex(),
LastLogTerm: ldr.getCurrentTerm(),
LeadershipTransfer: false,
Expand Down Expand Up @@ -2349,6 +2396,7 @@ func TestRaft_InstallSnapshot_InvalidPeers(t *testing.T) {
func TestRaft_RemovedFollower_Vote(t *testing.T) {
// Make a cluster
c := MakeCluster(3, t, nil)

defer c.Close()

// Get the leader
Expand Down Expand Up @@ -2393,6 +2441,7 @@ func TestRaft_RemovedFollower_Vote(t *testing.T) {
RPCHeader: followerRemoved.getRPCHeader(),
Term: followerRemoved.getCurrentTerm() + 10,
Candidate: followerRemovedT.EncodePeer(followerRemoved.localID, followerRemoved.localAddr),
ID: followerRemovedT.EncodeID(followerRemoved.localID),
LastLogIndex: followerRemoved.LastIndex(),
LastLogTerm: followerRemoved.getCurrentTerm(),
LeadershipTransfer: false,
Expand Down
6 changes: 6 additions & 0 deletions transport.go
Expand Up @@ -53,6 +53,12 @@ type Transport interface {
// DecodePeer is used to deserialize a peer's address.
DecodePeer([]byte) ServerAddress

// EncodePeer is used to serialize a peer's address.
EncodeID(id ServerID) []byte

// DecodePeer is used to deserialize a peer's address.
DecodeID([]byte) ServerID

// SetHeartbeatHandler is used to setup a heartbeat handler
// as a fast-pass. This is to avoid head-of-line blocking from
// disk IO. If a Transport does not support this, it can simply
Expand Down

0 comments on commit 3907e87

Please sign in to comment.