Skip to content

Commit

Permalink
ZOOKEEPER-3642: Fix potential data inconsistency due to DIFF sync after partial SNAP sync.
Browse files Browse the repository at this point in the history

Based on apache#1224 ; fixed unit test build issue.

Author: Fangmin Lyu <fangmin@apache.org>
Author: Michael Han <hanm@apache.org>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Originally developed by Fangmin Lyu <fangmin@apache.org>

Closes apache#1515 from hanm/ZOOKEEPER-3642
  • Loading branch information
hanm authored and RokLenarcic committed Aug 31, 2022
1 parent 48581ad commit 0b973ef
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,9 @@ public void shutdown() {
*/
public synchronized void shutdown(boolean fullyShutDown) {
if (!canShutdown()) {
if (fullyShutDown && zkDb != null) {
zkDb.clear();
}
LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,9 @@ public void shutdown() {
closeSocket();
// shutdown previous zookeeper
if (zk != null) {
zk.shutdown();
// If the SNAP sync has not finished, force a full shutdown
// to avoid potential inconsistency
zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1615,11 +1615,139 @@ public void testLearnerRequestForwardBehavior() throws Exception {
}
}

/**
 * Guards against data inconsistency when a learner's SNAP sync with the
 * leader fails before the snapshot has been written to disk. After such a
 * failure the learner may later DIFF sync with a new leader, or be elected
 * leader itself.
 *
 * The test starts a three-server quorum, stops one follower, writes data
 * through the leader, and then restarts the follower with a forced SNAP
 * sync that is made to fail once when NEWLEADER is received. It checks that
 * every created node is visible on the follower, both right after the
 * re-sync and again after the follower has been restarted.
 *
 * @throws Exception if quorum setup, client operations or shutdown fail
 */
@Test
public void testDiffSyncAfterSnap() throws Exception {
    final int ENSEMBLE_SERVERS = 3;
    MainThread[] mt = new MainThread[ENSEMBLE_SERVERS];
    ZooKeeper[] zk = new ZooKeeper[ENSEMBLE_SERVERS];

    try {
        // 1. start a quorum
        final int[] clientPorts = new int[ENSEMBLE_SERVERS];
        StringBuilder sb = new StringBuilder();

        for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
            clientPorts[i] = PortAssignment.unique();
            sb.append("server.").append(i)
                .append("=127.0.0.1:").append(PortAssignment.unique())
                .append(":").append(PortAssignment.unique())
                .append(":participant;127.0.0.1:").append(clientPorts[i])
                .append("\n");
        }
        String currentQuorumCfgSection = sb.toString();

        // start servers, each with its own Context so faults can be
        // injected into one specific server later on
        Context[] contexts = new Context[ENSEMBLE_SERVERS];
        for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
            final Context context = new Context();
            contexts[i] = context;
            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
                @Override
                public TestQPMain getTestQPMain() {
                    return new CustomizedQPMain(context);
                }
            };
            mt[i].start();
            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
        }
        waitForAll(zk, States.CONNECTED);
        LOG.info("all servers started");

        final String nodePath = "/testDiffSyncAfterSnap";

        // 2. find leader and a follower (scanning from the highest id, so
        // followerA is the highest-numbered non-leader)
        int leaderId = -1;
        int followerA = -1;
        for (int i = ENSEMBLE_SERVERS - 1; i >= 0; i--) {
            if (mt[i].main.quorumPeer.leader != null) {
                leaderId = i;
            } else if (followerA == -1) {
                followerA = i;
            }
        }
        // Fail with a clear message instead of an ArrayIndexOutOfBoundsException
        // further down if the roles could not be determined.
        if (leaderId == -1 || followerA == -1) {
            throw new AssertionError("could not identify a leader and a follower: leaderId="
                + leaderId + ", followerA=" + followerA);
        }

        // 3. stop follower A
        LOG.info("shutdown follower {}", followerA);
        mt[followerA].shutdown();
        waitForOne(zk[followerA], States.CONNECTING);

        // 4. issue some traffic
        int index = 0;
        int numOfRequests = 10;
        for (int i = 0; i < numOfRequests; i++) {
            zk[leaderId].create(nodePath + index++,
                new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        // 5. inject fault to cause the follower exit when received NEWLEADER
        contexts[followerA].newLeaderReceivedCallback = new NewLeaderReceivedCallback() {
            boolean processed = false;
            @Override
            public void process() throws IOException {
                // only fail the first NEWLEADER; later ones pass through
                if (processed) {
                    return;
                }
                processed = true;
                // turn the forced SNAP sync off again before failing
                // (presumably so the follower's next attempt is not forced to SNAP)
                System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "false");
                throw new IOException("read timedout");
            }
        };

        // 6. force snap sync once
        LOG.info("force snapshot sync");
        System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "true");

        // 7. start follower A
        mt[followerA].start();
        waitForOne(zk[followerA], States.CONNECTED);
        LOG.info("verify the nodes are exist in memory");
        for (int i = 0; i < index; i++) {
            assertNotNull(zk[followerA].exists(nodePath + i, false), "node " + i + " should exist");
        }

        // 8. issue another request which will be persisted on disk
        zk[leaderId].create(nodePath + index++,
            new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // wait some time to let this get written to disk
        Thread.sleep(500);

        // 9. reload data from disk and make sure it's still consistent
        LOG.info("restarting follower {}", followerA);
        mt[followerA].shutdown();
        waitForOne(zk[followerA], States.CONNECTING);
        mt[followerA].start();
        waitForOne(zk[followerA], States.CONNECTED);

        for (int i = 0; i < index; i++) {
            assertNotNull(zk[followerA].exists(nodePath + i, false), "node " + i + " should exist");
        }

    } finally {
        System.clearProperty(LearnerHandler.FORCE_SNAP_SYNC);
        // Setup may have failed part-way, leaving some slots unset; skip
        // those so a cleanup NPE does not hide the original failure.
        for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
            if (mt[i] != null) {
                mt[i].shutdown();
            }
            if (zk[i] != null) {
                zk[i].close();
            }
        }
    }
}

/**
 * Mutable holder of fault-injection settings that a test hands to the
 * customized quorum peer it starts. Every member starts out at its Java
 * default (flags {@code false}, callbacks {@code null}); a test sets only
 * the ones it needs.
 */
static class Context {

    // Invoked by the test's readPacket override after a NEWLEADER packet
    // has been read; ignored while null.
    NewLeaderReceivedCallback newLeaderReceivedCallback;

    // NOTE(review): presumably invoked around acknowledging NEWLEADER;
    // the call site is not visible here - confirm.
    NewLeaderAckCallback newLeaderAckCallback;

    // NOTE(review): the consumers of these two flags are not visible here;
    // the names suggest they make the peer exit when acking NEWLEADER and
    // stop following, respectively - confirm against the usage sites.
    boolean exitWhenAckNewLeader;
    boolean quitFollowing;

}

Expand All @@ -1629,6 +1757,10 @@ interface NewLeaderAckCallback {

}

/**
 * Test hook that runs once a NEWLEADER packet has been read. It is invoked
 * from the test's readPacket override whenever a callback has been
 * installed on the Context.
 */
interface NewLeaderReceivedCallback {
/**
 * Reacts to the NEWLEADER packet that was just read.
 *
 * @throws IOException to simulate an I/O failure; it propagates out of
 *         the readPacket call that invoked this hook
 */
void process() throws IOException;
}

interface StartForwardingListener {

void start();
Expand Down Expand Up @@ -1702,6 +1834,14 @@ void writePacket(QuorumPacket pp, boolean flush) throws IOException {
}
super.writePacket(pp, flush);
}

/**
 * Reads the next packet via the superclass and, when that packet is a
 * NEWLEADER, runs the context's NEWLEADER-received hook if one is set.
 *
 * @param qp packet object handed to the superclass read
 * @throws IOException if the underlying read fails or the hook throws
 */
@Override
void readPacket(QuorumPacket qp) throws IOException {
    super.readPacket(qp);

    // Only NEWLEADER packets are of interest to the hook.
    if (qp.getType() != Leader.NEWLEADER) {
        return;
    }

    final NewLeaderReceivedCallback hook = context.newLeaderReceivedCallback;
    if (hook != null) {
        hook.process();
    }
}
};
}

Expand Down

0 comments on commit 0b973ef

Please sign in to comment.