diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index eee28587d3579..916d92eb4ec62 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -31,7 +31,6 @@
pulsar-metadata
Pulsar Metadata
-
org.apache.pulsar
@@ -58,6 +57,12 @@
test
+
+ org.awaitility
+ awaitility
+ test
+
+
org.apache.bookkeeper
bookkeeper-server
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
index f565a8f09e02d..24c4bd709007f 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
@@ -26,7 +26,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
import lombok.Cleanup;
@@ -40,6 +39,7 @@
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.awaitility.Awaitility;
import org.testng.annotations.Test;
public class ZKSessionTest extends BaseMetadataStoreTest {
@@ -61,10 +61,10 @@ public void testDisconnection() throws Exception {
assertEquals(e, SessionEvent.ConnectionLost);
zks.start();
- e = sessionEvents.poll(10, TimeUnit.SECONDS);
+ e = sessionEvents.poll(20, TimeUnit.SECONDS);
assertEquals(e, SessionEvent.Reconnected);
- e = sessionEvents.poll(1, TimeUnit.SECONDS);
+ e = sessionEvents.poll(5, TimeUnit.SECONDS);
assertNull(e);
}
@@ -130,9 +130,9 @@ public void testReacquireLocksAfterSessionLost() throws Exception {
e = sessionEvents.poll(10, TimeUnit.SECONDS);
assertEquals(e, SessionEvent.SessionReestablished);
- Thread.sleep(2_000);
-
- assertFalse(lock.getLockExpiredFuture().isDone());
+ Awaitility.await().untilAsserted(() -> {
+ assertFalse(lock.getLockExpiredFuture().isDone());
+ });
assertTrue(store.get(path).join().isPresent());
}
@@ -171,7 +171,10 @@ public void testReacquireLeadershipAfterSessionLost() throws Exception {
e = sessionEvents.poll(10, TimeUnit.SECONDS);
assertEquals(e, SessionEvent.SessionLost);
- assertEquals(le1.getState(), LeaderElectionState.Leading);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(le1.getState(), LeaderElectionState.Leading);
+ });
+
les = leaderElectionEvents.poll();
assertNull(les);
@@ -180,9 +183,9 @@ public void testReacquireLeadershipAfterSessionLost() throws Exception {
e = sessionEvents.poll(10, TimeUnit.SECONDS);
assertEquals(e, SessionEvent.SessionReestablished);
- Thread.sleep(2_000);
-
- assertEquals(le1.getState(), LeaderElectionState.Leading);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(le1.getState(), LeaderElectionState.Leading);
+ });
les = leaderElectionEvents.poll();
assertNull(les);