diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index a296052a41191..66718f69e4ab2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -213,6 +213,16 @@ CompletableFuture close( void checkCursorsToCacheEntries(); + /** + * Indicate if the current topic enabled server side deduplication. + * This is a dynamic configuration, user may update it by namespace/topic policies. + * + * @return whether enabled server side deduplication + */ + default boolean isDeduplicationEnabled() { + return false; + } + void checkDeduplicationSnapshot(); void checkMessageExpiry(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index e508661364d74..ab3b799093be6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -217,7 +217,7 @@ public Status getStatus() { * returning a future to track the completion of the task */ public CompletableFuture checkStatus() { - boolean shouldBeEnabled = isDeduplicationEnabled(); + boolean shouldBeEnabled = topic.isDeduplicationEnabled(); synchronized (this) { if (status == Status.Recovering || status == Status.Removing) { // If there's already a transition happening, check later for status @@ -472,10 +472,6 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { }, null); } - private boolean isDeduplicationEnabled() { - return topic.getHierarchyTopicPolicies().getDeduplicationEnabled().get(); - } - /** * Topic will call this method whenever a producer connects. */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 9d6855962ced6..8694882143813 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2040,10 +2040,6 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { return future; } - public boolean isDeduplicationEnabled() { - return messageDeduplication.isEnabled(); - } - @Override public int getNumberOfConsumers() { int count = 0; @@ -4109,6 +4105,10 @@ public boolean isMigrated() { return ledger.isMigrated(); } + public boolean isDeduplicationEnabled() { + return getHierarchyTopicPolicies().getDeduplicationEnabled().get(); + } + public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String subName) { return this.subscriptions.get(subName).getTransactionInPendingAckStats(txnID); } @@ -4143,4 +4143,5 @@ protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) { } return false; } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java index 720ae3c51891e..f2cec2138a3a0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java @@ -80,6 +80,22 @@ public boolean isCompactionEnabled() { return !NamespaceService.isHeartbeatNamespace(TopicName.get(topic)); } + @Override + public boolean isDeduplicationEnabled() { + /* + Disable deduplication on system topic to avoid recovering deduplication WAL + (especially from offloaded topic). + Because the system topic usually is a precondition of other topics. therefore, + we should pay attention on topic loading time. + + Note: If the system topic loading timeout may cause dependent topics to fail to run. + + Dependency diagram: normal topic --rely on--> system topic --rely on--> deduplication recover + --may rely on--> (tiered storage) + */ + return false; + } + @Override public boolean isEncryptionRequired() { // System topics are only written by the broker that can't know the encryption context. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java index 402b5c4972ce2..f034717ccf2e3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java @@ -37,6 +37,8 @@ import io.netty.channel.EventLoopGroup; import java.lang.reflect.Field; import java.util.Map; +import java.util.Optional; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -50,9 +52,11 @@ import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.broker.qos.AsyncTokenBucket; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; @@ -490,4 +494,32 @@ public void testMessageDeduplication() throws Exception { messageDeduplication.purgeInactiveProducers(); assertTrue(messageDeduplication.getInactiveProducers().isEmpty()); } + + + @Test + public void testMessageDeduplicationShouldNotWorkForSystemTopic() throws PulsarAdminException { + final String localName = UUID.randomUUID().toString(); + final String namespace = "prop/ns-abc"; + final String prefix = "persistent://%s/".formatted(namespace); + final String topic = prefix + localName; + admin.topics().createNonPartitionedTopic(topic); + + // broker level policies + final String eventSystemTopic = prefix + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME; + final Optional optionalTopic = pulsar.getBrokerService().getTopic(eventSystemTopic, true).join(); + assertTrue(optionalTopic.isPresent()); + final Topic ptRef = optionalTopic.get(); + assertTrue(ptRef.isSystemTopic()); + assertFalse(ptRef.isDeduplicationEnabled()); + + // namespace level policies + admin.namespaces().setDeduplicationStatus(namespace, true); + assertTrue(ptRef.isSystemTopic()); + assertFalse(ptRef.isDeduplicationEnabled()); + + // topic level policies + admin.topicPolicies().setDeduplicationStatus(eventSystemTopic, true); + assertTrue(ptRef.isSystemTopic()); + assertFalse(ptRef.isDeduplicationEnabled()); + } }