diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 96377de5a9492..541a2b9e45f40 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2336,6 +2336,8 @@ private Response generateResponseWithEntry(Entry entry) throws IOException { } if (metadata.hasNumMessagesInBatch()) { responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch()); + responseBuilder.header("X-Pulsar-batch-size", metadataAndPayload.readableBytes() + - metadata.getSerializedSize()); } if (metadata.hasNullValue()) { responseBuilder.header("X-Pulsar-null-value", metadata.hasNullValue()); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 550b7d491f81c..9f2a911e2f416 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -91,9 +91,10 @@ public class TopicsImpl extends BaseResource implements Topics { private final WebTarget adminTopics; private final WebTarget adminV2Topics; // CHECKSTYLE.OFF: MemberName - private final String BATCH_HEADER = "X-Pulsar-num-batch-message"; - private final String MESSAGE_ID = "X-Pulsar-Message-ID"; - private final String PUBLISH_TIME = "X-Pulsar-publish-time"; + private static final String BATCH_HEADER = "X-Pulsar-num-batch-message"; + private static final String BATCH_SIZE_HEADER = "X-Pulsar-batch-size"; + private static final String MESSAGE_ID = "X-Pulsar-Message-ID"; + private static final String PUBLISH_TIME = "X-Pulsar-publish-time"; // CHECKSTYLE.ON: MemberName public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) { @@ -1437,11 +1438,11 @@ private List> getMessagesFromHttpResponse(String topic, Response messageMetadata.setNullValue(Boolean.parseBoolean(tmp.toString())); } - tmp = headers.getFirst(BATCH_HEADER); - if (response.getHeaderString(BATCH_HEADER) != null) { - properties.put(BATCH_HEADER, (String) tmp); - return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata); + tmp = headers.getFirst(BATCH_SIZE_HEADER); + if (tmp != null) { + properties.put(BATCH_SIZE_HEADER, (String) tmp); } + for (Entry> entry : headers.entrySet()) { String header = entry.getKey(); if (header.contains("X-Pulsar-PROPERTY-")) { @@ -1450,6 +1451,12 @@ private List> getMessagesFromHttpResponse(String topic, Response } } + tmp = headers.getFirst(BATCH_HEADER); + if (response.getHeaderString(BATCH_HEADER) != null) { + properties.put(BATCH_HEADER, (String) tmp); + return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata); + } + return Collections.singletonList(new MessageImpl(topic, msgId, properties, Unpooled.wrappedBuffer(data), Schema.BYTES, messageMetadata)); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 65679d10229b8..dbc9ff2da6ea1 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -884,6 +884,17 @@ void run() throws PulsarAdminException { System.out.println("Cannot find any messages based on ledgerId:" + ledgerId + " entryId:" + entryId); } else { + if (message.getMessageId() instanceof BatchMessageIdImpl) { + BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); + System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + msgId.getBatchIndex()); + } else { + MessageIdImpl msgId = (MessageIdImpl) message.getMessageId(); + System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); + } + if (message.getProperties().size() > 0) { + System.out.println("Properties:"); + print(message.getProperties()); + } ByteBuf date = Unpooled.wrappedBuffer(message.getData()); System.out.println(ByteBufUtil.prettyHexDump(date)); }