diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 0200e25cff132..cd39919a3b357 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -288,6 +288,13 @@ ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionNam */ void deleteCursor(String name) throws InterruptedException, ManagedLedgerException; + /** + * Remove a ManagedCursor from this ManagedLedger's waitingCursors. + * + * @param cursor the ManagedCursor + */ + void removeWaitingCursor(ManagedCursor cursor); + /** * Open a ManagedCursor asynchronously. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 528816047b709..87287530a54a8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3415,6 +3415,10 @@ public void deactivateCursor(ManagedCursor cursor) { } } + public void removeWaitingCursor(ManagedCursor cursor) { + this.waitingCursors.remove(cursor); + } + public boolean isCursorActive(ManagedCursor cursor) { return activeCursors.get(cursor.getName()) != null; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index f69db6825e907..8637ecd1e15b8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -299,6 +299,7 @@ public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor if (dispatcher != null && dispatcher.getConsumers().isEmpty()) { deactivateCursor(); + topic.getManagedLedger().removeWaitingCursor(cursor); if (!cursor.isDurable()) { // If cursor is not durable, we need to clean up the subscription as well diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java index e0d1720dc1f9a..09f2c91cc2ab6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java @@ -19,15 +19,22 @@ package org.apache.pulsar.broker.admin; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Lists; import javax.ws.rs.ClientErrorException; import javax.ws.rs.core.Response.Status; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicName; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -127,4 +134,19 @@ public void createSubscriptionOnPartitionedTopicWithPartialFailure() throws Exce Lists.newArrayList("sub-1")); } } + + @Test + public void testWaitingCurosrCausedMemoryLeak() throws Exception { + String topic = "persistent://my-property/my-ns/my-topic"; + for (int i = 0; i < 10; i ++) { + Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic) + .subscriptionType(SubscriptionType.Failover).subscriptionName("test" + i).subscribe(); + Awaitility.await().untilAsserted(() -> assertTrue(consumer.isConnected())); + consumer.close(); + } + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl)(topicRef.getManagedLedger()); + assertEquals(ml.getWaitingCursorsCount(), 0); + } + } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java index 229cd666acff8..907aba67b9ef3 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java @@ -137,6 +137,11 @@ public void deleteCursor(String name) throws InterruptedException, ManagedLedger } + @Override + public void removeWaitingCursor(ManagedCursor cursor) { + + } + @Override public void asyncOpenCursor(String name, AsyncCallbacks.OpenCursorCallback callback, Object ctx) {