From bef46a65245b8f968349ae4cbed1f94264a66ead Mon Sep 17 00:00:00 2001 From: lipenghui Date: Sat, 26 Jun 2021 00:27:00 +0800 Subject: [PATCH] Print message metadata when getting message by id (#11092) ``` Batch Message ID: 10:1:0 Properties: "X-Pulsar-batch-size 26825" "X-Pulsar-num-batch-message 26" "publish-time 2021-06-25T16:00:40.919+08:00" ``` (cherry picked from commit cadf59d18d6f71f1da06f425709bffa1b10fcf11) --- .../admin/impl/PersistentTopicsBase.java | 2 ++ .../client/admin/internal/TopicsImpl.java | 21 ++++++++++++------- .../apache/pulsar/admin/cli/CmdTopics.java | 11 ++++++++++ 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index c794f20d3c7ed..1cdba231fc976 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2458,6 +2458,8 @@ private Response generateResponseWithEntry(Entry entry) throws IOException { } if (metadata.hasNumMessagesInBatch()) { responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch()); + responseBuilder.header("X-Pulsar-batch-size", metadataAndPayload.readableBytes() + - metadata.getSerializedSize()); } if (metadata.hasNullValue()) { responseBuilder.header("X-Pulsar-null-value", metadata.isNullValue()); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 1885f704f416f..877448d1e835c 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -91,9 +91,10 @@ public class TopicsImpl extends BaseResource implements Topics { private final WebTarget adminTopics; private final WebTarget adminV2Topics; // CHECKSTYLE.OFF: MemberName - static private final String BATCH_HEADER = "X-Pulsar-num-batch-message"; - static private final String MESSAGE_ID = "X-Pulsar-Message-ID"; - static private final String PUBLISH_TIME = "X-Pulsar-publish-time"; + private static final String BATCH_HEADER = "X-Pulsar-num-batch-message"; + private static final String BATCH_SIZE_HEADER = "X-Pulsar-batch-size"; + private static final String MESSAGE_ID = "X-Pulsar-Message-ID"; + private static final String PUBLISH_TIME = "X-Pulsar-publish-time"; // CHECKSTYLE.ON: MemberName public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) { @@ -1482,11 +1483,11 @@ private List> getMessagesFromHttpResponse(String topic, Response messageMetadata.setNullValue(Boolean.parseBoolean(tmp.toString())); } - tmp = headers.getFirst(BATCH_HEADER); - if (response.getHeaderString(BATCH_HEADER) != null) { - properties.put(BATCH_HEADER, (String) tmp); - return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata); + tmp = headers.getFirst(BATCH_SIZE_HEADER); + if (tmp != null) { + properties.put(BATCH_SIZE_HEADER, (String) tmp); } + for (Entry> entry : headers.entrySet()) { String header = entry.getKey(); if (header.contains("X-Pulsar-PROPERTY-")) { @@ -1495,6 +1496,12 @@ private List> getMessagesFromHttpResponse(String topic, Response } } + tmp = headers.getFirst(BATCH_HEADER); + if (response.getHeaderString(BATCH_HEADER) != null) { + properties.put(BATCH_HEADER, (String) tmp); + return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata); + } + return Collections.singletonList(new MessageImpl(topic, msgId, properties, Unpooled.wrappedBuffer(data), Schema.BYTES, messageMetadata)); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index d3e4c2550c6fb..da42686ffc15a 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -912,6 +912,17 @@ void run() throws PulsarAdminException { System.out.println("Cannot find any messages based on ledgerId:" + ledgerId + " entryId:" + entryId); } else { + if (message.getMessageId() instanceof BatchMessageIdImpl) { + BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); + System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + msgId.getBatchIndex()); + } else { + MessageIdImpl msgId = (MessageIdImpl) message.getMessageId(); + System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); + } + if (message.getProperties().size() > 0) { + System.out.println("Properties:"); + print(message.getProperties()); + } ByteBuf date = Unpooled.wrappedBuffer(message.getData()); System.out.println(ByteBufUtil.prettyHexDump(date)); }