Skip to content

Commit

Permalink
[Broker] waitingCursors potential heap memory leak (apache#13939)
Browse files Browse the repository at this point in the history
  • Loading branch information
gaozhangmin authored and nicklixinyang committed Apr 20, 2022
1 parent 3bd1e2d commit 2c07ae9
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 0 deletions.
Expand Up @@ -292,6 +292,13 @@ ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionNam
*/
void deleteCursor(String name) throws InterruptedException, ManagedLedgerException;

/**
* Remove a ManagedCursor from this ManagedLedger's waitingCursors.
*
* @param cursor the ManagedCursor
*/
void removeWaitingCursor(ManagedCursor cursor);

/**
* Open a ManagedCursor asynchronously.
*
Expand Down
Expand Up @@ -3485,6 +3485,10 @@ public void deactivateCursor(ManagedCursor cursor) {
}
}

public void removeWaitingCursor(ManagedCursor cursor) {
this.waitingCursors.remove(cursor);
}

public boolean isCursorActive(ManagedCursor cursor) {
return activeCursors.get(cursor.getName()) != null;
}
Expand Down
Expand Up @@ -308,6 +308,7 @@ public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor

if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
deactivateCursor();
topic.getManagedLedger().removeWaitingCursor(cursor);

if (!cursor.isDurable()) {
// If cursor is not durable, we need to clean up the subscription as well
Expand Down
Expand Up @@ -32,20 +32,24 @@
import java.io.IOException;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.core.Response.Status;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
Expand Down Expand Up @@ -348,4 +352,18 @@ public void createSubscriptionBySpecifyingStringPosition() throws IOException, P

producer.close();
}

@Test
public void testWaitingCurosrCausedMemoryLeak() throws Exception {
String topic = "persistent://my-property/my-ns/my-topic";
for (int i = 0; i < 10; i ++) {
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
.subscriptionType(SubscriptionType.Failover).subscriptionName("test" + i).subscribe();
Awaitility.await().untilAsserted(() -> assertTrue(consumer.isConnected()));
consumer.close();
}
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
ManagedLedgerImpl ml = (ManagedLedgerImpl)(topicRef.getManagedLedger());
assertEquals(ml.getWaitingCursorsCount(), 0);
}
}
Expand Up @@ -137,6 +137,11 @@ public void deleteCursor(String name) throws InterruptedException, ManagedLedger

}

@Override
public void removeWaitingCursor(ManagedCursor cursor) {

}

@Override
public void asyncOpenCursor(String name, AsyncCallbacks.OpenCursorCallback callback, Object ctx) {

Expand Down

0 comments on commit 2c07ae9

Please sign in to comment.