From ea628541950a2cd02746ef0cfce02528b7824fed Mon Sep 17 00:00:00 2001 From: Aloys Date: Mon, 12 Jul 2021 22:23:54 +0800 Subject: [PATCH] Expose broker entry metadata and deliverAtTime to peekMessages/getMessageById/examineMessage (#11279) --- .../admin/impl/PersistentTopicsBase.java | 14 +++ .../service/BrokerEntryMetadataE2ETest.java | 89 ++++++++++++++++++- .../client/admin/internal/TopicsImpl.java | 57 ++++++++++-- .../apache/pulsar/admin/cli/CmdTopics.java | 27 +++++- .../pulsar/client/impl/MessageImpl.java | 7 ++ 5 files changed, 181 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index a3c4c3e30ce8d..4c137ea5ecc02 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -90,6 +90,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeyValue; @@ -2494,6 +2495,7 @@ private Response generateResponseWithEntry(Entry entry) throws IOException { PositionImpl pos = (PositionImpl) entry.getPosition(); ByteBuf metadataAndPayload = entry.getDataBuffer(); + BrokerEntryMetadata brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(metadataAndPayload); MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); ResponseBuilder responseBuilder = Response.ok(); @@ -2501,12 +2503,24 @@ private Response generateResponseWithEntry(Entry entry) throws IOException { for (KeyValue keyValue : metadata.getPropertiesList()) { responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue()); } + if (brokerEntryMetadata != null) { + if (brokerEntryMetadata.hasBrokerTimestamp()) { + responseBuilder.header("X-Pulsar-Broker-Entry-METADATA-timestamp", + DateFormatter.format(brokerEntryMetadata.getBrokerTimestamp())); + } + if (brokerEntryMetadata.hasIndex()) { + responseBuilder.header("X-Pulsar-Broker-Entry-METADATA-index", brokerEntryMetadata.getIndex()); + } + } if (metadata.hasPublishTime()) { responseBuilder.header("X-Pulsar-publish-time", DateFormatter.format(metadata.getPublishTime())); } if (metadata.hasEventTime()) { responseBuilder.header("X-Pulsar-event-time", DateFormatter.format(metadata.getEventTime())); } + if (metadata.hasDeliverAtTime()) { + responseBuilder.header("X-Pulsar-deliver-at-time", DateFormatter.format(metadata.getDeliverAtTime())); + } if (metadata.hasNumMessagesInBatch()) { responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch()); responseBuilder.header("X-Pulsar-batch-size", metadataAndPayload.readableBytes() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java index e7d98a829934b..52c8375fa78a3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java @@ -25,6 +25,9 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.assertj.core.util.Sets; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -38,6 +41,7 @@ @Test(groups = "broker") public class BrokerEntryMetadataE2ETest extends BrokerTestBase { + @DataProvider(name = "subscriptionTypes") public static Object[] subscriptionTypes() { return new Object[] { @@ -97,17 +101,98 @@ public void testProduceAndConsume(SubscriptionType subType) throws Exception { public void testPeekMessage() throws Exception { final String topic = newTopicName(); final String subscription = "my-sub"; + final long eventTime= 200; + final long deliverAtTime = 300; @Cleanup Producer producer = pulsarClient.newProducer() .topic(topic) .create(); - producer.newMessage().value("hello".getBytes()).send(); + + long sendTime = System.currentTimeMillis(); + producer.newMessage() + .eventTime(eventTime) + .deliverAt(deliverAtTime) + .value("hello".getBytes()) + .send(); admin.topics().createSubscription(topic, subscription, MessageId.earliest); final List> messages = admin.topics().peekMessages(topic, subscription, 1); Assert.assertEquals(messages.size(), 1); - Assert.assertEquals(messages.get(0).getData(), "hello".getBytes()); + MessageImpl message = (MessageImpl) messages.get(0); + Assert.assertEquals(message.getData(), "hello".getBytes()); + Assert.assertEquals(message.getEventTime(), eventTime); + Assert.assertEquals(message.getDeliverAtTime(), deliverAtTime); + Assert.assertTrue(message.getPublishTime() >= sendTime); + + BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata(); + Assert.assertEquals(entryMetadata.getIndex(), 0); + Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime); + } + + @Test(timeOut = 20000) + public void testGetMessageById() throws Exception { + final String topic = newTopicName(); + final String subscription = "my-sub"; + final long eventTime= 200; + final long deliverAtTime = 300; + + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topic) + .create(); + + long sendTime = System.currentTimeMillis(); + MessageIdImpl messageId = (MessageIdImpl) producer.newMessage() + .eventTime(eventTime) + .deliverAt(deliverAtTime) + .value("hello".getBytes()) + .send(); + + admin.topics().createSubscription(topic, subscription, MessageId.earliest); + MessageImpl message = (MessageImpl) admin.topics() + .getMessageById(topic, messageId.getLedgerId(), messageId.getEntryId()); + Assert.assertEquals(message.getData(), "hello".getBytes()); + Assert.assertEquals(message.getEventTime(), eventTime); + Assert.assertEquals(message.getDeliverAtTime(), deliverAtTime); + Assert.assertTrue(message.getPublishTime() >= sendTime); + + BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata(); + Assert.assertEquals(entryMetadata.getIndex(), 0); + Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime); + } + + + @Test(timeOut = 20000) + public void testExamineMessage() throws Exception { + final String topic = newTopicName(); + final String subscription = "my-sub"; + final long eventTime= 200; + final long deliverAtTime = 300; + + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topic) + .create(); + + long sendTime = System.currentTimeMillis(); + producer.newMessage() + .eventTime(eventTime) + .deliverAt(deliverAtTime) + .value("hello".getBytes()) + .send(); + + admin.topics().createSubscription(topic, subscription, MessageId.earliest); + MessageImpl message = + (MessageImpl) admin.topics().examineMessage(topic, "earliest", 1); + Assert.assertEquals(message.getData(), "hello".getBytes()); + Assert.assertEquals(message.getEventTime(), eventTime); + Assert.assertEquals(message.getDeliverAtTime(), deliverAtTime); + Assert.assertTrue(message.getPublishTime() >= sendTime); + + BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata(); + Assert.assertEquals(entryMetadata.getIndex(), 0); + Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime); } @Test(timeOut = 20000) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index d122ded01996b..f611358f44ba4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -58,6 +58,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.ResetCursorData; +import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.api.proto.KeyValue; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.SingleMessageMetadata; @@ -84,6 +85,7 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.Codec; +import org.apache.pulsar.common.util.DateFormatter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,6 +97,10 @@ public class TopicsImpl extends BaseResource implements Topics { private static final String BATCH_SIZE_HEADER = "X-Pulsar-batch-size"; private static final String MESSAGE_ID = "X-Pulsar-Message-ID"; private static final String PUBLISH_TIME = "X-Pulsar-publish-time"; + private static final String EVENT_TIME = "X-Pulsar-event-time"; + private static final String DELIVER_AT_TIME = "X-Pulsar-deliver-at-time"; + private static final String BROKER_ENTRY_TIMESTAMP = "X-Pulsar-Broker-Entry-METADATA-timestamp"; + private static final String BROKER_ENTRY_INDEX = "X-Pulsar-Broker-Entry-METADATA-index"; // CHECKSTYLE.ON: MemberName public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) { @@ -1504,6 +1510,24 @@ private List> getMessagesFromHttpResponse(String topic, Response } String msgId = response.getHeaderString(MESSAGE_ID); + + // build broker entry metadata if exist + String brokerEntryTimestamp = response.getHeaderString(BROKER_ENTRY_TIMESTAMP); + String brokerEntryIndex = response.getHeaderString(BROKER_ENTRY_INDEX); + BrokerEntryMetadata brokerEntryMetadata; + if (brokerEntryTimestamp == null && brokerEntryIndex == null) { + brokerEntryMetadata = null; + } else { + brokerEntryMetadata = new BrokerEntryMetadata(); + if (brokerEntryTimestamp != null) { + brokerEntryMetadata.setBrokerTimestamp(DateFormatter.parse(brokerEntryTimestamp.toString())); + } + + if (brokerEntryIndex != null) { + brokerEntryMetadata.setIndex(Long.parseLong(brokerEntryIndex)); + } + } + MessageMetadata messageMetadata = new MessageMetadata(); try (InputStream stream = (InputStream) response.getEntity()) { byte[] data = new byte[stream.available()]; @@ -1513,7 +1537,17 @@ private List> getMessagesFromHttpResponse(String topic, Response MultivaluedMap headers = response.getHeaders(); Object tmp = headers.getFirst(PUBLISH_TIME); if (tmp != null) { - properties.put("publish-time", (String) tmp); + messageMetadata.setPublishTime(DateFormatter.parse(tmp.toString())); + } + + tmp = headers.getFirst(EVENT_TIME); + if (tmp != null) { + messageMetadata.setEventTime(DateFormatter.parse(tmp.toString())); + } + + tmp = headers.getFirst(DELIVER_AT_TIME); + if (tmp != null) { + messageMetadata.setDeliverAtTime(DateFormatter.parse(tmp.toString())); } tmp = headers.getFirst("X-Pulsar-null-value"); @@ -1546,16 +1580,21 @@ private List> getMessagesFromHttpResponse(String topic, Response } if (!isEncrypted && response.getHeaderString(BATCH_HEADER) != null) { - return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata); + return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata, brokerEntryMetadata); } - return Collections.singletonList(new MessageImpl(topic, msgId, properties, - Unpooled.wrappedBuffer(data), Schema.BYTES, messageMetadata)); + MessageImpl message = new MessageImpl(topic, msgId, properties, + Unpooled.wrappedBuffer(data), Schema.BYTES, messageMetadata); + if (brokerEntryMetadata != null) { + message.setBrokerEntryMetadata(brokerEntryMetadata); + } + return Collections.singletonList(message); } } private List> getIndividualMsgsFromBatch(String topic, String msgId, byte[] data, - Map properties, MessageMetadata msgMetadataBuilder) { + Map properties, MessageMetadata msgMetadataBuilder, + BrokerEntryMetadata brokerEntryMetadata) { List> ret = new ArrayList<>(); int batchSize = Integer.parseInt(properties.get(BATCH_HEADER)); ByteBuf buf = Unpooled.wrappedBuffer(data); @@ -1570,8 +1609,12 @@ private List> getIndividualMsgsFromBatch(String topic, String ms properties.put(entry.getKey(), entry.getValue()); } } - ret.add(new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload, - Schema.BYTES, msgMetadataBuilder)); + MessageImpl message = new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload, + Schema.BYTES, msgMetadataBuilder); + if (brokerEntryMetadata != null) { + message.setBrokerEntryMetadata(brokerEntryMetadata); + } + ret.add(message); } catch (Exception ex) { log.error("Exception occurred while trying to get BatchMsgId: {}", batchMsgId, ex); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 96454da3cc01b..fbb94b2001134 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -52,6 +52,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; @@ -837,18 +838,36 @@ void run() throws PulsarAdminException { List> messages = getTopics().peekMessages(persistentTopic, subName, numMessages); int position = 0; for (Message msg : messages) { + MessageImpl message = (MessageImpl) msg; if (++position != 1) { System.out.println("-------------------------------------------------------------------------\n"); } - if (msg.getMessageId() instanceof BatchMessageIdImpl) { - BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId(); + if (message.getMessageId() instanceof BatchMessageIdImpl) { + BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + msgId.getBatchIndex()); } else { MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); } - if (msg.getProperties().size() > 0) { - System.out.println("Tenants:"); + + System.out.println("Publish time: " + message.getPublishTime()); + System.out.println("Event time: " + message.getEventTime()); + + if (message.getDeliverAtTime() != 0) { + System.out.println("Deliver at time: " + message.getDeliverAtTime()); + } + + if (message.getBrokerEntryMetadata() != null) { + if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) { + System.out.println("Broker entry metadata timestamp: " + message.getBrokerEntryMetadata().getBrokerTimestamp()); + } + if (message.getBrokerEntryMetadata().hasIndex()) { + System.out.println("Broker entry metadata index: " + message.getBrokerEntryMetadata().getIndex()); + } + } + + if (message.getProperties().size() > 0) { + System.out.println("Properties:"); print(msg.getProperties()); } ByteBuf data = Unpooled.wrappedBuffer(msg.getData()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index 618aa0888ec10..1af69159b4820 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -335,6 +335,13 @@ public long getEventTime() { return 0; } + public long getDeliverAtTime() { + if (msgMetadata.hasDeliverAtTime()) { + return msgMetadata.getDeliverAtTime(); + } + return 0; + } + public boolean isExpired(int messageTTLInSeconds) { return messageTTLInSeconds != 0 && (brokerEntryMetadata == null || !brokerEntryMetadata.hasBrokerTimestamp() ? (System.currentTimeMillis() >