Skip to content

Commit

Permalink
[Broker] waitingCursors potential heap memory leak (#13939)
Browse files Browse the repository at this point in the history
(cherry picked from commit 478fd36)
  • Loading branch information
gaozhangmin authored and gaoran10 committed Mar 1, 2022
1 parent 44408bf commit e583b05
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 0 deletions.
Expand Up @@ -288,6 +288,13 @@ ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionNam
*/
void deleteCursor(String name) throws InterruptedException, ManagedLedgerException;

/**
* Remove a ManagedCursor from this ManagedLedger's waitingCursors.
*
* @param cursor the ManagedCursor
*/
void removeWaitingCursor(ManagedCursor cursor);

/**
* Open a ManagedCursor asynchronously.
*
Expand Down
Expand Up @@ -3415,6 +3415,10 @@ public void deactivateCursor(ManagedCursor cursor) {
}
}

public void removeWaitingCursor(ManagedCursor cursor) {
this.waitingCursors.remove(cursor);
}

public boolean isCursorActive(ManagedCursor cursor) {
return activeCursors.get(cursor.getName()) != null;
}
Expand Down
Expand Up @@ -299,6 +299,7 @@ public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor

if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
deactivateCursor();
topic.getManagedLedger().removeWaitingCursor(cursor);

if (!cursor.isDurable()) {
// If cursor is not durable, we need to clean up the subscription as well
Expand Down
Expand Up @@ -19,15 +19,22 @@
package org.apache.pulsar.broker.admin;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.core.Response.Status;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -127,4 +134,19 @@ public void createSubscriptionOnPartitionedTopicWithPartialFailure() throws Exce
Lists.newArrayList("sub-1"));
}
}

@Test
public void testWaitingCurosrCausedMemoryLeak() throws Exception {
String topic = "persistent://my-property/my-ns/my-topic";
for (int i = 0; i < 10; i ++) {
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
.subscriptionType(SubscriptionType.Failover).subscriptionName("test" + i).subscribe();
Awaitility.await().untilAsserted(() -> assertTrue(consumer.isConnected()));
consumer.close();
}
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
ManagedLedgerImpl ml = (ManagedLedgerImpl)(topicRef.getManagedLedger());
assertEquals(ml.getWaitingCursorsCount(), 0);
}

}
Expand Up @@ -137,6 +137,11 @@ public void deleteCursor(String name) throws InterruptedException, ManagedLedger

}

@Override
public void removeWaitingCursor(ManagedCursor cursor) {

}

@Override
public void asyncOpenCursor(String name, AsyncCallbacks.OpenCursorCallback callback, Object ctx) {

Expand Down

0 comments on commit e583b05

Please sign in to comment.