Skip to content

Commit

Permalink
[Issue 10896] add get message id by timestamp (#11139)
Browse files Browse the repository at this point in the history
Fixes #10896

### Motivation

Add getMessageIdByTimestamp in pulsar admin apis

### Modifications

The core searching implementation reuses `org.apache.bookkeeper.mledger.ManagedLedger#asyncFindPosition`.
Add client tool for cmd `pulsar-admin topics get-message-id`
  • Loading branch information
Jason918 committed Jul 5, 2021
1 parent af614ea commit a7e40ea
Show file tree
Hide file tree
Showing 7 changed files with 346 additions and 2 deletions.
Expand Up @@ -51,6 +51,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException;
Expand Down Expand Up @@ -87,6 +88,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
Expand Down Expand Up @@ -2328,6 +2330,59 @@ public void readEntryComplete(Entry entry, Object ctx) {
}
}

protected CompletableFuture<MessageId> internalGetMessageIdByTimestamp(long timestamp, boolean authoritative) {
try {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}

if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
authoritative, false).partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Get message ID by timestamp on a partitioned topic is not allowed, "
+ "please try do it on specific topic partition");
}

validateTopicOwnership(topicName, authoritative);
validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);

Topic topic = getTopicReference(topicName);
if (!(topic instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {} ", clientAppId(), topicName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Get message ID by timestamp on a non-persistent topic is not allowed");
}

ManagedLedger ledger = ((PersistentTopic) topic).getManagedLedger();
return ledger.asyncFindPosition(entry -> {
MessageImpl<byte[]> msg = null;
try {
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
return msg.publishedEarlierThan(timestamp);
} catch (Exception e) {
log.error("[{}] Error deserializing message for message position find", topicName, e);
} finally {
entry.release();
if (msg != null) {
msg.recycle();
}
}
return false;
}).thenApply(position -> {
if (position == null) {
return null;
} else {
return new MessageIdImpl(position.getLedgerId(), position.getEntryId(),
topicName.getPartitionIndex());
}
});
} catch (WebApplicationException exception) {
return FutureUtil.failedFuture(exception);
} catch (Exception exception) {
return FutureUtil.failedFuture(new RestException(exception));
}
}

protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
// If the topic name is a partition name, no need to get partition topic metadata again
if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
Expand Down
Expand Up @@ -1514,6 +1514,48 @@ public void getMessageById(
}
}

@GET
@Path("/{tenant}/{namespace}/{topic}/messageid/{timestamp}")
@ApiOperation(value = "Get message ID published at or just after this absolute timestamp (in ms).")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic is not non-partitioned and persistent"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
public void getMessageIdByTimestamp(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Specify the timestamp", required = true)
@PathParam("timestamp") long timestamp,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
internalGetMessageIdByTimestamp(timestamp, authoritative)
.thenAccept(messageId -> {
if (messageId == null) {
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Message ID not found"));
} else {
asyncResponse.resume(messageId);
}
})
.exceptionally(ex -> {
log.error("[{}] Failed to get message ID by timestamp {} from {}",
clientAppId(), timestamp, topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
@Path("{tenant}/{namespace}/{topic}/backlog")
@ApiOperation(value = "Get estimated backlog for offline topic.")
Expand Down
Expand Up @@ -32,13 +32,16 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
Expand All @@ -58,16 +61,17 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
Expand Down Expand Up @@ -796,4 +800,135 @@ public void testSetReplicatedSubscriptionStatus() {
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
}

@Test
public void testGetMessageIdByTimestamp() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Sets.newHashSet("test"));
final String topicName = "persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestamp";
admin.topics().createNonPartitionedTopic(topicName);

AtomicLong publishTime = new AtomicLong(0);
ProducerBase<byte[]> producer = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.intercept(new ProducerInterceptor() {
@Override
public void close() {

}

@Override
public boolean eligible(Message message) {
return true;
}

@Override
public Message beforeSend(Producer producer, Message message) {
return message;
}

@Override
public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId,
Throwable exception) {
publishTime.set(message.getPublishTime());
}
})
.create();

MessageId id1 = producer.send("test1".getBytes());
long publish1 = publishTime.get();

Thread.sleep(10);
MessageId id2 = producer.send("test2".getBytes());
long publish2 = publishTime.get();

Assert.assertTrue(publish1 < publish2);

Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 - 1), id1);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1), id1);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 + 1), id2);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish2), id2);
Assert.assertTrue(admin.topics().getMessageIdByTimestamp(topicName, publish2 + 1)
.compareTo(id2) > 0);
}

@Test
public void testGetBatchMessageIdByTimestamp() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Sets.newHashSet("test"));
final String topicName = "persistent://tenant-xyz/ns-abc/testGetBatchMessageIdByTimestamp";
admin.topics().createNonPartitionedTopic(topicName);

Map<MessageId, Long> publishTimeMap = new ConcurrentHashMap<>();

ProducerBase<byte[]> producer = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName)
.enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MINUTES)
.batchingMaxMessages(2)
.intercept(new ProducerInterceptor() {
@Override
public void close() {

}

@Override
public boolean eligible(Message message) {
return true;
}

@Override
public Message beforeSend(Producer producer, Message message) {
return message;
}

@Override
public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId,
Throwable exception) {
log.info("onSendAcknowledgement, message={}, msgId={},publish_time={},exception={}",
message, msgId, message.getPublishTime(), exception);
publishTimeMap.put(msgId, message.getPublishTime());

}
})
.create();

List<CompletableFuture<MessageId>> idFutureList = new ArrayList<>();
for (int i = 0; i < 4; i++) {
idFutureList.add(producer.sendAsync(new byte[]{(byte) i}));
Thread.sleep(5);
}

List<MessageIdImpl> ids = new ArrayList<>();
for (CompletableFuture<MessageId> future : idFutureList) {
MessageId id = future.get();
ids.add((MessageIdImpl) id);
}

for (MessageIdImpl messageId : ids) {
Assert.assertTrue(publishTimeMap.containsKey(messageId));
log.info("MessageId={},PublishTime={}", messageId, publishTimeMap.get(messageId));
}

//message 0, 1 are in the same batch, as batchingMaxMessages is set to 2.
Assert.assertEquals(ids.get(0).getLedgerId(), ids.get(1).getLedgerId());
MessageIdImpl id1 =
new MessageIdImpl(ids.get(0).getLedgerId(), ids.get(0).getEntryId(), ids.get(0).getPartitionIndex());
long publish1 = publishTimeMap.get(ids.get(0));

Assert.assertEquals(ids.get(2).getLedgerId(), ids.get(3).getLedgerId());
MessageIdImpl id2 =
new MessageIdImpl(ids.get(2).getLedgerId(), ids.get(2).getEntryId(), ids.get(2).getPartitionIndex());
long publish2 = publishTimeMap.get(ids.get(2));


Assert.assertTrue(publish1 < publish2);

Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 - 1), id1);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1), id1);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 + 1), id2);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish2), id2);
Assert.assertTrue(admin.topics().getMessageIdByTimestamp(topicName, publish2 + 1)
.compareTo(id2) > 0);
}
}
Expand Up @@ -1349,6 +1349,28 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
*/
CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long ledgerId, long entryId);

/**
* Get message ID published at or just after this absolute timestamp (in ms).
* @param topic
* Topic name
* @param timestamp
* Timestamp
* @return MessageId
* @throws PulsarAdminException
* Unexpected error
*/
MessageId getMessageIdByTimestamp(String topic, long timestamp) throws PulsarAdminException;

/**
* Get message ID published at or just after this absolute timestamp (in ms) asynchronously.
* @param topic
* Topic name
* @param timestamp
* Timestamp
* @return a future that can be used to track when the message ID is returned.
*/
CompletableFuture<MessageId> getMessageIdByTimestampAsync(String topic, long timestamp);

/**
* Create a new subscription on a topic.
*
Expand Down
Expand Up @@ -1215,6 +1215,44 @@ public Message<byte[]> getMessageById(String topic, long ledgerId, long entryId)
}
}

@Override
public CompletableFuture<MessageId> getMessageIdByTimestampAsync(String topic, long timestamp) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "messageid", Long.toString(timestamp));
final CompletableFuture<MessageId> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<MessageIdImpl>() {
@Override
public void completed(MessageIdImpl response) {
future.complete(response);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}


@Override
public MessageId getMessageIdByTimestamp(String topic, long timestamp)
throws PulsarAdminException {
try {
return getMessageIdByTimestampAsync(topic, timestamp).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}



@Override
public void createSubscription(String topic, String subscriptionName, MessageId messageId)
throws PulsarAdminException {
Expand Down

0 comments on commit a7e40ea

Please sign in to comment.