diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml index eee28587d35795..916d92eb4ec624 100644 --- a/pulsar-metadata/pom.xml +++ b/pulsar-metadata/pom.xml @@ -31,7 +31,6 @@ pulsar-metadata Pulsar Metadata - org.apache.pulsar @@ -58,6 +57,12 @@ test + + org.awaitility + awaitility + test + + org.apache.bookkeeper bookkeeper-server diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java index f565a8f09e02d6..24c4bd709007f5 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java @@ -26,7 +26,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import lombok.Cleanup; @@ -40,6 +39,7 @@ import org.apache.pulsar.metadata.api.extended.SessionEvent; import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl; import org.apache.pulsar.metadata.impl.ZKMetadataStore; +import org.awaitility.Awaitility; import org.testng.annotations.Test; public class ZKSessionTest extends BaseMetadataStoreTest { @@ -61,10 +61,10 @@ public void testDisconnection() throws Exception { assertEquals(e, SessionEvent.ConnectionLost); zks.start(); - e = sessionEvents.poll(10, TimeUnit.SECONDS); + e = sessionEvents.poll(20, TimeUnit.SECONDS); assertEquals(e, SessionEvent.Reconnected); - e = sessionEvents.poll(1, TimeUnit.SECONDS); + e = sessionEvents.poll(5, TimeUnit.SECONDS); assertNull(e); } @@ -130,9 +130,9 @@ public void testReacquireLocksAfterSessionLost() throws Exception { e = sessionEvents.poll(10, TimeUnit.SECONDS); assertEquals(e, SessionEvent.SessionReestablished); - Thread.sleep(2_000); - - assertFalse(lock.getLockExpiredFuture().isDone()); + Awaitility.await().untilAsserted(() -> { + assertFalse(lock.getLockExpiredFuture().isDone()); + }); assertTrue(store.get(path).join().isPresent()); } @@ -171,7 +171,10 @@ public void testReacquireLeadershipAfterSessionLost() throws Exception { e = sessionEvents.poll(10, TimeUnit.SECONDS); assertEquals(e, SessionEvent.SessionLost); - assertEquals(le1.getState(), LeaderElectionState.Leading); + Awaitility.await().untilAsserted(() -> { + assertEquals(le1.getState(), LeaderElectionState.Leading); + }); + les = leaderElectionEvents.poll(); assertNull(les); @@ -180,9 +183,9 @@ public void testReacquireLeadershipAfterSessionLost() throws Exception { e = sessionEvents.poll(10, TimeUnit.SECONDS); assertEquals(e, SessionEvent.SessionReestablished); - Thread.sleep(2_000); - - assertEquals(le1.getState(), LeaderElectionState.Leading); + Awaitility.await().untilAsserted(() -> { + assertEquals(le1.getState(), LeaderElectionState.Leading); + }); les = leaderElectionEvents.poll(); assertNull(les);