Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node not part of the cluster is not allowed to vote #477

Merged
merged 32 commits into from Mar 7, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
8d01054
add test to verify that a removed node is not able to vote
dhiaayachi Sep 22, 2021
9ed9594
add some comments to the tests
dhiaayachi Sep 22, 2021
3a2e405
add wait loop to test
dhiaayachi Sep 22, 2021
8da6d8d
remove test related to removed node voting
dhiaayachi Sep 23, 2021
b4e2273
add test that check a removed node is not allowed to vote
dhiaayachi Sep 23, 2021
47da40c
add wait loop to make test more robust
dhiaayachi Sep 23, 2021
10c6f7c
increase timeout
dhiaayachi Sep 23, 2021
4aa895c
Revert "increase timeout"
dhiaayachi Sep 23, 2021
f22dae0
add ID to `RequestVoteRequest` and check if the node is part of the c…
dhiaayachi Sep 29, 2021
822cde2
use request protocol version to ensure we have the right version
dhiaayachi Sep 30, 2021
e7a1af3
add `ID` and `Addr` as part of `RPCHeader` and do not fill `Candidate…
dhiaayachi Oct 5, 2021
512d338
return `LeaderID` as part of the `Leader()` api
dhiaayachi Oct 7, 2021
9e1e9c2
fix docstring
dhiaayachi Oct 26, 2021
9e751db
fix retro compatibility with version 3
dhiaayachi Oct 26, 2021
8074fe5
fix string casting
dhiaayachi Oct 26, 2021
2768524
remove `EncodeID` and `DecodeID` from `transport` interface
dhiaayachi Oct 28, 2021
6ea0aa4
add missing `leaderID` initialization
dhiaayachi Oct 28, 2021
ecd60c2
add protocol version 3 to version 4 upgrade test
dhiaayachi Oct 28, 2021
bfc9e23
increase test timeout
dhiaayachi Oct 28, 2021
b3507f1
split test, and clean code
dhiaayachi Oct 28, 2021
aea65ca
add docstrings to `RPCHeader`
dhiaayachi Oct 28, 2021
4620c71
Apply suggestions from code review
dhiaayachi Nov 24, 2021
86d22fa
Fix comment
dhiaayachi Nov 24, 2021
3621401
fix review comments
dhiaayachi Nov 26, 2021
8631c97
do not increment protocolVersion to 4 and rely on Addr/ID field being…
dhiaayachi Nov 29, 2021
634a602
deprecate `Leader` in favor of `LeaderWithID`
dhiaayachi Nov 30, 2021
a8af0db
remove duplicate test and code clean up
dhiaayachi Dec 1, 2021
d4932d0
add deprecation for `Leader` and `Candidate` fields
dhiaayachi Dec 1, 2021
86c4d96
fix deprecation comments
dhiaayachi Dec 2, 2021
eff27ce
Merge branch 'main' into noncluster-node-vote
dhiaayachi Jan 5, 2022
4fa2eb0
check if node is voter instead if it's part of the cluster, nonVoter …
dhiaayachi Jan 5, 2022
0e724f0
add deprecation notice for Leader() API.
dhiaayachi Mar 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 18 additions & 4 deletions api.go
Expand Up @@ -111,8 +111,10 @@ type Raft struct {
lastContact time.Time
lastContactLock sync.RWMutex

// Leader is the current cluster leader
leader ServerAddress
// leaderAddr is the current cluster leader Address
leaderAddr ServerAddress
// LeaderID is the current cluster leader ID
leaderID ServerID
leaderLock sync.RWMutex

// leaderCh is used to notify of leadership changes
Expand Down Expand Up @@ -716,11 +718,23 @@ func (r *Raft) BootstrapCluster(configuration Configuration) Future {
// Leader is used to return the current leader of the cluster.
// It may return empty string if there is no current leader
// or the leader is unknown.
// Deprecated: Leader is deprecated, use LeaderWithID instead.
func (r *Raft) Leader() ServerAddress {
r.leaderLock.RLock()
leader := r.leader
leaderAddr := r.leaderAddr
r.leaderLock.RUnlock()
return leader
return leaderAddr
}

// LeaderWithID is used to return the current leader address and ID of the cluster.
// It may return empty strings if there is no current leader
// or the leader is unknown.
func (r *Raft) LeaderWithID() (ServerAddress, ServerID) {
r.leaderLock.RLock()
leaderAddr := r.leaderAddr
leaderID := r.leaderID
r.leaderLock.RUnlock()
return leaderAddr, leaderID
}

// Apply is used to apply a command to the FSM in a highly consistent
Expand Down
4 changes: 4 additions & 0 deletions commands.go
Expand Up @@ -8,6 +8,10 @@ type RPCHeader struct {
// ProtocolVersion is the version of the protocol the sender is
// speaking.
ProtocolVersion ProtocolVersion
// ID is the ServerID of the node sending the RPC Request or Response
ID []byte
// Addr is the ServerAddr of the node sending the RPC Request or Response
Addr []byte
dhiaayachi marked this conversation as resolved.
Show resolved Hide resolved
}

// WithRPCHeader is an interface that exposes the RPC header.
Expand Down
8 changes: 7 additions & 1 deletion fuzzy/verifier.go
Expand Up @@ -45,7 +45,13 @@ func (v *appendEntriesVerifier) PreRequestVote(src, target string, rv *raft.Requ

func (v *appendEntriesVerifier) PreAppendEntries(src, target string, req *raft.AppendEntriesRequest) (*raft.AppendEntriesResponse, error) {
term := req.Term
ldr := string(req.Leader)
var ldr string
if len(req.RPCHeader.Addr) > 0 {
ldr = string(req.RPCHeader.Addr)
} else {
ldr = string(req.Leader)
}
Comment on lines +49 to +53
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need to do this? Any reason not to continue to use req.Leader? Why do we prefer the new RPCHeader.Addr ?

If we are trying to phase it out, we should document that on the Leader field.

Same question about the other two places we look at Leader or Candidate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should aim to phaseout Leader and Candidate parameters, it make more sense to infer those values from the RPCHeader now and removing those will reduce the on the wire packet size sensibly. Also if we keep them it mean we need to document/test special cases behaviour (different values, Addr empty and Candidate not...).
I will add the deprecation comment for both Candidate and Leader


if ldr != src {
v.Lock()
defer v.Unlock()
Expand Down
7 changes: 6 additions & 1 deletion net_transport.go
Expand Up @@ -567,8 +567,13 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en
}
rpc.Command = &req

leaderAddr := req.RPCHeader.Addr
if len(leaderAddr) == 0 {
leaderAddr = req.Leader
}

// Check if this is a heartbeat
if req.Term != 0 && req.Leader != nil &&
if req.Term != 0 && leaderAddr != nil &&
req.PrevLogEntry == 0 && req.PrevLogTerm == 0 &&
len(req.Entries) == 0 && req.LeaderCommitIndex == 0 {
isHeartbeat = true
Expand Down
27 changes: 18 additions & 9 deletions net_transport_test.go
Expand Up @@ -35,7 +35,6 @@ func TestNetworkTransport_CloseStreams(t *testing.T) {
// Make the RPC request
args := AppendEntriesRequest{
Term: 10,
Leader: []byte("cartman"),
PrevLogEntry: 100,
PrevLogTerm: 4,
Entries: []*Log{
Expand All @@ -46,7 +45,9 @@ func TestNetworkTransport_CloseStreams(t *testing.T) {
},
},
LeaderCommitIndex: 90,
RPCHeader: RPCHeader{Addr: []byte("cartman")},
}

resp := AppendEntriesResponse{
Term: 4,
LastLog: 90,
Expand Down Expand Up @@ -138,9 +139,11 @@ func TestNetworkTransport_Heartbeat_FastPath(t *testing.T) {

// Make the RPC request
args := AppendEntriesRequest{
Term: 10,
Leader: []byte("cartman"),
Term: 10,
RPCHeader: RPCHeader{ProtocolVersion: ProtocolVersionMax, Addr: []byte("cartman")},
Leader: []byte("cartman"),
}

resp := AppendEntriesResponse{
Term: 4,
LastLog: 90,
Expand Down Expand Up @@ -197,7 +200,6 @@ func TestNetworkTransport_AppendEntries(t *testing.T) {
// Make the RPC request
args := AppendEntriesRequest{
Term: 10,
Leader: []byte("cartman"),
PrevLogEntry: 100,
PrevLogTerm: 4,
Entries: []*Log{
Expand All @@ -208,7 +210,9 @@ func TestNetworkTransport_AppendEntries(t *testing.T) {
},
},
LeaderCommitIndex: 90,
RPCHeader: RPCHeader{Addr: []byte("cartman")},
}

resp := AppendEntriesResponse{
Term: 4,
LastLog: 90,
Expand Down Expand Up @@ -267,7 +271,6 @@ func TestNetworkTransport_AppendEntriesPipeline(t *testing.T) {
// Make the RPC request
args := AppendEntriesRequest{
Term: 10,
Leader: []byte("cartman"),
PrevLogEntry: 100,
PrevLogTerm: 4,
Entries: []*Log{
Expand All @@ -278,7 +281,9 @@ func TestNetworkTransport_AppendEntriesPipeline(t *testing.T) {
},
},
LeaderCommitIndex: 90,
RPCHeader: RPCHeader{Addr: []byte("cartman")},
}

resp := AppendEntriesResponse{
Term: 4,
LastLog: 90,
Expand Down Expand Up @@ -352,7 +357,6 @@ func TestNetworkTransport_AppendEntriesPipeline_CloseStreams(t *testing.T) {
// Make the RPC request
args := AppendEntriesRequest{
Term: 10,
Leader: []byte("cartman"),
PrevLogEntry: 100,
PrevLogTerm: 4,
Entries: []*Log{
Expand All @@ -363,7 +367,9 @@ func TestNetworkTransport_AppendEntriesPipeline_CloseStreams(t *testing.T) {
},
},
LeaderCommitIndex: 90,
RPCHeader: RPCHeader{Addr: []byte("cartman")},
}

resp := AppendEntriesResponse{
Term: 4,
LastLog: 90,
Expand Down Expand Up @@ -462,10 +468,11 @@ func TestNetworkTransport_RequestVote(t *testing.T) {
// Make the RPC request
args := RequestVoteRequest{
Term: 20,
Candidate: []byte("butters"),
LastLogIndex: 100,
LastLogTerm: 19,
RPCHeader: RPCHeader{Addr: []byte("butters")},
}

resp := RequestVoteResponse{
Term: 100,
Granted: false,
Expand Down Expand Up @@ -523,12 +530,13 @@ func TestNetworkTransport_InstallSnapshot(t *testing.T) {
// Make the RPC request
args := InstallSnapshotRequest{
Term: 10,
Leader: []byte("kyle"),
LastLogIndex: 100,
LastLogTerm: 9,
Peers: []byte("blah blah"),
Size: 10,
RPCHeader: RPCHeader{Addr: []byte("kyle")},
}

resp := InstallSnapshotResponse{
Term: 10,
Success: true,
Expand Down Expand Up @@ -631,7 +639,6 @@ func TestNetworkTransport_PooledConn(t *testing.T) {
// Make the RPC request
args := AppendEntriesRequest{
Term: 10,
Leader: []byte("cartman"),
PrevLogEntry: 100,
PrevLogTerm: 4,
Entries: []*Log{
Expand All @@ -642,7 +649,9 @@ func TestNetworkTransport_PooledConn(t *testing.T) {
},
},
LeaderCommitIndex: 90,
RPCHeader: RPCHeader{Addr: []byte("cartman")},
}

resp := AppendEntriesResponse{
Term: 4,
LastLog: 90,
Expand Down
3 changes: 2 additions & 1 deletion observer.go
Expand Up @@ -19,7 +19,8 @@ type Observation struct {

// LeaderObservation is used for the data when leadership changes.
type LeaderObservation struct {
Leader ServerAddress
LeaderAddr ServerAddress
LeaderID ServerID
}

// PeerObservation is sent to observers when peers change.
Expand Down