From 321afb9b1cacbf1e220245daf93b79a590b50749 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Sat, 19 Jun 2021 10:48:27 +0300 Subject: [PATCH] [Broker] Fix direct memory leak in getLastMessageId (#10977) (cherry picked from commit 7417ca87e623e56454b1d812ee957ddee511a78e) --- .../service/persistent/PersistentTopic.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 65a526e009a33..644b7266b8e1a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2571,13 +2571,18 @@ public CompletableFuture getLastMessageId() { ledgerImpl.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryComplete(Entry entry, Object ctx) { - MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); - if (metadata.hasNumMessagesInBatch()) { - completableFuture.complete(new BatchMessageIdImpl(position.getLedgerId(), position.getEntryId(), - partitionIndex, metadata.getNumMessagesInBatch() - 1)); - } else { - completableFuture - .complete(new MessageIdImpl(position.getLedgerId(), position.getEntryId(), partitionIndex)); + try { + MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + if (metadata.hasNumMessagesInBatch()) { + completableFuture.complete(new BatchMessageIdImpl(position.getLedgerId(), position.getEntryId(), + partitionIndex, metadata.getNumMessagesInBatch() - 1)); + } else { + completableFuture + .complete(new MessageIdImpl(position.getLedgerId(), position.getEntryId(), + partitionIndex)); + } + } finally { + entry.release(); } }