Skip to content

Commit

Permalink
fix publish_time not set error when broker entry metadata enable with…
Browse files Browse the repository at this point in the history
…out AppendBrokerTimestampMetadataInterceptor
  • Loading branch information
aloyszhang committed Jun 24, 2021
1 parent 1623790 commit 158e9e3
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 9 deletions.
Expand Up @@ -275,8 +275,7 @@ public static MessageImpl<byte[]> deserializeBrokerEntryMetaDataFirst(

msg.brokerEntryMetadata =
Commands.parseBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata);

if (msg.brokerEntryMetadata != null) {
if (msg.brokerEntryMetadata != null && msg.brokerEntryMetadata.hasBrokerTimestamp()) {
msg.msgMetadata.clear();
msg.payload = null;
msg.messageId = null;
Expand Down
Expand Up @@ -425,7 +425,7 @@ public void testTypedSchemaGetNullValue() {
assertNull(msg.getValue());
}

@Test(timeOut = 30000)
@Test(timeOut = 300000)
public void testMessageBrokerAndEntryMetadataTimestampMissed() {
int MOCK_BATCH_SIZE = 10;
String data = "test-message";
Expand All @@ -451,9 +451,7 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() {
CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
MessageImpl messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf);
MessageImpl message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf);
message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata());
assertTrue(message.isExpired(100));
assertTrue(messageWithEntryMetadata.isExpired(100));

// test BrokerTimestamp set.
byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
Expand All @@ -476,9 +474,7 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() {
compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf);
message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf);
message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata());
assertFalse(message.isExpired(24 * 3600));
assertFalse(messageWithEntryMetadata.isExpired(24 * 3600));
} catch (IOException e) {
fail();
}
Expand Down

0 comments on commit 158e9e3

Please sign in to comment.