From 56996bcf144f6c59fdf1f3a56b8dd0bd784e9457 Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Thu, 24 Jun 2021 22:09:24 +0800 Subject: [PATCH] fix publish_time not set error when broker entry metadata enable without AppendBrokerTimestampMetadataInterceptor --- .../java/org/apache/pulsar/client/impl/MessageImpl.java | 3 +-- .../org/apache/pulsar/client/impl/MessageImplTest.java | 8 ++------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index c821096a8d4e8c..58ff0deb4e5670 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -276,8 +276,7 @@ public static MessageImpl deserializeBrokerEntryMetaDataFirst( msg.brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata); - - if (msg.brokerEntryMetadata != null) { + if (msg.brokerEntryMetadata != null && msg.brokerEntryMetadata.hasBrokerTimestamp()) { msg.msgMetadata.clear(); msg.payload = null; msg.messageId = null; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java index 17b77a2cab6a2e..252cbbb7ca9229 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java @@ -451,9 +451,7 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() { CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); compositeByteBuf.addComponents(true, brokerMeta, byteBuf); MessageImpl messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf); - MessageImpl message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf); - message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata()); - assertTrue(message.isExpired(100)); + assertTrue(messageWithEntryMetadata.isExpired(100)); // test BrokerTimestamp set. byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length()); @@ -476,9 +474,7 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() { compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); compositeByteBuf.addComponents(true, brokerMeta, byteBuf); messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf); - message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf); - message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata()); - assertFalse(message.isExpired(24 * 3600)); + assertFalse(messageWithEntryMetadata.isExpired(24 * 3600)); } catch (IOException e) { fail(); }