Skip to content

Commit

Permalink
fix parseMessageMetadata error cause by not skip broker entry metadata (
Browse files Browse the repository at this point in the history
apache#10968)

Fixes apache#10967

### Motivation
fix parseMessageMetadata error cause by not skip broker entry metadata

### Modifications

skip broker entry metadata if exist before parsing message metadata

(cherry picked from commit 0774b5f)
  • Loading branch information
aloyszhang authored and eolivelli committed Sep 6, 2021
1 parent d419734 commit 10e74d9
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 6 deletions.
Expand Up @@ -2449,8 +2449,6 @@ private Response generateResponseWithEntry(Entry entry) throws IOException {
PositionImpl pos = (PositionImpl) entry.getPosition();
ByteBuf metadataAndPayload = entry.getDataBuffer();

// moves the readerIndex to the payload
Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);

ResponseBuilder responseBuilder = Response.ok();
Expand Down
Expand Up @@ -109,4 +109,24 @@ public void testPeekMessage() throws Exception {
Assert.assertEquals(messages.size(), 1);
Assert.assertEquals(messages.get(0).getData(), "hello".getBytes());
}

@Test(timeOut = 20000)
public void testGetLastMessageId() throws Exception {
final String topic = "persistent://prop/ns-abc/topic-test";
final String subscription = "my-sub";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();
producer.newMessage().value("hello".getBytes()).send();

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName(subscription)
.subscribe();
consumer.getLastMessageId();
}
}
Expand Up @@ -301,8 +301,6 @@ public static MessageImpl<byte[]> deserializeSkipBrokerEntryMetaData(
@SuppressWarnings("unchecked")
MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();

Commands.skipBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata);

Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata, msg.msgMetadata);
msg.payload = headersAndPayloadWithBrokerEntryMetadata;
msg.messageId = null;
Expand Down
Expand Up @@ -483,4 +483,55 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() {
fail();
}
}

@Test(timeOut = 30000)
public void testParseMessageMetadataWithBrokerEntryMetadata() {
int MOCK_BATCH_SIZE = 10;
String data = "test-message";
ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));

// first, build a message with broker entry metadata

// build message metadata
MessageMetadata messageMetadata = new MessageMetadata()
.setPublishTime(1)
.setProducerName("test")
.setSequenceId(1);
byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);

// build broker entry metadata
BrokerEntryMetadata brokerMetadata = new BrokerEntryMetadata()
.setIndex(MOCK_BATCH_SIZE - 1);

// build final data which contains broker entry metadata
int brokerMetaSize = brokerMetadata.getSerializedSize();
ByteBuf brokerMeta = PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6);
brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
brokerMeta.writeInt(brokerMetaSize);
brokerMetadata.writeTo(brokerMeta);

CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponents(true, brokerMeta, byteBuf);

CompositeByteBuf dupCompositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
dupCompositeByteBuf.addComponents(true, brokerMeta, byteBuf);

//second, parse message metadata without skip broker entry metadata
Commands.skipChecksumIfPresent(compositeByteBuf);
int metadataSize = (int) compositeByteBuf.readUnsignedInt();
MessageMetadata md = new MessageMetadata();
try {
md.parseFrom(compositeByteBuf, metadataSize);
Assert.fail("Parse operation should be failed.");
} catch (IllegalArgumentException e) {
// expected
}

//third, parse message metadata with skip broker entry metadata first
MessageMetadata metadata = Commands.parseMessageMetadata(dupCompositeByteBuf);
assertEquals(metadata.getPublishTime(), 1);
assertEquals(metadata.getProducerName(), "test");
assertEquals(metadata.getSequenceId(), 1);
}
}
Expand Up @@ -418,6 +418,9 @@ public static MessageMetadata parseMessageMetadata(ByteBuf buffer) {
}

public static void parseMessageMetadata(ByteBuf buffer, MessageMetadata msgMetadata) {
// initially reader-index may point to start of broker entry metadata :
// increment reader-index to start_of_headAndPayload to parse metadata
skipBrokerEntryMetadataIfExist(buffer);
// initially reader-index may point to start_of_checksum : increment reader-index to start_of_metadata
// to parse metadata
skipChecksumIfPresent(buffer);
Expand Down Expand Up @@ -1667,7 +1670,6 @@ public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, St
try {
// save the reader index and restore after parsing
int readerIdx = metadataAndPayload.readerIndex();
skipBrokerEntryMetadataIfExist(metadataAndPayload);
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
metadataAndPayload.readerIndex(readerIdx);

Expand All @@ -1682,7 +1684,6 @@ public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, St
public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, String subscription) {
try {
int readerIdx = metadataAndPayload.readerIndex();
skipBrokerEntryMetadataIfExist(metadataAndPayload);
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
metadataAndPayload.readerIndex(readerIdx);
if (metadata.hasOrderingKey()) {
Expand Down

0 comments on commit 10e74d9

Please sign in to comment.