Skip to content

Commit

Permalink
[fix][broker] Disable system topic message deduplication (#22582)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao authored and Technoboy- committed May 8, 2024
1 parent 2c92ae3 commit b6f464e
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 9 deletions.
Expand Up @@ -213,6 +213,16 @@ CompletableFuture<Void> close(

void checkCursorsToCacheEntries();

/**
* Indicate if the current topic enabled server side deduplication.
* This is a dynamic configuration, user may update it by namespace/topic policies.
*
* @return whether enabled server side deduplication
*/
default boolean isDeduplicationEnabled() {
return false;
}

void checkDeduplicationSnapshot();

void checkMessageExpiry();
Expand Down
Expand Up @@ -217,7 +217,7 @@ public Status getStatus() {
* returning a future to track the completion of the task
*/
public CompletableFuture<Void> checkStatus() {
boolean shouldBeEnabled = isDeduplicationEnabled();
boolean shouldBeEnabled = topic.isDeduplicationEnabled();
synchronized (this) {
if (status == Status.Recovering || status == Status.Removing) {
// If there's already a transition happening, check later for status
Expand Down Expand Up @@ -472,10 +472,6 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
}, null);
}

private boolean isDeduplicationEnabled() {
return topic.getHierarchyTopicPolicies().getDeduplicationEnabled().get();
}

/**
* Topic will call this method whenever a producer connects.
*/
Expand Down
Expand Up @@ -2146,10 +2146,6 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
return future;
}

public boolean isDeduplicationEnabled() {
return messageDeduplication.isEnabled();
}

@Override
public int getNumberOfConsumers() {
int count = 0;
Expand Down Expand Up @@ -4080,6 +4076,10 @@ public boolean isMigrated() {
return ledger.isMigrated();
}

public boolean isDeduplicationEnabled() {
return getHierarchyTopicPolicies().getDeduplicationEnabled().get();
}

public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String subName) {
return this.subscriptions.get(subName).getTransactionInPendingAckStats(txnID);
}
Expand All @@ -4104,4 +4104,5 @@ public long getLastDataMessagePublishedTimestamp() {
public Optional<TopicName> getShadowSourceTopic() {
return Optional.ofNullable(shadowSourceTopic);
}

}
Expand Up @@ -80,6 +80,22 @@ public boolean isCompactionEnabled() {
return !NamespaceService.isHeartbeatNamespace(TopicName.get(topic));
}

@Override
public boolean isDeduplicationEnabled() {
/*
Disable deduplication on system topic to avoid recovering deduplication WAL
(especially from offloaded topic).
Because the system topic usually is a precondition of other topics. therefore,
we should pay attention on topic loading time.
Note: If the system topic loading timeout may cause dependent topics to fail to run.
Dependency diagram: normal topic --rely on--> system topic --rely on--> deduplication recover
--may rely on--> (tiered storage)
*/
return false;
}

@Override
public boolean isEncryptionRequired() {
// System topics are only written by the broker that can't know the encryption context.
Expand Down
Expand Up @@ -37,6 +37,8 @@
import io.netty.channel.EventLoopGroup;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand All @@ -50,9 +52,11 @@
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
Expand Down Expand Up @@ -490,4 +494,32 @@ public void testMessageDeduplication() throws Exception {
messageDeduplication.purgeInactiveProducers();
assertTrue(messageDeduplication.getInactiveProducers().isEmpty());
}


@Test
public void testMessageDeduplicationShouldNotWorkForSystemTopic() throws PulsarAdminException {
final String localName = UUID.randomUUID().toString();
final String namespace = "prop/ns-abc";
final String prefix = "persistent://%s/".formatted(namespace);
final String topic = prefix + localName;
admin.topics().createNonPartitionedTopic(topic);

// broker level policies
final String eventSystemTopic = prefix + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
final Optional<Topic> optionalTopic = pulsar.getBrokerService().getTopic(eventSystemTopic, true).join();
assertTrue(optionalTopic.isPresent());
final Topic ptRef = optionalTopic.get();
assertTrue(ptRef.isSystemTopic());
assertFalse(ptRef.isDeduplicationEnabled());

// namespace level policies
admin.namespaces().setDeduplicationStatus(namespace, true);
assertTrue(ptRef.isSystemTopic());
assertFalse(ptRef.isDeduplicationEnabled());

// topic level policies
admin.topicPolicies().setDeduplicationStatus(eventSystemTopic, true);
assertTrue(ptRef.isSystemTopic());
assertFalse(ptRef.isDeduplicationEnabled());
}
}

0 comments on commit b6f464e

Please sign in to comment.