diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 65a526e009a33..644b7266b8e1a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2571,13 +2571,18 @@ public CompletableFuture getLastMessageId() { ledgerImpl.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryComplete(Entry entry, Object ctx) { - MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); - if (metadata.hasNumMessagesInBatch()) { - completableFuture.complete(new BatchMessageIdImpl(position.getLedgerId(), position.getEntryId(), - partitionIndex, metadata.getNumMessagesInBatch() - 1)); - } else { - completableFuture - .complete(new MessageIdImpl(position.getLedgerId(), position.getEntryId(), partitionIndex)); + try { + MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + if (metadata.hasNumMessagesInBatch()) { + completableFuture.complete(new BatchMessageIdImpl(position.getLedgerId(), position.getEntryId(), + partitionIndex, metadata.getNumMessagesInBatch() - 1)); + } else { + completableFuture + .complete(new MessageIdImpl(position.getLedgerId(), position.getEntryId(), + partitionIndex)); + } + } finally { + entry.release(); } }