diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index c821096a8d4e8c..58ff0deb4e5670 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -276,8 +276,7 @@ public static MessageImpl deserializeBrokerEntryMetaDataFirst( msg.brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata); - - if (msg.brokerEntryMetadata != null) { + if (msg.brokerEntryMetadata != null && msg.brokerEntryMetadata.hasBrokerTimestamp()) { msg.msgMetadata.clear(); msg.payload = null; msg.messageId = null; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java index 17b77a2cab6a2e..252cbbb7ca9229 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java @@ -451,9 +451,7 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() { CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); compositeByteBuf.addComponents(true, brokerMeta, byteBuf); MessageImpl messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf); - MessageImpl message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf); - message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata()); - assertTrue(message.isExpired(100)); + assertTrue(messageWithEntryMetadata.isExpired(100)); // test BrokerTimestamp set. byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length()); @@ -476,9 +474,7 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() { compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); compositeByteBuf.addComponents(true, brokerMeta, byteBuf); messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf); - message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf); - message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata()); - assertFalse(message.isExpired(24 * 3600)); + assertFalse(messageWithEntryMetadata.isExpired(24 * 3600)); } catch (IOException e) { fail(); }