Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix parseMessageMetadata error cause by not skip broker entry metadata #10968

Merged
merged 4 commits into from Jun 18, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -2450,8 +2450,6 @@ private Response generateResponseWithEntry(Entry entry) throws IOException {
PositionImpl pos = (PositionImpl) entry.getPosition();
ByteBuf metadataAndPayload = entry.getDataBuffer();

// moves the readerIndex to the payload
Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);

ResponseBuilder responseBuilder = Response.ok();
Expand Down
Expand Up @@ -109,4 +109,24 @@ public void testPeekMessage() throws Exception {
Assert.assertEquals(messages.size(), 1);
Assert.assertEquals(messages.get(0).getData(), "hello".getBytes());
}

@Test(timeOut = 20000)
public void testGetLastMessageId() throws Exception {
final String topic = "persistent://prop/ns-abc/topic-test";
final String subscription = "my-sub";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();
producer.newMessage().value("hello".getBytes()).send();

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName(subscription)
.subscribe();
System.out.println(consumer.getLastMessageId());
aloyszhang marked this conversation as resolved.
Show resolved Hide resolved
}
}
Expand Up @@ -300,8 +300,6 @@ public static MessageImpl<byte[]> deserializeSkipBrokerEntryMetaData(
@SuppressWarnings("unchecked")
MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();

Commands.skipBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata);

Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata, msg.msgMetadata);
msg.payload = headersAndPayloadWithBrokerEntryMetadata;
msg.messageId = null;
Expand Down
Expand Up @@ -483,4 +483,55 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() {
fail();
}
}

@Test(timeOut = 30000)
public void testParseMessageMetadataWithBrokerEntryMetadata() {
int MOCK_BATCH_SIZE = 10;
String data = "test-message";
ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));

// first, build a message with broker entry metadata

// build message metadata
MessageMetadata messageMetadata = new MessageMetadata()
.setPublishTime(1)
.setProducerName("test")
.setSequenceId(1);
byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);

// build broker entry metadata
BrokerEntryMetadata brokerMetadata = new BrokerEntryMetadata()
.setIndex(MOCK_BATCH_SIZE - 1);

// build final data which contains broker entry metadata
int brokerMetaSize = brokerMetadata.getSerializedSize();
ByteBuf brokerMeta = PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6);
brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
brokerMeta.writeInt(brokerMetaSize);
brokerMetadata.writeTo(brokerMeta);

CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponents(true, brokerMeta, byteBuf);

CompositeByteBuf dupCompositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
dupCompositeByteBuf.addComponents(true, brokerMeta, byteBuf);

//second, parse message metadata without skip broker entry metadata
try {
Commands.skipChecksumIfPresent(compositeByteBuf);
int metadataSize = (int) compositeByteBuf.readUnsignedInt();
MessageMetadata md = new MessageMetadata();
md.parseFrom(compositeByteBuf, metadataSize);
Assert.fail("Parse operation should be failed.");
} catch (Exception e) {
// expected
}
aloyszhang marked this conversation as resolved.
Show resolved Hide resolved

//third, parse message metadata with skip broker entry metadata first
MessageMetadata metadata = Commands.parseMessageMetadata(dupCompositeByteBuf);
assertEquals(metadata.getPublishTime(), 1);
assertEquals(metadata.getProducerName(), "test");
assertEquals(metadata.getSequenceId(), 1);
}
}
Expand Up @@ -418,6 +418,9 @@ public static MessageMetadata parseMessageMetadata(ByteBuf buffer) {
}

public static void parseMessageMetadata(ByteBuf buffer, MessageMetadata msgMetadata) {
// initially reader-index may point to start of broker entry metadata :
// increment reader-index to start_of_headAndPayload to parse metadata
skipBrokerEntryMetadataIfExist(buffer);
// initially reader-index may point to start_of_checksum : increment reader-index to start_of_metadata
// to parse metadata
skipChecksumIfPresent(buffer);
Expand Down Expand Up @@ -1667,7 +1670,6 @@ public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, St
try {
// save the reader index and restore after parsing
int readerIdx = metadataAndPayload.readerIndex();
skipBrokerEntryMetadataIfExist(metadataAndPayload);
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
metadataAndPayload.readerIndex(readerIdx);

Expand All @@ -1682,7 +1684,6 @@ public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, St
public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, String subscription) {
try {
int readerIdx = metadataAndPayload.readerIndex();
skipBrokerEntryMetadataIfExist(metadataAndPayload);
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
metadataAndPayload.readerIndex(readerIdx);
if (metadata.hasOrderingKey()) {
Expand Down