merge: #9831
9831: [Backport stable/8.0] fix: don't replicate snapshot if member already has the latest snapshot r=oleschoenburg a=backport-action

# Description
Backport of #9824 to `stable/8.0`.

Relates to #9820.
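
For context on the change below: before this fix, the leader decided to replicate a snapshot based on log availability alone (among other checks collapsed in the diff), without asking whether the member already held that snapshot, which could leave the leader re-sending the same snapshot indefinitely (#9820). A minimal sketch of the corrected decision, with `Snapshot` and `Member` as hypothetical stand-ins for the persisted snapshot and `RaftMemberContext` (not the real Zeebe classes):

```java
// Minimal sketch, not the actual Zeebe implementation: Snapshot and Member
// are hypothetical stand-ins for the persisted snapshot and RaftMemberContext.
final class SnapshotReplicationSketch {
  record Snapshot(long index) {}

  record Member(long snapshotIndex, long currentIndex) {}

  static boolean shouldReplicateSnapshot(
      final Member member, final Snapshot persisted, final long firstLogIndex) {
    if (persisted == null) {
      return false; // no snapshot to replicate
    }
    if (member.snapshotIndex() >= persisted.index()) {
      // The member already holds the latest snapshot; re-sending it cannot
      // make progress and is what led to the loop in #9820.
      return false;
    }
    // The member's next expected entry was compacted away, so the snapshot
    // is the only way to catch it up.
    return firstLogIndex > member.currentIndex();
  }
}
```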

Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
zeebe-bors-camunda[bot] and lenaschoenburg committed Jul 18, 2022
2 parents 263d2d5 + 68900e7 commit d317691
Showing 2 changed files with 52 additions and 0 deletions.
@@ -799,6 +799,12 @@ private boolean shouldReplicateSnapshot(final RaftMemberContext member) {
    if (persistedSnapshot == null) {
      return false;
    }
    if (member.getSnapshotIndex() >= persistedSnapshot.getIndex()) {
      // Member has the latest snapshot; replicating the snapshot again wouldn't help.
      // WARNING! This is load-bearing and not just an optimization. See
      // https://github.com/camunda/zeebe/issues/9820 for context.
      return false;
    }
    if (raft.getLog().getFirstIndex() > member.getCurrentIndex()) {
      // Necessary events are not available anymore; we have to use the snapshot.
      return true;
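
To make the load-bearing comment in the hunk above concrete, here is a hedged numeric walk-through of one plausible reading of the #9820 scenario, reusing the sketch from the description; the exact semantics of `getCurrentIndex()` are an assumption, not taken from the Zeebe code:

```java
// Hypothetical walk-through (assumed index semantics): snapshot at index 5,
// entries up to 5 compacted away, so the leader's log starts at index 6.
final class ReplicationLoopWalkthrough {
  public static void main(final String[] args) {
    final var snapshot = new SnapshotReplicationSketch.Snapshot(5);
    final long firstLogIndex = 6;

    // A member after data loss: it genuinely needs the snapshot.
    final var fresh = new SnapshotReplicationSketch.Member(0, 0);
    System.out.println(
        SnapshotReplicationSketch.shouldReplicateSnapshot(fresh, snapshot, firstLogIndex)); // true

    // The same member after installing the snapshot: its log is still empty
    // (current index 5 < first log index 6), so the old availability check
    // alone would keep answering "replicate" forever. The new snapshotIndex
    // guard breaks the loop.
    final var caughtUp = new SnapshotReplicationSketch.Member(5, 5);
    System.out.println(
        SnapshotReplicationSketch.shouldReplicateSnapshot(caughtUp, snapshot, firstLogIndex)); // false
  }
}
```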
@@ -17,6 +17,9 @@

import static org.assertj.core.api.Assertions.assertThat;

import io.atomix.raft.RaftServer.Role;
import java.util.stream.Collectors;
import org.awaitility.Awaitility;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -112,4 +115,47 @@ public void shouldReplicateSnapshotIfMemberLagAboveThreshold() throws Exception
    assertThat(follower.getContext().getPersistedSnapshotStore().getCurrentSnapshotIndex())
        .isEqualTo(snapshotIndex);
  }

  @Test
  // Regression test for https://github.com/camunda/zeebe/issues/9820
  public void shouldNotGetStuckInSnapshotReplicationLoop() throws Exception {
    // given -- a cluster where the followers' logs start at the snapshot index
    final var initialLeader = raftRule.getLeader().orElseThrow();

    final var snapshotIndex = raftRule.appendEntries(5);
    raftRule.doSnapshotOnMember(initialLeader, snapshotIndex, 3);
    raftRule.appendEntries(5);

    raftRule.getServers().stream()
        .filter(s -> s.getRole() == Role.FOLLOWER)
        .toList()
        .forEach(
            follower -> {
              try {
                raftRule.shutdownServer(follower);
                // force data loss so that the follower must receive the snapshot
                // and has no preceding log
                raftRule.triggerDataLossOnNode(follower.name());
                raftRule.joinCluster(follower.name());
              } catch (final Exception e) {
                throw new RuntimeException(e);
              }
            });

    // when -- letting the initial leader re-join after data loss
    raftRule.shutdownServer(initialLeader);
    raftRule.awaitNewLeader();
    raftRule.triggerDataLossOnNode(initialLeader.name());
    raftRule.joinCluster(initialLeader.name());

    // then -- all members should have the snapshot and the latest log
    raftRule.allNodesHaveSnapshotWithIndex(snapshotIndex);
    Awaitility.await("All members should have the latest log")
        .until(
            () ->
                raftRule.getServers().stream()
                        .map(s -> s.getContext().getLog().getLastIndex())
                        .collect(Collectors.toSet())
                        .size()
                    == 1);
  }
}
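
The `Awaitility` condition at the end polls until every server reports the same last log index, i.e. until replication has converged on a single value; with the pre-fix behavior, a leader stuck re-offering the snapshot to already-caught-up members would never let that set collapse to one, so the await would time out and the regression test would fail.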
