From 54b05fef84e5b9b1642fa66ead8b37061cb7a61f Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 22 Jul 2021 09:57:02 +0300 Subject: [PATCH] [Broker] Fix replicated subscriptions direct memory leak (#11396) - .release() wasn't called for marker messages sent by ReplicatedSubscriptionsController. - Use writeMarker method to publish marker messages and call release in try-finally block. (cherry picked from commit 8841c81ff687c3aa8f8990272425d76df59fd311) --- .../persistent/ReplicatedSubscriptionsController.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index 5a8875770451a..c893b6c7cc76d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -117,7 +117,7 @@ public void localSubscriptionUpdated(String subscriptionName, ReplicatedSubscrip } ByteBuf subscriptionUpdate = Markers.newReplicatedSubscriptionsUpdate(subscriptionName, clusterIds); - topic.publishMessage(subscriptionUpdate, this); + writeMarker(subscriptionUpdate); } private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest request) { @@ -134,8 +134,7 @@ private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest requ request.getSourceCluster(), localCluster, lastMsgId.getLedgerId(), lastMsgId.getEntryId()); - - topic.publishMessage(marker, this); + writeMarker(marker); } private void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) { @@ -228,7 +227,11 @@ void snapshotCompleted(String snapshotId) { } void writeMarker(ByteBuf marker) { - topic.publishMessage(marker, this); + try { + topic.publishMessage(marker, this); + } finally { + marker.release(); + } } /**