diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index adc5b2b099434..4d2e3f477bc76 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -59,6 +59,7 @@ import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException; import org.apache.pulsar.client.impl.Backoff; +import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.DispatchRate; @@ -794,6 +795,24 @@ public void initializeDispatchRateLimiterIfNeeded(Optional policies) { } } + @Override + public boolean trackDelayedDelivery(long ledgerId, long entryId, PulsarApi.MessageMetadata msgMetadata) { + if (!topic.isDelayedDeliveryEnabled()) { + // If broker has the feature disabled, always deliver messages immediately + return false; + } + synchronized (this) { + if (!delayedDeliveryTracker.isPresent()) { + // Initialize the tracker the first time we need to use it + delayedDeliveryTracker = Optional + .of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this)); + } + + delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis()); + return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime()); + } + } + protected synchronized Set getMessagesToReplayNow(int maxMessagesToRead) { if (!redeliveryMessages.isEmpty()) { return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);