Skip to content

Commit

Permalink
Expose broker entry metadata and deliverAtTime to peekMessages/getMes…
Browse files Browse the repository at this point in the history
…sageById/examineMessage (apache#11279)
  • Loading branch information
aloyszhang authored and ciaocloud committed Oct 16, 2021
1 parent e7676ad commit 1285d1c
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 13 deletions.
Expand Up @@ -90,6 +90,7 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeyValue;
Expand Down Expand Up @@ -2494,19 +2495,32 @@ private Response generateResponseWithEntry(Entry entry) throws IOException {
PositionImpl pos = (PositionImpl) entry.getPosition();
ByteBuf metadataAndPayload = entry.getDataBuffer();

BrokerEntryMetadata brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(metadataAndPayload);
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);

ResponseBuilder responseBuilder = Response.ok();
responseBuilder.header("X-Pulsar-Message-ID", pos.toString());
for (KeyValue keyValue : metadata.getPropertiesList()) {
responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue());
}
if (brokerEntryMetadata != null) {
if (brokerEntryMetadata.hasBrokerTimestamp()) {
responseBuilder.header("X-Pulsar-Broker-Entry-METADATA-timestamp",
DateFormatter.format(brokerEntryMetadata.getBrokerTimestamp()));
}
if (brokerEntryMetadata.hasIndex()) {
responseBuilder.header("X-Pulsar-Broker-Entry-METADATA-index", brokerEntryMetadata.getIndex());
}
}
if (metadata.hasPublishTime()) {
responseBuilder.header("X-Pulsar-publish-time", DateFormatter.format(metadata.getPublishTime()));
}
if (metadata.hasEventTime()) {
responseBuilder.header("X-Pulsar-event-time", DateFormatter.format(metadata.getEventTime()));
}
if (metadata.hasDeliverAtTime()) {
responseBuilder.header("X-Pulsar-deliver-at-time", DateFormatter.format(metadata.getDeliverAtTime()));
}
if (metadata.hasNumMessagesInBatch()) {
responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
responseBuilder.header("X-Pulsar-batch-size", metadataAndPayload.readableBytes()
Expand Down
Expand Up @@ -25,6 +25,9 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.assertj.core.util.Sets;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
Expand All @@ -38,6 +41,7 @@
@Test(groups = "broker")
public class BrokerEntryMetadataE2ETest extends BrokerTestBase {


@DataProvider(name = "subscriptionTypes")
public static Object[] subscriptionTypes() {
return new Object[] {
Expand Down Expand Up @@ -97,17 +101,98 @@ public void testProduceAndConsume(SubscriptionType subType) throws Exception {
public void testPeekMessage() throws Exception {
final String topic = newTopicName();
final String subscription = "my-sub";
final long eventTime= 200;
final long deliverAtTime = 300;

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();
producer.newMessage().value("hello".getBytes()).send();

long sendTime = System.currentTimeMillis();
producer.newMessage()
.eventTime(eventTime)
.deliverAt(deliverAtTime)
.value("hello".getBytes())
.send();

admin.topics().createSubscription(topic, subscription, MessageId.earliest);
final List<Message<byte[]>> messages = admin.topics().peekMessages(topic, subscription, 1);
Assert.assertEquals(messages.size(), 1);
Assert.assertEquals(messages.get(0).getData(), "hello".getBytes());
MessageImpl message = (MessageImpl) messages.get(0);
Assert.assertEquals(message.getData(), "hello".getBytes());
Assert.assertEquals(message.getEventTime(), eventTime);
Assert.assertEquals(message.getDeliverAtTime(), deliverAtTime);
Assert.assertTrue(message.getPublishTime() >= sendTime);

BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata();
Assert.assertEquals(entryMetadata.getIndex(), 0);
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
}

@Test(timeOut = 20000)
public void testGetMessageById() throws Exception {
final String topic = newTopicName();
final String subscription = "my-sub";
final long eventTime= 200;
final long deliverAtTime = 300;

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();

long sendTime = System.currentTimeMillis();
MessageIdImpl messageId = (MessageIdImpl) producer.newMessage()
.eventTime(eventTime)
.deliverAt(deliverAtTime)
.value("hello".getBytes())
.send();

admin.topics().createSubscription(topic, subscription, MessageId.earliest);
MessageImpl message = (MessageImpl) admin.topics()
.getMessageById(topic, messageId.getLedgerId(), messageId.getEntryId());
Assert.assertEquals(message.getData(), "hello".getBytes());
Assert.assertEquals(message.getEventTime(), eventTime);
Assert.assertEquals(message.getDeliverAtTime(), deliverAtTime);
Assert.assertTrue(message.getPublishTime() >= sendTime);

BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata();
Assert.assertEquals(entryMetadata.getIndex(), 0);
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
}


@Test(timeOut = 20000)
public void testExamineMessage() throws Exception {
final String topic = newTopicName();
final String subscription = "my-sub";
final long eventTime= 200;
final long deliverAtTime = 300;

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();

long sendTime = System.currentTimeMillis();
producer.newMessage()
.eventTime(eventTime)
.deliverAt(deliverAtTime)
.value("hello".getBytes())
.send();

admin.topics().createSubscription(topic, subscription, MessageId.earliest);
MessageImpl message =
(MessageImpl) admin.topics().examineMessage(topic, "earliest", 1);
Assert.assertEquals(message.getData(), "hello".getBytes());
Assert.assertEquals(message.getEventTime(), eventTime);
Assert.assertEquals(message.getDeliverAtTime(), deliverAtTime);
Assert.assertTrue(message.getPublishTime() >= sendTime);

BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata();
Assert.assertEquals(entryMetadata.getIndex(), 0);
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
}

@Test(timeOut = 20000)
Expand Down
Expand Up @@ -58,6 +58,7 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
Expand All @@ -84,6 +85,7 @@
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -95,6 +97,10 @@ public class TopicsImpl extends BaseResource implements Topics {
private static final String BATCH_SIZE_HEADER = "X-Pulsar-batch-size";
private static final String MESSAGE_ID = "X-Pulsar-Message-ID";
private static final String PUBLISH_TIME = "X-Pulsar-publish-time";
private static final String EVENT_TIME = "X-Pulsar-event-time";
private static final String DELIVER_AT_TIME = "X-Pulsar-deliver-at-time";
private static final String BROKER_ENTRY_TIMESTAMP = "X-Pulsar-Broker-Entry-METADATA-timestamp";
private static final String BROKER_ENTRY_INDEX = "X-Pulsar-Broker-Entry-METADATA-index";
// CHECKSTYLE.ON: MemberName

public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
Expand Down Expand Up @@ -1504,6 +1510,24 @@ private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, Response
}

String msgId = response.getHeaderString(MESSAGE_ID);

// build broker entry metadata if exist
String brokerEntryTimestamp = response.getHeaderString(BROKER_ENTRY_TIMESTAMP);
String brokerEntryIndex = response.getHeaderString(BROKER_ENTRY_INDEX);
BrokerEntryMetadata brokerEntryMetadata;
if (brokerEntryTimestamp == null && brokerEntryIndex == null) {
brokerEntryMetadata = null;
} else {
brokerEntryMetadata = new BrokerEntryMetadata();
if (brokerEntryTimestamp != null) {
brokerEntryMetadata.setBrokerTimestamp(DateFormatter.parse(brokerEntryTimestamp.toString()));
}

if (brokerEntryIndex != null) {
brokerEntryMetadata.setIndex(Long.parseLong(brokerEntryIndex));
}
}

MessageMetadata messageMetadata = new MessageMetadata();
try (InputStream stream = (InputStream) response.getEntity()) {
byte[] data = new byte[stream.available()];
Expand All @@ -1513,7 +1537,17 @@ private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, Response
MultivaluedMap<String, Object> headers = response.getHeaders();
Object tmp = headers.getFirst(PUBLISH_TIME);
if (tmp != null) {
properties.put("publish-time", (String) tmp);
messageMetadata.setPublishTime(DateFormatter.parse(tmp.toString()));
}

tmp = headers.getFirst(EVENT_TIME);
if (tmp != null) {
messageMetadata.setEventTime(DateFormatter.parse(tmp.toString()));
}

tmp = headers.getFirst(DELIVER_AT_TIME);
if (tmp != null) {
messageMetadata.setDeliverAtTime(DateFormatter.parse(tmp.toString()));
}

tmp = headers.getFirst("X-Pulsar-null-value");
Expand Down Expand Up @@ -1546,16 +1580,21 @@ private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, Response
}

if (!isEncrypted && response.getHeaderString(BATCH_HEADER) != null) {
return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata);
return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata, brokerEntryMetadata);
}

return Collections.singletonList(new MessageImpl<byte[]>(topic, msgId, properties,
Unpooled.wrappedBuffer(data), Schema.BYTES, messageMetadata));
MessageImpl message = new MessageImpl(topic, msgId, properties,
Unpooled.wrappedBuffer(data), Schema.BYTES, messageMetadata);
if (brokerEntryMetadata != null) {
message.setBrokerEntryMetadata(brokerEntryMetadata);
}
return Collections.singletonList(message);
}
}

private List<Message<byte[]>> getIndividualMsgsFromBatch(String topic, String msgId, byte[] data,
Map<String, String> properties, MessageMetadata msgMetadataBuilder) {
Map<String, String> properties, MessageMetadata msgMetadataBuilder,
BrokerEntryMetadata brokerEntryMetadata) {
List<Message<byte[]>> ret = new ArrayList<>();
int batchSize = Integer.parseInt(properties.get(BATCH_HEADER));
ByteBuf buf = Unpooled.wrappedBuffer(data);
Expand All @@ -1570,8 +1609,12 @@ private List<Message<byte[]>> getIndividualMsgsFromBatch(String topic, String ms
properties.put(entry.getKey(), entry.getValue());
}
}
ret.add(new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload,
Schema.BYTES, msgMetadataBuilder));
MessageImpl message = new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload,
Schema.BYTES, msgMetadataBuilder);
if (brokerEntryMetadata != null) {
message.setBrokerEntryMetadata(brokerEntryMetadata);
}
ret.add(message);
} catch (Exception ex) {
log.error("Exception occurred while trying to get BatchMsgId: {}", batchMsgId, ex);
}
Expand Down
Expand Up @@ -52,6 +52,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
Expand Down Expand Up @@ -837,18 +838,36 @@ void run() throws PulsarAdminException {
List<Message<byte[]>> messages = getTopics().peekMessages(persistentTopic, subName, numMessages);
int position = 0;
for (Message<byte[]> msg : messages) {
MessageImpl message = (MessageImpl) msg;
if (++position != 1) {
System.out.println("-------------------------------------------------------------------------\n");
}
if (msg.getMessageId() instanceof BatchMessageIdImpl) {
BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId();
if (message.getMessageId() instanceof BatchMessageIdImpl) {
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + msgId.getBatchIndex());
} else {
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
}
if (msg.getProperties().size() > 0) {
System.out.println("Tenants:");

System.out.println("Publish time: " + message.getPublishTime());
System.out.println("Event time: " + message.getEventTime());

if (message.getDeliverAtTime() != 0) {
System.out.println("Deliver at time: " + message.getDeliverAtTime());
}

if (message.getBrokerEntryMetadata() != null) {
if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) {
System.out.println("Broker entry metadata timestamp: " + message.getBrokerEntryMetadata().getBrokerTimestamp());
}
if (message.getBrokerEntryMetadata().hasIndex()) {
System.out.println("Broker entry metadata index: " + message.getBrokerEntryMetadata().getIndex());
}
}

if (message.getProperties().size() > 0) {
System.out.println("Properties:");
print(msg.getProperties());
}
ByteBuf data = Unpooled.wrappedBuffer(msg.getData());
Expand Down
Expand Up @@ -335,6 +335,13 @@ public long getEventTime() {
return 0;
}

public long getDeliverAtTime() {
if (msgMetadata.hasDeliverAtTime()) {
return msgMetadata.getDeliverAtTime();
}
return 0;
}

public boolean isExpired(int messageTTLInSeconds) {
return messageTTLInSeconds != 0 && (brokerEntryMetadata == null || !brokerEntryMetadata.hasBrokerTimestamp()
? (System.currentTimeMillis() >
Expand Down

0 comments on commit 1285d1c

Please sign in to comment.