diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml index 0741a46e85c27..11267b880cf55 100644 --- a/pulsar-metadata/pom.xml +++ b/pulsar-metadata/pom.xml @@ -72,6 +72,12 @@ com.github.ben-manes.caffeine caffeine + + + org.awaitility + awaitility + test + @@ -95,4 +101,4 @@ - \ No newline at end of file + diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java index 4d8327042cec9..2247f6bfc1667 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java @@ -52,6 +52,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; import org.apache.pulsar.metadata.api.MetadataStoreFactory; import org.apache.pulsar.metadata.api.Stat; +import org.awaitility.Awaitility; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -140,25 +141,28 @@ private void multiStoreAddDelete(List> caches, int addOn, MyClass value1 = new MyClass(testName, 1); addCache.create(key1, value1).join(); + // all time for changes to propagate to other caches - Thread.sleep(100); - for (MetadataCache cache: caches) { - if (cache == addCache) { + Awaitility.await().ignoreExceptions().untilAsserted(() -> { + for (MetadataCache cache : caches) { + if (cache == addCache) { + assertEquals(cache.getIfCached(key1), Optional.of(value1)); + } + assertEquals(cache.get(key1).join(), Optional.of(value1)); assertEquals(cache.getIfCached(key1), Optional.of(value1)); } - assertEquals(cache.get(key1).join(), Optional.of(value1)); - assertEquals(cache.getIfCached(key1), Optional.of(value1)); - } + }); delCache.delete(key1).join(); // all time for changes to propagate to other caches - Thread.sleep(100); - // The entry should get removed from all caches - for (MetadataCache cache: caches) { - assertEquals(cache.getIfCached(key1), Optional.empty()); - assertEquals(cache.get(key1).join(), Optional.empty()); - } + Awaitility.await().ignoreExceptions().untilAsserted(() -> { + // The entry should get removed from all caches + for (MetadataCache cache : caches) { + assertEquals(cache.getIfCached(key1), Optional.empty()); + assertEquals(cache.get(key1).join(), Optional.empty()); + } + }); } @Test(dataProvider = "impl")