diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index f4002c28ef347a..d2b4f2a12d8142 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -727,8 +727,11 @@ protected boolean canEnqueueMessage(Message message) { } protected boolean enqueueMessageAndCheckBatchReceive(Message message) { + int messageSize = message.size(); if (canEnqueueMessage(message) && incomingMessages.offer(message)) { - increaseIncomingMessageSize(message); + // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message instance + // anymore, since for pooled messages, this instance was possibly already been released and recycled. + INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize); } return hasEnoughMessagesForBatchReceive(); } @@ -970,10 +973,6 @@ protected boolean hasPendingBatchReceive() { return pendingBatchReceives != null && hasNextBatchReceive(); } - protected void increaseIncomingMessageSize(final Message message) { - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.size()); - } - protected void resetIncomingMessageSize() { INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); }