Skip to content

Commit

Permalink
[Issue 10860][pulsar-metadata] Ensure metadata cache consistency acro…
Browse files Browse the repository at this point in the history
…ss brokers (apache#10862)

* Added metadata cache test to simulate multi broker cache

* fix create and delete ops on cache

1. During create, we should add a watch for the path in ZooKeeper. Without this,
we will not be notified if the znode is changed on another broker.

2. Similarly, when deleting, the cache should be invalidated, but we shouldn't add an
empty entry to the cache. The path could be created again on some other broker; in that
case we need to go and fetch the entry from ZooKeeper. Adding an empty
entry to the cache prevents that.

* Address review comments

Also add a small delay in the test to allow notifications to propagate to the other
caches. Without this delay, the tests occasionally fail.

Co-authored-by: Surinder Singh <surinders@splunk.com>
  • Loading branch information
2 people authored and yangl committed Jun 23, 2021
1 parent 42021e4 commit 9c12151
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.function.Supplier;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.metadata.api.CacheGetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
Expand All @@ -50,6 +51,7 @@
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.Stat;

@Slf4j
public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notification> {

private static final long CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5);
Expand Down Expand Up @@ -209,8 +211,17 @@ public CompletableFuture<Void> create(String path, T value) {
store.put(path, content, Optional.of(-1L))
.thenAccept(stat -> {
// Make sure we have the value cached before the operation is completed
objCache.put(path, FutureUtils.value(Optional.of(new CacheGetResult<>(value, stat))));
future.complete(null);
// In addition to caching the value, we need to add a watch on the path,
// so when/if it changes on any other node, we are notified and we can
// update the cache
objCache.get(path).whenComplete( (stat2, ex) -> {
if (ex == null) {
future.complete(null);
} else {
log.error("Exception while getting path {}", path, ex);
future.completeExceptionally(ex.getCause());
}
});
}).exceptionally(ex -> {
if (ex.getCause() instanceof BadVersionException) {
// Use already exists exception to provide more self-explanatory error message
Expand All @@ -226,11 +237,7 @@ public CompletableFuture<Void> create(String path, T value) {

@Override
public CompletableFuture<Void> delete(String path) {
return store.delete(path, Optional.empty())
.thenAccept(v -> {
// Mark in the cache that the object was removed
objCache.put(path, FutureUtils.value(Optional.empty()));
});
return store.delete(path, Optional.empty());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
Expand All @@ -50,6 +52,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.Stat;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class MetadataCacheTest extends BaseMetadataStoreTest {
Expand Down Expand Up @@ -90,6 +93,74 @@ public void emptyCacheTest(String provider, String url) throws Exception {
}
}

/**
 * Supplies the parameter rows for tests bound to the {@code "zk"} data provider.
 *
 * @return a single row made of the label {@code "ZooKeeper"} and the connection
 *         string reported by {@code zks}
 */
@DataProvider(name = "zk")
public Object[][] zkimplementations() {
    Object[] zooKeeperRow = new Object[] { "ZooKeeper", zks.getConnectionString() };
    return new Object[][] { zooKeeperRow };
}

/**
 * Runs create/delete round trips across three metadata caches, each obtained from
 * its own {@link MetadataStore} created against the same {@code url}.
 *
 * @param provider label supplied by the data provider (not used by the test body)
 * @param url      connection string every store is created with
 */
@Test(dataProvider = "zk")
public void crossStoreUpdates(String provider, String url) throws Exception {
    // Three independent stores pointing at the same URL.
    @Cleanup
    MetadataStore firstStore = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());

    @Cleanup
    MetadataStore secondStore = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());

    @Cleanup
    MetadataStore thirdStore = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());

    // One cache per store, kept in creation order so the indexes below are stable.
    List<MetadataCache<MyClass>> cachesByIndex = new ArrayList<>();
    cachesByIndex.add(firstStore.getMetadataCache(MyClass.class));
    cachesByIndex.add(secondStore.getMetadataCache(MyClass.class));
    cachesByIndex.add(thirdStore.getMetadataCache(MyClass.class));

    // Create through cache 0, delete through cache 1.
    multiStoreAddDelete(cachesByIndex, 0, 1, "add cache0 del cache1");
    // Same order a second time, to rule out stale state left by the first round.
    multiStoreAddDelete(cachesByIndex, 0, 1, "add cache0 del cache1");
    // Swap the roles of the two caches.
    multiStoreAddDelete(cachesByIndex, 1, 0, "add cache1 del cache0");
    // Create and delete through the very same cache.
    multiStoreAddDelete(cachesByIndex, 1, 1, "add cache1 del cache1");
}

/**
 * Creates {@code /test-key1} through one cache, checks that every cache returns the
 * new value, then deletes the key through another cache and checks that every cache
 * reports it as absent.
 *
 * @param caches   all caches taking part in the round trip
 * @param addOn    index in {@code caches} of the cache used for the create
 * @param delFrom  index in {@code caches} of the cache used for the delete
 * @param testName text stored in the created value
 * @throws InterruptedException if the thread is interrupted during one of the pauses
 */
private void multiStoreAddDelete(List<MetadataCache<MyClass>> caches, int addOn, int delFrom, String testName)
        throws InterruptedException {
    final MetadataCache<MyClass> creator = caches.get(addOn);
    final MetadataCache<MyClass> remover = caches.get(delFrom);
    final String path = "/test-key1";

    // The creating cache must not already hold an entry for the key.
    assertEquals(creator.getIfCached(path), Optional.empty());

    final MyClass created = new MyClass(testName, 1);
    final Optional<MyClass> expected = Optional.of(created);

    creator.create(path, created).join();
    // Allow time for the change to propagate to the other caches.
    Thread.sleep(100);
    caches.forEach(cache -> {
        if (cache == creator) {
            // Checked before any get() is issued on the creating cache.
            assertEquals(cache.getIfCached(path), expected);
        }
        assertEquals(cache.get(path).join(), expected);
        assertEquals(cache.getIfCached(path), expected);
    });

    remover.delete(path).join();

    // Allow time for the change to propagate to the other caches.
    Thread.sleep(100);
    // Every cache is expected to report the key as absent.
    caches.forEach(cache -> {
        assertEquals(cache.getIfCached(path), Optional.empty());
        assertEquals(cache.get(path).join(), Optional.empty());
    });
}

@Test(dataProvider = "impl")
public void insertionDeletionWitGenericType(String provider, String url) throws Exception {
@Cleanup
Expand Down

0 comments on commit 9c12151

Please sign in to comment.