diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index 9073447b34631..dc03962fcb15c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -123,7 +123,7 @@ public void localSubscriptionUpdated(String subscriptionName, ReplicatedSubscrip } ByteBuf subscriptionUpdate = Markers.newReplicatedSubscriptionsUpdate(subscriptionName, clusterIds); - topic.publishMessage(subscriptionUpdate, this); + writeMarker(subscriptionUpdate); } private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest request) { @@ -140,8 +140,7 @@ private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest requ request.getSourceCluster(), localCluster, lastMsgId.getLedgerId(), lastMsgId.getEntryId()); - - topic.publishMessage(marker, this); + writeMarker(marker); } private void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) { @@ -276,7 +275,11 @@ void snapshotCompleted(String snapshotId) { } void writeMarker(ByteBuf marker) { - topic.publishMessage(marker, this); + try { + topic.publishMessage(marker, this); + } finally { + marker.release(); + } } /**