Skip to content

Commit

Permalink
Fix race condition of the SystemTopicBasedTopicPoliciesService (#11097)
Browse files Browse the repository at this point in the history
Currently, we are triggering the reader to read more messages not waiting for the init policies cache to complete,
This might lead to the init process got hasMessages=true but not able to read the message since the message has been
consumed by the read more entries process will lead to the `topic policy cache not init` exception.

Here are the details of the race condition:

https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L190

https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L248

https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L275

(cherry picked from commit 81063c0)
  • Loading branch information
codelipenghui committed Jun 27, 2021
1 parent 35a1b75 commit 6cecb5a
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
result.completeExceptionally(ex);
} else {
initPolicesCache(reader, result);
readMorePolicies(reader);
result.thenRun(() -> readMorePolicies(reader));
}
});
}
Expand Down Expand Up @@ -251,6 +251,7 @@ private void initPolicesCache(SystemTopicClient.Reader reader, CompletableFuture
readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
}
refreshTopicPoliciesCache(msg);
notifyListener(msg);
if (log.isDebugEnabled()) {
log.debug("[{}] Loop next event reading for system topic.",
reader.getSystemTopic().getTopicName().getNamespaceObject());
Expand All @@ -261,8 +262,9 @@ private void initPolicesCache(SystemTopicClient.Reader reader, CompletableFuture
if (log.isDebugEnabled()) {
log.debug("[{}] Reach the end of the system topic.", reader.getSystemTopic().getTopicName());
}
policyCacheInitMap.computeIfPresent(
reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
future.complete(null);
policyCacheInitMap.computeIfPresent(reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,17 +627,18 @@ public void testRestart() throws Exception {
new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,200,false);
admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies);

Awaitility.await().atMost(3, TimeUnit.SECONDS)
.untilAsserted(() -> Assert.assertNotNull(admin.topics().getInactiveTopicPolicies(topic)));
InactiveTopicPolicies finalInactiveTopicPolicies = inactiveTopicPolicies;
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topics().getInactiveTopicPolicies(topic), finalInactiveTopicPolicies));

// restart broker, policy should still take effect
restartBroker();

// Trigger the cache init.
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();

InactiveTopicPolicies finalInactiveTopicPolicies = inactiveTopicPolicies;
Awaitility.await().atMost(5, TimeUnit.SECONDS)

Awaitility.await()
.untilAsserted(() -> {
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
Assert.assertEquals(persistentTopic.getInactiveTopicPolicies(), finalInactiveTopicPolicies);
Expand Down

0 comments on commit 6cecb5a

Please sign in to comment.