Skip to content

Commit

Permalink
Make MetadataCacheTest reliable. (#10877)
Browse files Browse the repository at this point in the history
Avoid issue due to async update by using Awaitility

Co-authored-by: Surinder Singh <surinders@splunk.com>
(cherry picked from commit 4cf1008)
  • Loading branch information
sursingh authored and hangc0276 committed Aug 25, 2021
1 parent 587548e commit 9ea5180
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 13 deletions.
8 changes: 7 additions & 1 deletion pulsar-metadata/pom.xml
Expand Up @@ -72,6 +72,12 @@
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand All @@ -95,4 +101,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
Expand Up @@ -52,6 +52,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.Stat;
import org.awaitility.Awaitility;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -140,25 +141,28 @@ private void multiStoreAddDelete(List<MetadataCache<MyClass>> caches, int addOn,
MyClass value1 = new MyClass(testName, 1);

addCache.create(key1, value1).join();

// all time for changes to propagate to other caches
Thread.sleep(100);
for (MetadataCache<MyClass> cache: caches) {
if (cache == addCache) {
Awaitility.await().ignoreExceptions().untilAsserted(() -> {
for (MetadataCache<MyClass> cache : caches) {
if (cache == addCache) {
assertEquals(cache.getIfCached(key1), Optional.of(value1));
}
assertEquals(cache.get(key1).join(), Optional.of(value1));
assertEquals(cache.getIfCached(key1), Optional.of(value1));
}
assertEquals(cache.get(key1).join(), Optional.of(value1));
assertEquals(cache.getIfCached(key1), Optional.of(value1));
}
});

delCache.delete(key1).join();

// all time for changes to propagate to other caches
Thread.sleep(100);
// The entry should get removed from all caches
for (MetadataCache<MyClass> cache: caches) {
assertEquals(cache.getIfCached(key1), Optional.empty());
assertEquals(cache.get(key1).join(), Optional.empty());
}
Awaitility.await().ignoreExceptions().untilAsserted(() -> {
// The entry should get removed from all caches
for (MetadataCache<MyClass> cache : caches) {
assertEquals(cache.getIfCached(key1), Optional.empty());
assertEquals(cache.get(key1).join(), Optional.empty());
}
});
}

@Test(dataProvider = "impl")
Expand Down

0 comments on commit 9ea5180

Please sign in to comment.