diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index ff54668d356e8..7ce7a96c6c3c9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2466,6 +2466,8 @@ private Response generateResponseWithEntry(Entry entry) throws IOException { } if (metadata.hasNumMessagesInBatch()) { responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch()); + responseBuilder.header("X-Pulsar-batch-size", metadataAndPayload.readableBytes() + - metadata.getSerializedSize()); } if (metadata.hasNullValue()) { responseBuilder.header("X-Pulsar-null-value", metadata.isNullValue()); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 5f8cf1527f024..10fb9ad73ddc4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -92,6 +92,7 @@ public class TopicsImpl extends BaseResource implements Topics { private final WebTarget adminV2Topics; // CHECKSTYLE.OFF: MemberName private static final String BATCH_HEADER = "X-Pulsar-num-batch-message"; + private static final String BATCH_SIZE_HEADER = "X-Pulsar-batch-size"; private static final String MESSAGE_ID = "X-Pulsar-Message-ID"; private static final String PUBLISH_TIME = "X-Pulsar-publish-time"; // CHECKSTYLE.ON: MemberName @@ -1482,11 +1483,11 @@ private List> getMessagesFromHttpResponse(String topic, Response messageMetadata.setNullValue(Boolean.parseBoolean(tmp.toString())); } - tmp = headers.getFirst(BATCH_HEADER); - if (response.getHeaderString(BATCH_HEADER) != null) { - properties.put(BATCH_HEADER, (String) tmp); - return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata); + tmp = headers.getFirst(BATCH_SIZE_HEADER); + if (tmp != null) { + properties.put(BATCH_SIZE_HEADER, (String) tmp); } + for (Entry> entry : headers.entrySet()) { String header = entry.getKey(); if (header.contains("X-Pulsar-PROPERTY-")) { @@ -1495,6 +1496,12 @@ private List> getMessagesFromHttpResponse(String topic, Response } } + tmp = headers.getFirst(BATCH_HEADER); + if (response.getHeaderString(BATCH_HEADER) != null) { + properties.put(BATCH_HEADER, (String) tmp); + return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata); + } + return Collections.singletonList(new MessageImpl(topic, msgId, properties, Unpooled.wrappedBuffer(data), Schema.BYTES, messageMetadata)); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 64fe0178a5f5b..6eacf743fa77e 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -910,6 +910,17 @@ void run() throws PulsarAdminException { System.out.println("Cannot find any messages based on ledgerId:" + ledgerId + " entryId:" + entryId); } else { + if (message.getMessageId() instanceof BatchMessageIdImpl) { + BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); + System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + msgId.getBatchIndex()); + } else { + MessageIdImpl msgId = (MessageIdImpl) message.getMessageId(); + System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); + } + if (message.getProperties().size() > 0) { + System.out.println("Properties:"); + print(message.getProperties()); + } ByteBuf date = Unpooled.wrappedBuffer(message.getData()); System.out.println(ByteBufUtil.prettyHexDump(date)); }