diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index babd624ea3ce9b..578ce1ebdb97c1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -275,8 +275,7 @@ public static MessageImpl deserializeBrokerEntryMetaDataFirst( msg.brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata); - - if (msg.brokerEntryMetadata != null) { + if (msg.brokerEntryMetadata != null && msg.brokerEntryMetadata.hasBrokerTimestamp()) { msg.msgMetadata.clear(); msg.payload = null; msg.messageId = null; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java index 17b77a2cab6a2e..735e622846827b 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java @@ -425,7 +425,7 @@ public void testTypedSchemaGetNullValue() { assertNull(msg.getValue()); } - @Test(timeOut = 30000) + @Test(timeOut = 300000) public void testMessageBrokerAndEntryMetadataTimestampMissed() { int MOCK_BATCH_SIZE = 10; String data = "test-message"; @@ -451,9 +451,7 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() { CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); compositeByteBuf.addComponents(true, brokerMeta, byteBuf); MessageImpl messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf); - MessageImpl message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf); - message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata()); - assertTrue(message.isExpired(100)); + assertTrue(messageWithEntryMetadata.isExpired(100)); // test BrokerTimestamp set. byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length()); @@ -476,9 +474,7 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() { compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); compositeByteBuf.addComponents(true, brokerMeta, byteBuf); messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf); - message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf); - message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata()); - assertFalse(message.isExpired(24 * 3600)); + assertFalse(messageWithEntryMetadata.isExpired(24 * 3600)); } catch (IOException e) { fail(); }