From 63f2eb6b3167ce6c58ec7815418d27bc07991f2e Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Fri, 18 Jun 2021 13:20:19 +0800 Subject: [PATCH 1/4] fix parseMessageMetadata error cause by not skip broker entry metadata --- .../main/java/org/apache/pulsar/common/protocol/Commands.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 00f970583b0ea..cae8d94b3ade4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -418,6 +418,9 @@ public static MessageMetadata parseMessageMetadata(ByteBuf buffer) { } public static void parseMessageMetadata(ByteBuf buffer, MessageMetadata msgMetadata) { + // initially reader-index may point to start of broker entry metadata : + // increment reader-index to start_of_headAndPayload to parse metadata + skipBrokerEntryMetadataIfExist(buffer); // initially reader-index may point to start_of_checksum : increment reader-index to start_of_metadata // to parse metadata skipChecksumIfPresent(buffer); From 4cf27ca6b3ec6c5cdb3e63b89a4384ce09d41379 Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Fri, 18 Jun 2021 16:33:04 +0800 Subject: [PATCH 2/4] add test for parse message metadata when broker entry metadata enabled --- .../service/BrokerEntryMetadataE2ETest.java | 20 ++++++++ .../pulsar/client/impl/MessageImplTest.java | 51 +++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java index 5cbaf3de9ae86..5f653637362f3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java @@ -109,4 +109,24 @@ public void testPeekMessage() throws Exception { Assert.assertEquals(messages.size(), 1); Assert.assertEquals(messages.get(0).getData(), "hello".getBytes()); } + + @Test(timeOut = 20000) + public void testGetLastMessageId() throws Exception { + final String topic = "persistent://prop/ns-abc/topic-test"; + final String subscription = "my-sub"; + + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topic) + .create(); + producer.newMessage().value("hello".getBytes()).send(); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName(subscription) + .subscribe(); + System.out.println(consumer.getLastMessageId()); + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java index 0a57d936549cf..dfc36fc28c724 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java @@ -483,4 +483,55 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() { fail(); } } + + @Test(timeOut = 30000) + public void testParseMessageMetadataWithBrokerEntryMetadata() { + int MOCK_BATCH_SIZE = 10; + String data = "test-message"; + ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length()); + byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8)); + + // first, build a message with broker entry metadata + + // build message metadata + MessageMetadata messageMetadata = new MessageMetadata() + .setPublishTime(1) + .setProducerName("test") + .setSequenceId(1); + byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf); + + // build broker entry metadata + BrokerEntryMetadata brokerMetadata = new BrokerEntryMetadata() + .setIndex(MOCK_BATCH_SIZE - 1); + + // build final data which contains broker entry metadata + int brokerMetaSize = brokerMetadata.getSerializedSize(); + ByteBuf brokerMeta = PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6); + brokerMeta.writeShort(Commands.magicBrokerEntryMetadata); + brokerMeta.writeInt(brokerMetaSize); + brokerMetadata.writeTo(brokerMeta); + + CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); + compositeByteBuf.addComponents(true, brokerMeta, byteBuf); + + CompositeByteBuf dupCompositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); + dupCompositeByteBuf.addComponents(true, brokerMeta, byteBuf); + + //second, parse message metadata without skip broker entry metadata + try { + Commands.skipChecksumIfPresent(compositeByteBuf); + int metadataSize = (int) compositeByteBuf.readUnsignedInt(); + MessageMetadata md = new MessageMetadata(); + md.parseFrom(compositeByteBuf, metadataSize); + Assert.fail("Parse operation should be failed."); + } catch (Exception e) { + // expected + } + + //third, parse message metadata with skip broker entry metadata first + MessageMetadata metadata = Commands.parseMessageMetadata(dupCompositeByteBuf); + assertEquals(metadata.getPublishTime(), 1); + assertEquals(metadata.getProducerName(), "test"); + assertEquals(metadata.getSequenceId(), 1); + } } From 41db6d343b86a0195415560031aa6dbdd88795bc Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Fri, 18 Jun 2021 16:33:38 +0800 Subject: [PATCH 3/4] remove extra call of skipBrokerEntryMetadataIfExist --- .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 2 -- .../main/java/org/apache/pulsar/client/impl/MessageImpl.java | 2 -- .../main/java/org/apache/pulsar/common/protocol/Commands.java | 2 -- 3 files changed, 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 97f4a8dbbf2bf..eacfff224b0dc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2450,8 +2450,6 @@ private Response generateResponseWithEntry(Entry entry) throws IOException { PositionImpl pos = (PositionImpl) entry.getPosition(); ByteBuf metadataAndPayload = entry.getDataBuffer(); - // moves the readerIndex to the payload - Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload); MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); ResponseBuilder responseBuilder = Response.ok(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index 82eec0959fcbf..babd624ea3ce9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -300,8 +300,6 @@ public static MessageImpl deserializeSkipBrokerEntryMetaData( @SuppressWarnings("unchecked") MessageImpl msg = (MessageImpl) RECYCLER.get(); - Commands.skipBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata); - Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata, msg.msgMetadata); msg.payload = headersAndPayloadWithBrokerEntryMetadata; msg.messageId = null; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index cae8d94b3ade4..cf997baf94896 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1670,7 +1670,6 @@ public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, St try { // save the reader index and restore after parsing int readerIdx = metadataAndPayload.readerIndex(); - skipBrokerEntryMetadataIfExist(metadataAndPayload); MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); metadataAndPayload.readerIndex(readerIdx); @@ -1685,7 +1684,6 @@ public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, St public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, String subscription) { try { int readerIdx = metadataAndPayload.readerIndex(); - skipBrokerEntryMetadataIfExist(metadataAndPayload); MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); metadataAndPayload.readerIndex(readerIdx); if (metadata.hasOrderingKey()) { From 61556be6ab5d4dd4d726d73e43c8c406ae69443b Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Fri, 18 Jun 2021 19:43:02 +0800 Subject: [PATCH 4/4] apply comment on test code --- .../pulsar/broker/service/BrokerEntryMetadataE2ETest.java | 2 +- .../org/apache/pulsar/client/impl/MessageImplTest.java | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java index 5f653637362f3..e7d98a829934b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java @@ -127,6 +127,6 @@ public void testGetLastMessageId() throws Exception { .subscriptionType(SubscriptionType.Exclusive) .subscriptionName(subscription) .subscribe(); - System.out.println(consumer.getLastMessageId()); + consumer.getLastMessageId(); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java index dfc36fc28c724..17b77a2cab6a2 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java @@ -518,13 +518,13 @@ public void testParseMessageMetadataWithBrokerEntryMetadata() { dupCompositeByteBuf.addComponents(true, brokerMeta, byteBuf); //second, parse message metadata without skip broker entry metadata + Commands.skipChecksumIfPresent(compositeByteBuf); + int metadataSize = (int) compositeByteBuf.readUnsignedInt(); + MessageMetadata md = new MessageMetadata(); try { - Commands.skipChecksumIfPresent(compositeByteBuf); - int metadataSize = (int) compositeByteBuf.readUnsignedInt(); - MessageMetadata md = new MessageMetadata(); md.parseFrom(compositeByteBuf, metadataSize); Assert.fail("Parse operation should be failed."); - } catch (Exception e) { + } catch (IllegalArgumentException e) { // expected }