Skip to content

Commit

Permalink
[fix][broker] Fix multiple race conditions in topic unloading and loa…
Browse files Browse the repository at this point in the history
…ding

- possible related issues are apache#5284, apache#14941 and apache#20526
  • Loading branch information
lhotari committed Jun 8, 2023
1 parent 19face6 commit 3b84a36
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 77 deletions.
Expand Up @@ -151,6 +151,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
protected final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
protected volatile Pair<String, List<EntryFilter>> entryFilters;

private volatile TopicCacheCleanupFunction cleanupFunction;

public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
Expand Down Expand Up @@ -1310,6 +1312,23 @@ public HierarchyTopicPolicies getHierarchyTopicPolicies() {
return topicPolicies;
}

@Override
public void registerTopicCacheCleanupFunction(TopicCacheCleanupFunction cleanupFunction) {
if (this.cleanupFunction != null) {
log.warn("Topic {} has already been cached. It should have been removed before re-adding.", topic);
}
this.cleanupFunction = cleanupFunction;
}

@Override
public void cleanupTopicCache(CompletableFuture<Optional<Topic>> topicFuture) {
TopicCacheCleanupFunction c = this.cleanupFunction;
this.cleanupFunction = null;
if (c != null) {
c.cleanup(topicFuture);
}
}

public void updateBrokerSubscriptionDispatchRate() {
topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(
subscriptionDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
Expand Down

0 comments on commit 3b84a36

Please sign in to comment.