From 0967e00f08bbabe8ba85c5376ee30d53965208f2 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Sat, 28 Aug 2021 08:46:41 -0700 Subject: [PATCH] Fixed accessing MessageImpl after it was enqueued on user queue (#11824) --- .../java/org/apache/pulsar/client/impl/ConsumerBase.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index f4002c28ef347a..d2b4f2a12d8142 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -727,8 +727,11 @@ protected boolean canEnqueueMessage(Message message) { } protected boolean enqueueMessageAndCheckBatchReceive(Message message) { + int messageSize = message.size(); if (canEnqueueMessage(message) && incomingMessages.offer(message)) { - increaseIncomingMessageSize(message); + // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message instance + // anymore, since for pooled messages, this instance was possibly already been released and recycled. + INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize); } return hasEnoughMessagesForBatchReceive(); } @@ -970,10 +973,6 @@ protected boolean hasPendingBatchReceive() { return pendingBatchReceives != null && hasNextBatchReceive(); } - protected void increaseIncomingMessageSize(final Message message) { - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.size()); - } - protected void resetIncomingMessageSize() { INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); }