Skip to content

Commit

Permalink
Fixed ZKSessionTest.testReacquireLocksAfterSessionLost (#11886)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Sep 1, 2021
1 parent f42dd55 commit bc49191
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ private void handleSessionEvent(SessionEvent se) {
if (se == SessionEvent.SessionReestablished) {
log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
locks.values().forEach(ResourceLockImpl::revalidate);
} else if (se == SessionEvent.Reconnected) {
log.info("Metadata store connection has been re-established. Revalidating locks that were pending.");
locks.values().forEach(ResourceLockImpl::revalidateIfNeededAfterReconnection);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
private volatile T value;
private long version;
private final CompletableFuture<Void> expiredFuture;
private boolean revalidateAfterReconnection = false;

private enum State {
Init,
Expand Down Expand Up @@ -197,14 +198,30 @@ synchronized void lockWasInvalidated() {
.thenRun(() -> log.info("Successfully revalidated the lock on {}", path))
.exceptionally(ex -> {
synchronized (ResourceLockImpl.this) {
log.warn("Failed to revalidate the lock at {}. Marked as expired", path);
state = State.Released;
expiredFuture.complete(null);
if (ex.getCause() instanceof BadVersionException) {
log.warn("Failed to revalidate the lock at {}. Marked as expired", path);
state = State.Released;
expiredFuture.complete(null);
} else {
// We failed to revalidate the lock due to connectivity issue
// Continue assuming we hold the lock, until we can revalidate it, either
// on Reconnected or SessionReestablished events.
log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection", path,
ex.getCause().getMessage());
}
}
return null;
});
}

synchronized void revalidateIfNeededAfterReconnection() {
if (revalidateAfterReconnection) {
revalidateAfterReconnection = false;
log.warn("Revalidate lock at {} after reconnection", path);
revalidate();
}
}

synchronized CompletableFuture<Void> revalidate() {
return store.get(path)
.thenCompose(optGetResult -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public void testReacquireLocksAfterSessionLost() throws Exception {

ResourceLock<String> lock = lm1.acquireLock(path, "value-1").join();


zks.expireSession(((ZKMetadataStore) store).getZkSessionId());

SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS);
Expand All @@ -127,11 +128,8 @@ public void testReacquireLocksAfterSessionLost() throws Exception {
e = sessionEvents.poll(10, TimeUnit.SECONDS);
assertEquals(e, SessionEvent.SessionReestablished);

Awaitility.waitAtMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
assertFalse(lock.getLockExpiredFuture().isDone());
});

assertTrue(store.get(path).join().isPresent());
Awaitility.await().untilAsserted(() -> assertTrue(store.get(path).join().isPresent()));
assertFalse(lock.getLockExpiredFuture().isDone());
}

@Test
Expand Down

0 comments on commit bc49191

Please sign in to comment.