From 9c3e904ae56b660ca7b0f7b201effcc884654fd5 Mon Sep 17 00:00:00 2001 From: Aloys Date: Sat, 19 Jun 2021 02:00:48 +0800 Subject: [PATCH] fix parseMessageMetadata error cause by not skip broker entry metadata (#10968) Fixes #10967 ### Motivation fix parseMessageMetadata error cause by not skip broker entry metadata ### Modifications skip broker entry metadata if exist before parsing message metadata (cherry picked from commit 0774b5fddddc0c9fe9b7cc00ae40e43322690ef1) --- .../admin/impl/PersistentTopicsBase.java | 2 - .../service/BrokerEntryMetadataE2ETest.java | 20 ++++++++ .../pulsar/client/impl/MessageImpl.java | 2 - .../pulsar/client/impl/MessageImplTest.java | 51 +++++++++++++++++++ .../pulsar/common/protocol/Commands.java | 5 +- 5 files changed, 74 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 4a03cd1d39c13..76211b8e3c419 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2451,8 +2451,6 @@ private Response generateResponseWithEntry(Entry entry) throws IOException { PositionImpl pos = (PositionImpl) entry.getPosition(); ByteBuf metadataAndPayload = entry.getDataBuffer(); - // moves the readerIndex to the payload - Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload); MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); ResponseBuilder responseBuilder = Response.ok(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java index 5cbaf3de9ae86..e7d98a829934b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java @@ -109,4 +109,24 @@ public void testPeekMessage() throws Exception { Assert.assertEquals(messages.size(), 1); Assert.assertEquals(messages.get(0).getData(), "hello".getBytes()); } + + @Test(timeOut = 20000) + public void testGetLastMessageId() throws Exception { + final String topic = "persistent://prop/ns-abc/topic-test"; + final String subscription = "my-sub"; + + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topic) + .create(); + producer.newMessage().value("hello".getBytes()).send(); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName(subscription) + .subscribe(); + consumer.getLastMessageId(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index efb66e5d947a9..c9370f3d97b80 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -300,8 +300,6 @@ public static MessageImpl deserializeSkipBrokerEntryMetaData( @SuppressWarnings("unchecked") MessageImpl msg = (MessageImpl) RECYCLER.get(); - Commands.skipBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata); - Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata, msg.msgMetadata); msg.payload = headersAndPayloadWithBrokerEntryMetadata; msg.messageId = null; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java index 0a57d936549cf..17b77a2cab6a2 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java @@ -483,4 +483,55 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() { fail(); } } + + @Test(timeOut = 30000) + public void testParseMessageMetadataWithBrokerEntryMetadata() { + int MOCK_BATCH_SIZE = 10; + String data = "test-message"; + ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length()); + byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8)); + + // first, build a message with broker entry metadata + + // build message metadata + MessageMetadata messageMetadata = new MessageMetadata() + .setPublishTime(1) + .setProducerName("test") + .setSequenceId(1); + byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf); + + // build broker entry metadata + BrokerEntryMetadata brokerMetadata = new BrokerEntryMetadata() + .setIndex(MOCK_BATCH_SIZE - 1); + + // build final data which contains broker entry metadata + int brokerMetaSize = brokerMetadata.getSerializedSize(); + ByteBuf brokerMeta = PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6); + brokerMeta.writeShort(Commands.magicBrokerEntryMetadata); + brokerMeta.writeInt(brokerMetaSize); + brokerMetadata.writeTo(brokerMeta); + + CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); + compositeByteBuf.addComponents(true, brokerMeta, byteBuf); + + CompositeByteBuf dupCompositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); + dupCompositeByteBuf.addComponents(true, brokerMeta, byteBuf); + + //second, parse message metadata without skip broker entry metadata + Commands.skipChecksumIfPresent(compositeByteBuf); + int metadataSize = (int) compositeByteBuf.readUnsignedInt(); + MessageMetadata md = new MessageMetadata(); + try { + md.parseFrom(compositeByteBuf, metadataSize); + Assert.fail("Parse operation should be failed."); + } catch (IllegalArgumentException e) { + // expected + } + + //third, parse message metadata with skip broker entry metadata first + MessageMetadata metadata = Commands.parseMessageMetadata(dupCompositeByteBuf); + assertEquals(metadata.getPublishTime(), 1); + assertEquals(metadata.getProducerName(), "test"); + assertEquals(metadata.getSequenceId(), 1); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 0c8039522e5d8..7c93ecc5d5aa2 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -418,6 +418,9 @@ public static MessageMetadata parseMessageMetadata(ByteBuf buffer) { } public static void parseMessageMetadata(ByteBuf buffer, MessageMetadata msgMetadata) { + // initially reader-index may point to start of broker entry metadata : + // increment reader-index to start_of_headAndPayload to parse metadata + skipBrokerEntryMetadataIfExist(buffer); // initially reader-index may point to start_of_checksum : increment reader-index to start_of_metadata // to parse metadata skipChecksumIfPresent(buffer); @@ -1667,7 +1670,6 @@ public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, St try { // save the reader index and restore after parsing int readerIdx = metadataAndPayload.readerIndex(); - skipBrokerEntryMetadataIfExist(metadataAndPayload); MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); metadataAndPayload.readerIndex(readerIdx); @@ -1682,7 +1684,6 @@ public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, St public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, String subscription) { try { int readerIdx = metadataAndPayload.readerIndex(); - skipBrokerEntryMetadataIfExist(metadataAndPayload); MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); metadataAndPayload.readerIndex(readerIdx); if (metadata.hasOrderingKey()) {