diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 23fd63ad3baf8c..e1d44d2bc41e6e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3482,6 +3482,7 @@ public void deactivateCursor(ManagedCursor cursor) { if (!cursor.isDurable()) { nonDurableActiveCursors.removeCursor(cursor.getName()); } + waitingCursors.remove(cursor); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java index 59ebdd5c6dd24a..2f876efd880967 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java @@ -32,6 +32,7 @@ import java.io.IOException; import javax.ws.rs.ClientErrorException; import javax.ws.rs.core.Response.Status; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.http.HttpResponse; import org.apache.http.client.config.RequestConfig; @@ -39,13 +40,16 @@ import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; @@ -348,4 +352,17 @@ public void createSubscriptionBySpecifyingStringPosition() throws IOException, P producer.close(); } + + @Test + public void testWaitingCurosrCausedMemoryLeak() throws Exception { + String topic = "persistent://my-property/my-ns/my-topic"; + for (int i=0;i<10;i++) { + Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic) + .subscriptionType(SubscriptionType.Failover).subscriptionName("test" + i).subscribe(); + consumer.close(); + } + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl)(topicRef.getManagedLedger()); + assertEquals(ml.getWaitingCursorsCount(), 0); + } }