Skip to content

Commit

Permalink
Print message metadata when getting message by id (#11092)
Browse files Browse the repository at this point in the history
```
Batch Message ID: 10:1:0
Properties:
"X-Pulsar-batch-size    26825"
"X-Pulsar-num-batch-message    26"
"publish-time    2021-06-25T16:00:40.919+08:00"
```
  • Loading branch information
codelipenghui committed Jun 25, 2021
1 parent b71bc65 commit cadf59d
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 4 deletions.
Expand Up @@ -2458,6 +2458,8 @@ private Response generateResponseWithEntry(Entry entry) throws IOException {
}
if (metadata.hasNumMessagesInBatch()) {
responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
responseBuilder.header("X-Pulsar-batch-size", metadataAndPayload.readableBytes()
- metadata.getSerializedSize());
}
if (metadata.hasNullValue()) {
responseBuilder.header("X-Pulsar-null-value", metadata.isNullValue());
Expand Down
Expand Up @@ -92,6 +92,7 @@ public class TopicsImpl extends BaseResource implements Topics {
private final WebTarget adminV2Topics;
// CHECKSTYLE.OFF: MemberName
private static final String BATCH_HEADER = "X-Pulsar-num-batch-message";
private static final String BATCH_SIZE_HEADER = "X-Pulsar-batch-size";
private static final String MESSAGE_ID = "X-Pulsar-Message-ID";
private static final String PUBLISH_TIME = "X-Pulsar-publish-time";
// CHECKSTYLE.ON: MemberName
Expand Down Expand Up @@ -1482,11 +1483,11 @@ private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, Response
messageMetadata.setNullValue(Boolean.parseBoolean(tmp.toString()));
}

tmp = headers.getFirst(BATCH_HEADER);
if (response.getHeaderString(BATCH_HEADER) != null) {
properties.put(BATCH_HEADER, (String) tmp);
return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata);
tmp = headers.getFirst(BATCH_SIZE_HEADER);
if (tmp != null) {
properties.put(BATCH_SIZE_HEADER, (String) tmp);
}

for (Entry<String, List<Object>> entry : headers.entrySet()) {
String header = entry.getKey();
if (header.contains("X-Pulsar-PROPERTY-")) {
Expand All @@ -1495,6 +1496,12 @@ private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, Response
}
}

tmp = headers.getFirst(BATCH_HEADER);
if (response.getHeaderString(BATCH_HEADER) != null) {
properties.put(BATCH_HEADER, (String) tmp);
return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata);
}

return Collections.singletonList(new MessageImpl<byte[]>(topic, msgId, properties,
Unpooled.wrappedBuffer(data), Schema.BYTES, messageMetadata));
}
Expand Down
Expand Up @@ -910,6 +910,17 @@ void run() throws PulsarAdminException {
System.out.println("Cannot find any messages based on ledgerId:"
+ ledgerId + " entryId:" + entryId);
} else {
if (message.getMessageId() instanceof BatchMessageIdImpl) {
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + msgId.getBatchIndex());
} else {
MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
}
if (message.getProperties().size() > 0) {
System.out.println("Properties:");
print(message.getProperties());
}
ByteBuf date = Unpooled.wrappedBuffer(message.getData());
System.out.println(ByteBufUtil.prettyHexDump(date));
}
Expand Down

0 comments on commit cadf59d

Please sign in to comment.