Skip to content

Commit

Permalink
[Broker] Remove subscription when closing Reader on non-persistent to…
Browse files Browse the repository at this point in the history
…pics (apache#11731)

* Remove the subscription from the topic when closing Reader subscription.

* remove useless code
  • Loading branch information
gaoran10 authored and ciaocloud committed Oct 16, 2021
1 parent 04cc940 commit ffab65d
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 3 deletions.
Expand Up @@ -72,13 +72,17 @@ public class NonPersistentSubscription implements Subscription {
private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
private final LongAdder msgOutFromRemovedConsumer = new LongAdder();

public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName) {
// If isDurable is false(such as a Reader), remove subscription from topic when closing this subscription.
private final boolean isDurable;

public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName, boolean isDurable) {
this.topic = topic;
this.topicName = topic.getName();
this.subName = subscriptionName;
this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
IS_FENCED_UPDATER.set(this, FALSE);
this.lastActive = System.currentTimeMillis();
this.isDurable = isDurable;
}

@Override
Expand Down Expand Up @@ -200,6 +204,9 @@ public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor
ConsumerStatsImpl stats = consumer.getStats();
bytesOutFromRemovedConsumers.add(stats.bytesOutCounter);
msgOutFromRemovedConsumer.add(stats.msgOutCounter);
if (!isDurable) {
topic.unsubscribe(subName);
}

// invalid consumer remove will throw an exception
// decrement usage is triggered only for valid consumer close
Expand Down
Expand Up @@ -270,7 +270,7 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
}

NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
name -> new NonPersistentSubscription(this, subscriptionName));
name -> new NonPersistentSubscription(this, subscriptionName, isDurable));
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0,
cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta, MessageId.latest);
addConsumerToSubscription(subscription, consumer).thenRun(() -> {
Expand Down Expand Up @@ -320,7 +320,7 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
@Override
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
boolean replicateSubscriptionState) {
return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName));
return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName, true));
}

@Override
Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
Expand All @@ -56,6 +57,7 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -554,4 +556,50 @@ public void testAvoidUsingIoThreadToGetValueOfMessage() throws Exception {
latch.await();
Assert.assertEquals(received.size(), 1);
}

@Test(timeOut = 1000 * 10)
public void removeNonPersistentTopicReaderTest() throws Exception {
final String topic = "non-persistent://my-property/my-ns/non-topic";

Reader<byte[]> reader = pulsarClient.newReader()
.topic(topic)
.startMessageId(MessageId.earliest)
.create();
Reader<byte[]> reader2 = pulsarClient.newReader()
.topic(topic)
.startMessageId(MessageId.earliest)
.create();

Awaitility.await()
.pollDelay(3, TimeUnit.SECONDS)
.until(() -> {
TopicStats topicStats = admin.topics().getStats(topic);
System.out.println("subscriptions size: " + topicStats.getSubscriptions().size());
return topicStats.getSubscriptions().size() == 2;
});

reader.close();
reader2.close();

Awaitility.await().until(() -> {
TopicStats topicStats = admin.topics().getStats(topic);
System.out.println("subscriptions size: " + topicStats.getSubscriptions().size());
return topicStats.getSubscriptions().size() == 0;
});

Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub")
.subscribe();
consumer.close();

Awaitility.await()
.pollDelay(3, TimeUnit.SECONDS)
.until(() -> {
TopicStats topicStats = admin.topics().getStats(topic);
System.out.println("subscriptions size: " + topicStats.getSubscriptions().size());
return topicStats.getSubscriptions().size() == 1;
});
}

}

0 comments on commit ffab65d

Please sign in to comment.