diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml index 6e10fced3d3a7f..eee28587d35795 100644 --- a/pulsar-metadata/pom.xml +++ b/pulsar-metadata/pom.xml @@ -67,6 +67,12 @@ com.github.ben-manes.caffeine caffeine + + + org.awaitility + awaitility + test + @@ -90,4 +96,4 @@ - \ No newline at end of file + diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java index 4d8327042cec9c..2247f6bfc1667f 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java @@ -52,6 +52,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; import org.apache.pulsar.metadata.api.MetadataStoreFactory; import org.apache.pulsar.metadata.api.Stat; +import org.awaitility.Awaitility; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -140,25 +141,28 @@ private void multiStoreAddDelete(List> caches, int addOn, MyClass value1 = new MyClass(testName, 1); addCache.create(key1, value1).join(); + // all time for changes to propagate to other caches - Thread.sleep(100); - for (MetadataCache cache: caches) { - if (cache == addCache) { + Awaitility.await().ignoreExceptions().untilAsserted(() -> { + for (MetadataCache cache : caches) { + if (cache == addCache) { + assertEquals(cache.getIfCached(key1), Optional.of(value1)); + } + assertEquals(cache.get(key1).join(), Optional.of(value1)); assertEquals(cache.getIfCached(key1), Optional.of(value1)); } - assertEquals(cache.get(key1).join(), Optional.of(value1)); - assertEquals(cache.getIfCached(key1), Optional.of(value1)); - } + }); delCache.delete(key1).join(); // all time for changes to propagate to other caches - Thread.sleep(100); - // The entry should get removed from all caches - for (MetadataCache cache: caches) { - assertEquals(cache.getIfCached(key1), Optional.empty()); - assertEquals(cache.get(key1).join(), Optional.empty()); - } + Awaitility.await().ignoreExceptions().untilAsserted(() -> { + // The entry should get removed from all caches + for (MetadataCache cache : caches) { + assertEquals(cache.getIfCached(key1), Optional.empty()); + assertEquals(cache.get(key1).join(), Optional.empty()); + } + }); } @Test(dataProvider = "impl")