Skip to content

Commit

Permalink
make sure tests send a snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
briankassouf committed Apr 21, 2020
1 parent 6b56a61 commit 064e244
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 17 deletions.
21 changes: 21 additions & 0 deletions physical/raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,26 @@ func connectPeers(nodes ...*RaftBackend) {
}
}

func stepDownLeader(t *testing.T, node *RaftBackend) {
t.Helper()

if err := node.raft.LeadershipTransfer().Error(); err != nil {
t.Fatal(err)
}

timeout := time.Now().Add(time.Second * 10)
for !time.Now().After(timeout) {
if err := node.raft.VerifyLeader().Error(); err != nil {
return
}
time.Sleep(100 * time.Millisecond)
}

t.Fatal("still leader")
}

func waitForLeader(t *testing.T, nodes ...*RaftBackend) *RaftBackend {
t.Helper()
timeout := time.Now().Add(time.Second * 10)
for !time.Now().After(timeout) {
for _, node := range nodes {
Expand All @@ -105,6 +124,7 @@ func waitForLeader(t *testing.T, nodes ...*RaftBackend) *RaftBackend {
}

func compareFSMs(t *testing.T, fsm1, fsm2 *FSM) {
t.Helper()
if err := compareFSMsWithErr(t, fsm1, fsm2); err != nil {
t.Fatal(err)
}
Expand All @@ -126,6 +146,7 @@ func compareFSMsWithErr(t *testing.T, fsm1, fsm2 *FSM) error {
}

func compareDBs(t *testing.T, boltDB1, boltDB2 *bolt.DB) error {
t.Helper()
db1 := make(map[string]string)
db2 := make(map[string]string)

Expand Down
35 changes: 18 additions & 17 deletions physical/raft/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,13 +367,22 @@ func TestRaft_Snapshot_ErrorRecovery(t *testing.T) {
}
}

// Take a snapshot
// Take a snapshot on each node to ensure we no longer have older logs
snapFuture := raft1.raft.Snapshot()
if err := snapFuture.Error(); err != nil {
t.Fatal(err)
}
// Advance FSM's index past configuration change
raft1.Put(context.Background(), &physical.Entry{

stepDownLeader(t, raft1)
leader := waitForLeader(t, raft1, raft2)

snapFuture = leader.raft.Snapshot()
if err := snapFuture.Error(); err != nil {
t.Fatal(err)
}

// Advance FSM's index past snapshot index
leader.Put(context.Background(), &physical.Entry{
Key: "key",
Value: []byte("value"),
})
Expand All @@ -382,7 +391,7 @@ func TestRaft_Snapshot_ErrorRecovery(t *testing.T) {
raft3.fsm.testSnapshotRestoreError = true

// Add raft3 to the cluster
addPeer(t, raft1, raft3)
addPeer(t, leader, raft3)

time.Sleep(2 * time.Second)

Expand All @@ -393,25 +402,16 @@ func TestRaft_Snapshot_ErrorRecovery(t *testing.T) {
}

// Ensure the databases are not equal
if err := compareFSMsWithErr(t, raft1.fsm, raft3.fsm); err == nil {
if err := compareFSMsWithErr(t, leader.fsm, raft3.fsm); err == nil {
t.Fatal("nil error")
}

// Remove error and make sure we can reconcile state
raft3.fsm.testSnapshotRestoreError = false

// Shutdown raft1
if err := raft1.TeardownCluster(nil); err != nil {
t.Fatal(err)
}

// Start Raft1
if err := raft1.SetupCluster(context.Background(), SetupOpts{}); err != nil {
t.Fatal(err)
}

connectPeers(raft1, raft2)
waitForLeader(t, raft1, raft2)
// Step down leader node
stepDownLeader(t, leader)
leader = waitForLeader(t, raft1, raft2)

// Start Raft3
if err := raft3.SetupCluster(context.Background(), SetupOpts{}); err != nil {
Expand All @@ -422,6 +422,7 @@ func TestRaft_Snapshot_ErrorRecovery(t *testing.T) {
waitForLeader(t, raft1, raft2)

time.Sleep(5 * time.Second)

// Make sure state gets re-replicated.
compareFSMs(t, raft1.fsm, raft3.fsm)
}
Expand Down

0 comments on commit 064e244

Please sign in to comment.