diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index c605f2388af79..a400fae7912ec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -72,13 +72,17 @@ public class NonPersistentSubscription implements Subscription { private final LongAdder bytesOutFromRemovedConsumers = new LongAdder(); private final LongAdder msgOutFromRemovedConsumer = new LongAdder(); - public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName) { + // If isDurable is false(such as a Reader), remove subscription from topic when closing this subscription. + private final boolean isDurable; + + public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName, boolean isDurable) { this.topic = topic; this.topicName = topic.getName(); this.subName = subscriptionName; this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString(); IS_FENCED_UPDATER.set(this, FALSE); this.lastActive = System.currentTimeMillis(); + this.isDurable = isDurable; } @Override @@ -200,6 +204,9 @@ public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor ConsumerStatsImpl stats = consumer.getStats(); bytesOutFromRemovedConsumers.add(stats.bytesOutCounter); msgOutFromRemovedConsumer.add(stats.msgOutCounter); + if (!isDurable) { + topic.unsubscribe(subName); + } // invalid consumer remove will throw an exception // decrement usage is triggered only for valid consumer close diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index a9c0f7f97b6c9..90b49639c178c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -270,7 +270,7 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs } NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName, - name -> new NonPersistentSubscription(this, subscriptionName)); + name -> new NonPersistentSubscription(this, subscriptionName, isDurable)); Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta, MessageId.latest); addConsumerToSubscription(subscription, consumer).thenRun(() -> { @@ -320,7 +320,7 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs @Override public CompletableFuture createSubscription(String subscriptionName, InitialPosition initialPosition, boolean replicateSubscriptionState) { - return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName)); + return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName, true)); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index 4a2466f53e347..205264699eb90 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -42,6 +42,7 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -56,6 +57,7 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.Murmur3_32Hash; import org.apache.pulsar.schema.Schemas; import org.awaitility.Awaitility; @@ -554,4 +556,50 @@ public void testAvoidUsingIoThreadToGetValueOfMessage() throws Exception { latch.await(); Assert.assertEquals(received.size(), 1); } + + @Test(timeOut = 1000 * 10) + public void removeNonPersistentTopicReaderTest() throws Exception { + final String topic = "non-persistent://my-property/my-ns/non-topic"; + + Reader reader = pulsarClient.newReader() + .topic(topic) + .startMessageId(MessageId.earliest) + .create(); + Reader reader2 = pulsarClient.newReader() + .topic(topic) + .startMessageId(MessageId.earliest) + .create(); + + Awaitility.await() + .pollDelay(3, TimeUnit.SECONDS) + .until(() -> { + TopicStats topicStats = admin.topics().getStats(topic); + System.out.println("subscriptions size: " + topicStats.getSubscriptions().size()); + return topicStats.getSubscriptions().size() == 2; + }); + + reader.close(); + reader2.close(); + + Awaitility.await().until(() -> { + TopicStats topicStats = admin.topics().getStats(topic); + System.out.println("subscriptions size: " + topicStats.getSubscriptions().size()); + return topicStats.getSubscriptions().size() == 0; + }); + + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub") + .subscribe(); + consumer.close(); + + Awaitility.await() + .pollDelay(3, TimeUnit.SECONDS) + .until(() -> { + TopicStats topicStats = admin.topics().getStats(topic); + System.out.println("subscriptions size: " + topicStats.getSubscriptions().size()); + return topicStats.getSubscriptions().size() == 1; + }); + } + }