Skip to content

Commit

Permalink
Fix namespace policy override ignored when creating subscription (apa…
Browse files Browse the repository at this point in the history
…che#12699)

PR apache#6471 introduced the possibility to set auto topic creation settings at the namespace level
PR apache#6685 is a fix to take into account the auto topic creation setting when creating a subscription
but it uses the broker configuration and doesn't see the namespace level setting that was introduced in apache#6471

This fix uses the   method that fetches config overrides in ZK instead of the one from the broker configuration.
This way the  method of the pulsar-admin uses the value set for the namespace for auto-topic-creation

(cherry picked from commit 965a80f)
(cherry picked from commit a034d14)
  • Loading branch information
cbornet authored and nicoloboschi committed Dec 10, 2021
1 parent 4f4072b commit 8f6691c
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2005,7 +2005,7 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su
internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
subscriptionName, targetMessageId, authoritative, replicated);
} else {
boolean allowAutoTopicCreation = pulsar().getConfiguration().isAllowAutoTopicCreation();
boolean allowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);
getPartitionedTopicMetadataAsync(topicName,
authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
Expand Down Expand Up @@ -2090,7 +2090,7 @@ private void internalCreateSubscriptionForNonPartitionedTopic(
AsyncResponse asyncResponse, String subscriptionName,
MessageIdImpl targetMessageId, boolean authoritative, boolean replicated) {

boolean isAllowAutoTopicCreation = pulsar().getConfiguration().isAllowAutoTopicCreation();
boolean isAllowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);

validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,21 @@ public void testNotAllowSubscriptionTopicCreation() throws Exception{

}

@Test
public void testAutoCreationNamespaceOverridesSubscriptionTopicCreation() throws Exception {
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
String topicString = "persistent://prop/ns-abc/non-partitioned-topic" + System.currentTimeMillis();
String subscriptionName = "non-partitioned-topic-sub";
final TopicName topicName = TopicName.get(topicString);
pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
AutoTopicCreationOverride.builder()
.allowAutoTopicCreation(true)
.topicType(TopicType.NON_PARTITIONED.toString())
.build());

admin.topics().createSubscription(topicString, subscriptionName, MessageId.earliest);
}

@Test
public void testMaxNumPartitionsPerPartitionedTopicTopicCreation() {
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
Expand Down

0 comments on commit 8f6691c

Please sign in to comment.