From 6430c541dc3971f58a26b915a8458812a646fe40 Mon Sep 17 00:00:00 2001 From: lipenghui Date: Sun, 27 Jun 2021 20:30:15 +0800 Subject: [PATCH] Fix race condition of the SystemTopicBasedTopicPoliciesService (#11097) ### Motivation Currently, we are triggering the reader to read more messages not waiting for the init policies cache to complete, This might lead to the init process got hasMessages=true but not able to read the message since the message has been consumed by the read more entries process will lead to the `topic policy cache not init` exception. Here are the details of the race condition: https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L190 https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L248 https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L275 (cherry picked from commit 81063c04870ba7fa26222e57e4d4e94145e0a1e0) --- .../broker/service/SystemTopicBasedTopicPoliciesService.java | 5 +++-- .../org/apache/pulsar/broker/admin/TopicPoliciesTest.java | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 87ff3b8710f04..e2d2e7450f03b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -188,7 +188,7 @@ public CompletableFuture addOwnedNamespaceBundleAsync(NamespaceBundle name result.completeExceptionally(ex); } else { initPolicesCache(reader, result); - readMorePolicies(reader); + result.thenRun(() -> readMorePolicies(reader)); } }); } @@ -254,6 +254,7 @@ private void initPolicesCache(SystemTopicClient.Reader reader, Comp readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject()); } refreshTopicPoliciesCache(msg); + notifyListener(msg); if (log.isDebugEnabled()) { log.debug("[{}] Loop next event reading for system topic.", reader.getSystemTopic().getTopicName().getNamespaceObject()); @@ -264,9 +265,9 @@ private void initPolicesCache(SystemTopicClient.Reader reader, Comp if (log.isDebugEnabled()) { log.debug("[{}] Reach the end of the system topic.", reader.getSystemTopic().getTopicName()); } - future.complete(null); policyCacheInitMap.computeIfPresent( reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true); + future.complete(null); } }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index f66f464490aa4..ecd53b276c6c7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -947,8 +947,9 @@ public void testRestart() throws Exception { new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,200,false); admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies); + InactiveTopicPolicies finalInactiveTopicPolicies = inactiveTopicPolicies; Awaitility.await() - .untilAsserted(() -> Assert.assertNotNull(admin.topics().getInactiveTopicPolicies(topic))); + .untilAsserted(() -> Assert.assertEquals(admin.topics().getInactiveTopicPolicies(topic), finalInactiveTopicPolicies)); // restart broker, policy should still take effect restartBroker(); @@ -956,7 +957,7 @@ public void testRestart() throws Exception { // Trigger the cache init. Producer producer = pulsarClient.newProducer().topic(topic).create(); - InactiveTopicPolicies finalInactiveTopicPolicies = inactiveTopicPolicies; + Awaitility.await() .untilAsserted(() -> { PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();