From 3b4d2a527500ac0162a7b00b68dde029173e4545 Mon Sep 17 00:00:00 2001 From: Mattison Date: Thu, 25 Apr 2024 15:49:41 +0800 Subject: [PATCH 1/3] [fix][broker] Disable system topic message deduplication --- .../apache/pulsar/broker/service/Topic.java | 10 +++++ .../persistent/MessageDeduplication.java | 2 +- .../service/persistent/PersistentTopic.java | 9 +++-- .../service/persistent/SystemTopic.java | 16 ++++++++ .../persistent/MessageDuplicationTest.java | 37 ++++++++++++++++++- 5 files changed, 68 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index a296052a41191..66718f69e4ab2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -213,6 +213,16 @@ CompletableFuture close( void checkCursorsToCacheEntries(); + /** + * Indicate if the current topic enabled server side deduplication. + * This is a dynamic configuration, user may update it by namespace/topic policies. + * + * @return whether enabled server side deduplication + */ + default boolean isDeduplicationEnabled() { + return false; + } + void checkDeduplicationSnapshot(); void checkMessageExpiry(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index e508661364d74..3f4c171c4383e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -473,7 +473,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { } private boolean isDeduplicationEnabled() { - return topic.getHierarchyTopicPolicies().getDeduplicationEnabled().get(); + return topic.isDeduplicationEnabled(); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 9d6855962ced6..8694882143813 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2040,10 +2040,6 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { return future; } - public boolean isDeduplicationEnabled() { - return messageDeduplication.isEnabled(); - } - @Override public int getNumberOfConsumers() { int count = 0; @@ -4109,6 +4105,10 @@ public boolean isMigrated() { return ledger.isMigrated(); } + public boolean isDeduplicationEnabled() { + return getHierarchyTopicPolicies().getDeduplicationEnabled().get(); + } + public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String subName) { return this.subscriptions.get(subName).getTransactionInPendingAckStats(txnID); } @@ -4143,4 +4143,5 @@ protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) { } return false; } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java index 720ae3c51891e..f2cec2138a3a0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java @@ -80,6 +80,22 @@ public boolean isCompactionEnabled() { return !NamespaceService.isHeartbeatNamespace(TopicName.get(topic)); } + @Override + public boolean isDeduplicationEnabled() { + /* + Disable deduplication on system topic to avoid recovering deduplication WAL + (especially from offloaded topic). + Because the system topic usually is a precondition of other topics. therefore, + we should pay attention on topic loading time. + + Note: If the system topic loading timeout may cause dependent topics to fail to run. + + Dependency diagram: normal topic --rely on--> system topic --rely on--> deduplication recover + --may rely on--> (tiered storage) + */ + return false; + } + @Override public boolean isEncryptionRequired() { // System topics are only written by the broker that can't know the encryption context. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java index 402b5c4972ce2..4513c937c33c8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java @@ -33,11 +33,16 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; + +import com.fasterxml.jackson.annotation.ObjectIdGenerators; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; import java.lang.reflect.Field; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.*; + import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -50,9 +55,11 @@ import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.broker.qos.AsyncTokenBucket; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; @@ -490,4 +497,32 @@ public void testMessageDeduplication() throws Exception { messageDeduplication.purgeInactiveProducers(); assertTrue(messageDeduplication.getInactiveProducers().isEmpty()); } + + + @Test + public void testMessageDeduplicationShouldNotWorkForSystemTopic() throws PulsarAdminException { + final String localName = UUID.randomUUID().toString(); + final String namespace = "prop/ns-abc"; + final String prefix = "persistent://%s/".formatted(namespace); + final String topic = prefix + localName; + admin.topics().createNonPartitionedTopic(topic); + + // broker level policies + final String eventSystemTopic = prefix + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME; + final Optional optionalTopic = pulsar.getBrokerService().getTopic(eventSystemTopic, true).join(); + assertTrue(optionalTopic.isPresent()); + final Topic ptRef = optionalTopic.get(); + assertTrue(ptRef.isSystemTopic()); + assertFalse(ptRef.isDeduplicationEnabled()); + + // namespace level policies + admin.namespaces().setDeduplicationStatus(namespace, true); + assertTrue(ptRef.isSystemTopic()); + assertFalse(ptRef.isDeduplicationEnabled()); + + // topic level policies + admin.topicPolicies().setDeduplicationStatus(eventSystemTopic, true); + assertTrue(ptRef.isSystemTopic()); + assertFalse(ptRef.isDeduplicationEnabled()); + } } From 82b5714e98e31d0a65c8c8dc0d70495331e59574 Mon Sep 17 00:00:00 2001 From: Mattison Date: Thu, 25 Apr 2024 15:58:22 +0800 Subject: [PATCH 2/3] fix checkstyle --- .../broker/service/persistent/MessageDuplicationTest.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java index 4513c937c33c8..f034717ccf2e3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java @@ -33,16 +33,13 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; - -import com.fasterxml.jackson.annotation.ObjectIdGenerators; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; import java.lang.reflect.Field; import java.util.Map; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.*; - +import java.util.concurrent.ConcurrentHashMap; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; From 53cc8d8472bde676bd2b5770bfe59707f76a828a Mon Sep 17 00:00:00 2001 From: Mattison Date: Thu, 25 Apr 2024 16:00:15 +0800 Subject: [PATCH 3/3] avoid another stack frame --- .../broker/service/persistent/MessageDeduplication.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 3f4c171c4383e..ab3b799093be6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -217,7 +217,7 @@ public Status getStatus() { * returning a future to track the completion of the task */ public CompletableFuture checkStatus() { - boolean shouldBeEnabled = isDeduplicationEnabled(); + boolean shouldBeEnabled = topic.isDeduplicationEnabled(); synchronized (this) { if (status == Status.Recovering || status == Status.Removing) { // If there's already a transition happening, check later for status @@ -472,10 +472,6 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { }, null); } - private boolean isDeduplicationEnabled() { - return topic.isDeduplicationEnabled(); - } - /** * Topic will call this method whenever a producer connects. */