From 2c07ae93026645c0326d686dde2941e8e16d3eae Mon Sep 17 00:00:00 2001 From: gaozhangmin Date: Tue, 22 Feb 2022 02:18:56 +0800 Subject: [PATCH] [Broker] waitingCursors potential heap memory leak (#13939) --- .../bookkeeper/mledger/ManagedLedger.java | 7 +++++++ .../mledger/impl/ManagedLedgerImpl.java | 4 ++++ .../persistent/PersistentSubscription.java | 1 + .../broker/admin/CreateSubscriptionTest.java | 18 ++++++++++++++++++ .../offload/jcloud/impl/MockManagedLedger.java | 5 +++++ 5 files changed, 35 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 5c62dba9d0d8ed..1f6e0d3af4641b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -292,6 +292,13 @@ ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionNam */ void deleteCursor(String name) throws InterruptedException, ManagedLedgerException; + /** + * Remove a ManagedCursor from this ManagedLedger's waitingCursors. + * + * @param cursor the ManagedCursor + */ + void removeWaitingCursor(ManagedCursor cursor); + /** * Open a ManagedCursor asynchronously. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 23fd63ad3baf8c..bfa33369547dca 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3485,6 +3485,10 @@ public void deactivateCursor(ManagedCursor cursor) { } } + public void removeWaitingCursor(ManagedCursor cursor) { + this.waitingCursors.remove(cursor); + } + public boolean isCursorActive(ManagedCursor cursor) { return activeCursors.get(cursor.getName()) != null; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 6d74b531493f8e..fc4d0f2d3dc54d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -308,6 +308,7 @@ public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor if (dispatcher != null && dispatcher.getConsumers().isEmpty()) { deactivateCursor(); + topic.getManagedLedger().removeWaitingCursor(cursor); if (!cursor.isDurable()) { // If cursor is not durable, we need to clean up the subscription as well diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java index 59ebdd5c6dd24a..5b43419462a6a9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java @@ -32,6 +32,7 @@ import java.io.IOException; import javax.ws.rs.ClientErrorException; import javax.ws.rs.core.Response.Status; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.http.HttpResponse; import org.apache.http.client.config.RequestConfig; @@ -39,13 +40,16 @@ import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; @@ -348,4 +352,18 @@ public void createSubscriptionBySpecifyingStringPosition() throws IOException, P producer.close(); } + + @Test + public void testWaitingCurosrCausedMemoryLeak() throws Exception { + String topic = "persistent://my-property/my-ns/my-topic"; + for (int i = 0; i < 10; i ++) { + Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic) + .subscriptionType(SubscriptionType.Failover).subscriptionName("test" + i).subscribe(); + Awaitility.await().untilAsserted(() -> assertTrue(consumer.isConnected())); + consumer.close(); + } + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl)(topicRef.getManagedLedger()); + assertEquals(ml.getWaitingCursorsCount(), 0); + } } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java index d025cd1168eefc..c92c2f805bd288 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java @@ -137,6 +137,11 @@ public void deleteCursor(String name) throws InterruptedException, ManagedLedger } + @Override + public void removeWaitingCursor(ManagedCursor cursor) { + + } + @Override public void asyncOpenCursor(String name, AsyncCallbacks.OpenCursorCallback callback, Object ctx) {