diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index 5a8875770451a..c893b6c7cc76d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -117,7 +117,7 @@ public void localSubscriptionUpdated(String subscriptionName, ReplicatedSubscrip } ByteBuf subscriptionUpdate = Markers.newReplicatedSubscriptionsUpdate(subscriptionName, clusterIds); - topic.publishMessage(subscriptionUpdate, this); + writeMarker(subscriptionUpdate); } private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest request) { @@ -134,8 +134,7 @@ private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest requ request.getSourceCluster(), localCluster, lastMsgId.getLedgerId(), lastMsgId.getEntryId()); - - topic.publishMessage(marker, this); + writeMarker(marker); } private void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) { @@ -228,7 +227,11 @@ void snapshotCompleted(String snapshotId) { } void writeMarker(ByteBuf marker) { - topic.publishMessage(marker, this); + try { + topic.publishMessage(marker, this); + } finally { + marker.release(); + } } /**