Skip to content

Commit

Permalink
[Issuse 13640][broker] Fix non persistent topic subscription error. (#…
Browse files Browse the repository at this point in the history
…13685)

Fixes #13640

If pulsar broker started with `allowAutoSubscriptionCreation=false`, there are no way to subscribe non persistent topic.

Add `isPersistent` check in `ServerCnx`.

(cherry picked from commit 3dbe418)
  • Loading branch information
dragonls authored and codelipenghui committed Jan 18, 2022
1 parent 44b617b commit 8c86b3d
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ public void createSubscription(@Suspended final AsyncResponse asyncResponse, @Pa
try {
validateTopicName(property, cluster, namespace, topic);
if (!topicName.isPersistent()) {
throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic"
throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic "
+ "can only be done through client");
}
internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,7 @@ public void createSubscription(
try {
validateTopicName(tenant, namespace, topic);
if (!topicName.isPersistent()) {
throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic"
throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic "
+ "can only be done through client");
}
internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {

boolean rejectSubscriptionIfDoesNotExist = isDurable
&& !service.isAllowAutoSubscriptionCreation(topicName.toString())
&& !topic.getSubscriptions().containsKey(subscriptionName);
&& !topic.getSubscriptions().containsKey(subscriptionName)
&& topic.isPersistent();

if (rejectSubscriptionIfDoesNotExist) {
return FutureUtil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ default boolean isSystemTopic() {
return false;
}

boolean isPersistent();

/* ------ Transaction related ------ */

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1075,4 +1075,9 @@ public CompletableFuture<Void> truncate() {
protected boolean isTerminated() {
return false;
}

@Override
public boolean isPersistent() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2927,6 +2927,11 @@ public boolean isSystemTopic() {
return false;
}

@Override
public boolean isPersistent() {
return true;
}

private synchronized void fence() {
isFenced = true;
ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,18 @@ public void testAutoSubscriptionCreationNamespaceDisallowOverridesBroker() throw
assertFalse(admin.topics().getSubscriptions(topicName.toString()).contains(subscriptionName));
}

@Test
public void testNonPersistentTopicSubscriptionCreationWithAutoCreationDisable() throws Exception {
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);

final String topicName = "non-persistent://prop/ns-abc/test-subtopic-" + testId.getAndIncrement();
final String subscriptionName = "test-subtopic-sub";

admin.topics().createNonPartitionedTopic(topicName);

// Subscribe operation should be successful
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
assertTrue(admin.topics().getSubscriptions(topicName).contains(subscriptionName));
}

}

0 comments on commit 8c86b3d

Please sign in to comment.