Skip to content

Commit

Permalink
Fix race condition of the SystemTopicBasedTopicPoliciesService (apach…
Browse files Browse the repository at this point in the history
…e#11097)

### Motivation

Currently, we are triggering the reader to read more messages not waiting for the init policies cache to complete,
This might lead to the init process got hasMessages=true but not able to read the message since the message has been
consumed by the read more entries process will lead to the `topic policy cache not init` exception.

Here are the details of the race condition:

https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L190

https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L248

https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L275
  • Loading branch information
codelipenghui authored and ciaocloud committed Oct 16, 2021
1 parent bf70cf4 commit 4de39b7
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
result.completeExceptionally(ex);
} else {
initPolicesCache(reader, result);
readMorePolicies(reader);
result.thenRun(() -> readMorePolicies(reader));
}
});
}
Expand Down Expand Up @@ -254,6 +254,7 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
}
refreshTopicPoliciesCache(msg);
notifyListener(msg);
if (log.isDebugEnabled()) {
log.debug("[{}] Loop next event reading for system topic.",
reader.getSystemTopic().getTopicName().getNamespaceObject());
Expand All @@ -264,9 +265,9 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
if (log.isDebugEnabled()) {
log.debug("[{}] Reach the end of the system topic.", reader.getSystemTopic().getTopicName());
}
future.complete(null);
policyCacheInitMap.computeIfPresent(
reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
future.complete(null);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -947,16 +947,17 @@ public void testRestart() throws Exception {
new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,200,false);
admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies);

InactiveTopicPolicies finalInactiveTopicPolicies = inactiveTopicPolicies;
Awaitility.await()
.untilAsserted(() -> Assert.assertNotNull(admin.topics().getInactiveTopicPolicies(topic)));
.untilAsserted(() -> Assert.assertEquals(admin.topics().getInactiveTopicPolicies(topic), finalInactiveTopicPolicies));

// restart broker, policy should still take effect
restartBroker();

// Trigger the cache init.
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();

InactiveTopicPolicies finalInactiveTopicPolicies = inactiveTopicPolicies;

Awaitility.await()
.untilAsserted(() -> {
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
Expand Down

0 comments on commit 4de39b7

Please sign in to comment.