Skip to content

Commit

Permalink
node not part of the cluster is not allowed to vote (#477)
Browse files Browse the repository at this point in the history
* add test to verify that a removed node is not able to vote

* add some comments to the tests

* add wait loop to test

* remove test related to removed node voting

* add test that check a removed node is not allowed to vote

* add wait loop to make test more robust

* increase timeout

* Revert "increase timeout"

This reverts commit 316c59a.

* add ID to `RequestVoteRequest` and check if the node is part of the cluster before granting a vote

* use request protocol version to ensure we have the right version

* add `ID` and `Addr` as part of `RPCHeader` and do not fill `Candidate` and `Leader` in version > 3

* return `LeaderID` as part of the `Leader()` api

* fix docstring

* fix retro compatibility with version 3

* fix string casting

* remove `EncodeID` and `DecodeID` from `transport` interface

* add missing `leaderID` initialization

* add protocol version 3 to version 4 upgrade test

* increase test timeout

* split test, and clean code

* add docstrings to `RPCHeader`

* Apply suggestions from code review

Co-authored-by: Matt Keeler <mkeeler@users.noreply.github.com>

* Fix comment

Co-authored-by: Matt Keeler <mkeeler@users.noreply.github.com>

* fix review comments

* do not increment protocolVersion to 4 and rely on Addr/ID field being nil to fallback on the old fields.

* deprecate `Leader` in favor of `LeaderWithID`

* remove duplicate test and code clean up

* add deprecation for `Leader` and `Candidate` fields

* fix deprecation comments

* check if node is voter instead if it's part of the cluster, nonVoter should not have votes granted.

* add deprecation notice for Leader() API.

Co-authored-by: Matt Keeler <mkeeler@users.noreply.github.com>
  • Loading branch information
dhiaayachi and mkeeler committed Mar 7, 2022
1 parent 1979b11 commit 38cb186
Show file tree
Hide file tree
Showing 11 changed files with 425 additions and 66 deletions.
23 changes: 19 additions & 4 deletions api.go
Expand Up @@ -115,8 +115,10 @@ type Raft struct {
lastContact time.Time
lastContactLock sync.RWMutex

// Leader is the current cluster leader
leader ServerAddress
// leaderAddr is the current cluster leader Address
leaderAddr ServerAddress
// LeaderID is the current cluster leader ID
leaderID ServerID
leaderLock sync.RWMutex

// leaderCh is used to notify of leadership changes
Expand Down Expand Up @@ -736,13 +738,26 @@ func (r *Raft) BootstrapCluster(configuration Configuration) Future {
}

// Leader is used to return the current leader of the cluster.
// Deprecated: use LeaderWithID instead
// It may return empty string if there is no current leader
// or the leader is unknown.
// Deprecated: use LeaderWithID instead.
func (r *Raft) Leader() ServerAddress {
r.leaderLock.RLock()
leader := r.leader
leaderAddr := r.leaderAddr
r.leaderLock.RUnlock()
return leader
return leaderAddr
}

// LeaderWithID is used to return the current leader address and ID of the cluster.
// It may return empty strings if there is no current leader
// or the leader is unknown.
func (r *Raft) LeaderWithID() (ServerAddress, ServerID) {
r.leaderLock.RLock()
leaderAddr := r.leaderAddr
leaderID := r.leaderID
r.leaderLock.RUnlock()
return leaderAddr, leaderID
}

// Apply is used to apply a command to the FSM in a highly consistent
Expand Down
15 changes: 12 additions & 3 deletions commands.go
Expand Up @@ -8,6 +8,10 @@ type RPCHeader struct {
// ProtocolVersion is the version of the protocol the sender is
// speaking.
ProtocolVersion ProtocolVersion
// ID is the ServerID of the node sending the RPC Request or Response
ID []byte
// Addr is the ServerAddr of the node sending the RPC Request or Response
Addr []byte
}

// WithRPCHeader is an interface that exposes the RPC header.
Expand All @@ -21,7 +25,9 @@ type AppendEntriesRequest struct {
RPCHeader

// Provide the current term and leader
Term uint64
Term uint64

// Deprecated: use RPCHeader.Addr instead
Leader []byte

// Provide the previous entries for integrity checking
Expand Down Expand Up @@ -70,7 +76,9 @@ type RequestVoteRequest struct {
RPCHeader

// Provide the term and our id
Term uint64
Term uint64

// Deprecated: use RPCHeader.Addr instead
Candidate []byte

// Used to ensure safety
Expand Down Expand Up @@ -122,9 +130,10 @@ type InstallSnapshotRequest struct {
LastLogIndex uint64
LastLogTerm uint64

// Peer Set in the snapshot. This is deprecated in favor of Configuration
// Peer Set in the snapshot.
// but remains here in case we receive an InstallSnapshot from a leader
// that's running old code.
// Deprecated: This is deprecated in favor of Configuration
Peers []byte

// Cluster membership.
Expand Down
8 changes: 7 additions & 1 deletion fuzzy/verifier.go
Expand Up @@ -45,7 +45,13 @@ func (v *appendEntriesVerifier) PreRequestVote(src, target string, rv *raft.Requ

func (v *appendEntriesVerifier) PreAppendEntries(src, target string, req *raft.AppendEntriesRequest) (*raft.AppendEntriesResponse, error) {
term := req.Term
ldr := string(req.Leader)
var ldr string
if len(req.RPCHeader.Addr) > 0 {
ldr = string(req.RPCHeader.Addr)
} else {
ldr = string(req.Leader)
}

if ldr != src {
v.Lock()
defer v.Unlock()
Expand Down
7 changes: 6 additions & 1 deletion net_transport.go
Expand Up @@ -584,8 +584,13 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en
}
rpc.Command = &req

leaderAddr := req.RPCHeader.Addr
if len(leaderAddr) == 0 {
leaderAddr = req.Leader
}

// Check if this is a heartbeat
if req.Term != 0 && req.Leader != nil &&
if req.Term != 0 && leaderAddr != nil &&
req.PrevLogEntry == 0 && req.PrevLogTerm == 0 &&
len(req.Entries) == 0 && req.LeaderCommitIndex == 0 {
isHeartbeat = true
Expand Down
27 changes: 18 additions & 9 deletions net_transport_test.go
Expand Up @@ -35,7 +35,6 @@ func TestNetworkTransport_CloseStreams(t *testing.T) {
// Make the RPC request
args := AppendEntriesRequest{
Term: 10,
Leader: []byte("cartman"),
PrevLogEntry: 100,
PrevLogTerm: 4,
Entries: []*Log{
Expand All @@ -46,7 +45,9 @@ func TestNetworkTransport_CloseStreams(t *testing.T) {
},
},
LeaderCommitIndex: 90,
RPCHeader: RPCHeader{Addr: []byte("cartman")},
}

resp := AppendEntriesResponse{
Term: 4,
LastLog: 90,
Expand Down Expand Up @@ -138,9 +139,11 @@ func TestNetworkTransport_Heartbeat_FastPath(t *testing.T) {

// Make the RPC request
args := AppendEntriesRequest{
Term: 10,
Leader: []byte("cartman"),
Term: 10,
RPCHeader: RPCHeader{ProtocolVersion: ProtocolVersionMax, Addr: []byte("cartman")},
Leader: []byte("cartman"),
}

resp := AppendEntriesResponse{
Term: 4,
LastLog: 90,
Expand Down Expand Up @@ -197,7 +200,6 @@ func TestNetworkTransport_AppendEntries(t *testing.T) {
// Make the RPC request
args := AppendEntriesRequest{
Term: 10,
Leader: []byte("cartman"),
PrevLogEntry: 100,
PrevLogTerm: 4,
Entries: []*Log{
Expand All @@ -208,7 +210,9 @@ func TestNetworkTransport_AppendEntries(t *testing.T) {
},
},
LeaderCommitIndex: 90,
RPCHeader: RPCHeader{Addr: []byte("cartman")},
}

resp := AppendEntriesResponse{
Term: 4,
LastLog: 90,
Expand Down Expand Up @@ -267,7 +271,6 @@ func TestNetworkTransport_AppendEntriesPipeline(t *testing.T) {
// Make the RPC request
args := AppendEntriesRequest{
Term: 10,
Leader: []byte("cartman"),
PrevLogEntry: 100,
PrevLogTerm: 4,
Entries: []*Log{
Expand All @@ -278,7 +281,9 @@ func TestNetworkTransport_AppendEntriesPipeline(t *testing.T) {
},
},
LeaderCommitIndex: 90,
RPCHeader: RPCHeader{Addr: []byte("cartman")},
}

resp := AppendEntriesResponse{
Term: 4,
LastLog: 90,
Expand Down Expand Up @@ -352,7 +357,6 @@ func TestNetworkTransport_AppendEntriesPipeline_CloseStreams(t *testing.T) {
// Make the RPC request
args := AppendEntriesRequest{
Term: 10,
Leader: []byte("cartman"),
PrevLogEntry: 100,
PrevLogTerm: 4,
Entries: []*Log{
Expand All @@ -363,7 +367,9 @@ func TestNetworkTransport_AppendEntriesPipeline_CloseStreams(t *testing.T) {
},
},
LeaderCommitIndex: 90,
RPCHeader: RPCHeader{Addr: []byte("cartman")},
}

resp := AppendEntriesResponse{
Term: 4,
LastLog: 90,
Expand Down Expand Up @@ -462,10 +468,11 @@ func TestNetworkTransport_RequestVote(t *testing.T) {
// Make the RPC request
args := RequestVoteRequest{
Term: 20,
Candidate: []byte("butters"),
LastLogIndex: 100,
LastLogTerm: 19,
RPCHeader: RPCHeader{Addr: []byte("butters")},
}

resp := RequestVoteResponse{
Term: 100,
Granted: false,
Expand Down Expand Up @@ -523,12 +530,13 @@ func TestNetworkTransport_InstallSnapshot(t *testing.T) {
// Make the RPC request
args := InstallSnapshotRequest{
Term: 10,
Leader: []byte("kyle"),
LastLogIndex: 100,
LastLogTerm: 9,
Peers: []byte("blah blah"),
Size: 10,
RPCHeader: RPCHeader{Addr: []byte("kyle")},
}

resp := InstallSnapshotResponse{
Term: 10,
Success: true,
Expand Down Expand Up @@ -631,7 +639,6 @@ func TestNetworkTransport_PooledConn(t *testing.T) {
// Make the RPC request
args := AppendEntriesRequest{
Term: 10,
Leader: []byte("cartman"),
PrevLogEntry: 100,
PrevLogTerm: 4,
Entries: []*Log{
Expand All @@ -642,7 +649,9 @@ func TestNetworkTransport_PooledConn(t *testing.T) {
},
},
LeaderCommitIndex: 90,
RPCHeader: RPCHeader{Addr: []byte("cartman")},
}

resp := AppendEntriesResponse{
Term: 4,
LastLog: 90,
Expand Down
3 changes: 2 additions & 1 deletion observer.go
Expand Up @@ -19,7 +19,8 @@ type Observation struct {

// LeaderObservation is used for the data when leadership changes.
type LeaderObservation struct {
Leader ServerAddress
LeaderAddr ServerAddress
LeaderID ServerID
}

// PeerObservation is sent to observers when peers change.
Expand Down

0 comments on commit 38cb186

Please sign in to comment.