Skip to content

Commit

Permalink
Print message metadata when getting message by id (#11092)
Browse files Browse the repository at this point in the history
```
Batch Message ID: 10:1:0
Properties:
"X-Pulsar-batch-size    26825"
"X-Pulsar-num-batch-message    26"
"publish-time    2021-06-25T16:00:40.919+08:00"
```

(cherry picked from commit cadf59d)
  • Loading branch information
codelipenghui committed Jun 29, 2021
1 parent 7415036 commit 0622649
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 7 deletions.
Expand Up @@ -2336,6 +2336,8 @@ private Response generateResponseWithEntry(Entry entry) throws IOException {
}
if (metadata.hasNumMessagesInBatch()) {
responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
responseBuilder.header("X-Pulsar-batch-size", metadataAndPayload.readableBytes()
- metadata.getSerializedSize());
}
if (metadata.hasNullValue()) {
responseBuilder.header("X-Pulsar-null-value", metadata.hasNullValue());
Expand Down
Expand Up @@ -91,9 +91,10 @@ public class TopicsImpl extends BaseResource implements Topics {
private final WebTarget adminTopics;
private final WebTarget adminV2Topics;
// CHECKSTYLE.OFF: MemberName
private final String BATCH_HEADER = "X-Pulsar-num-batch-message";
private final String MESSAGE_ID = "X-Pulsar-Message-ID";
private final String PUBLISH_TIME = "X-Pulsar-publish-time";
private static final String BATCH_HEADER = "X-Pulsar-num-batch-message";
private static final String BATCH_SIZE_HEADER = "X-Pulsar-batch-size";
private static final String MESSAGE_ID = "X-Pulsar-Message-ID";
private static final String PUBLISH_TIME = "X-Pulsar-publish-time";
// CHECKSTYLE.ON: MemberName

public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
Expand Down Expand Up @@ -1437,11 +1438,11 @@ private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, Response
messageMetadata.setNullValue(Boolean.parseBoolean(tmp.toString()));
}

tmp = headers.getFirst(BATCH_HEADER);
if (response.getHeaderString(BATCH_HEADER) != null) {
properties.put(BATCH_HEADER, (String) tmp);
return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata);
tmp = headers.getFirst(BATCH_SIZE_HEADER);
if (tmp != null) {
properties.put(BATCH_SIZE_HEADER, (String) tmp);
}

for (Entry<String, List<Object>> entry : headers.entrySet()) {
String header = entry.getKey();
if (header.contains("X-Pulsar-PROPERTY-")) {
Expand All @@ -1450,6 +1451,12 @@ private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, Response
}
}

tmp = headers.getFirst(BATCH_HEADER);
if (response.getHeaderString(BATCH_HEADER) != null) {
properties.put(BATCH_HEADER, (String) tmp);
return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata);
}

return Collections.singletonList(new MessageImpl<byte[]>(topic, msgId, properties,
Unpooled.wrappedBuffer(data), Schema.BYTES, messageMetadata));
}
Expand Down
Expand Up @@ -884,6 +884,17 @@ void run() throws PulsarAdminException {
System.out.println("Cannot find any messages based on ledgerId:"
+ ledgerId + " entryId:" + entryId);
} else {
if (message.getMessageId() instanceof BatchMessageIdImpl) {
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + msgId.getBatchIndex());
} else {
MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
}
if (message.getProperties().size() > 0) {
System.out.println("Properties:");
print(message.getProperties());
}
ByteBuf date = Unpooled.wrappedBuffer(message.getData());
System.out.println(ByteBufUtil.prettyHexDump(date));
}
Expand Down

0 comments on commit 0622649

Please sign in to comment.