Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Disable system topic message deduplication #22582

Merged
merged 3 commits into from May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -213,6 +213,16 @@ CompletableFuture<Void> close(

void checkCursorsToCacheEntries();

/**
* Indicate if the current topic enabled server side deduplication.
* This is a dynamic configuration, user may update it by namespace/topic policies.
*
* @return whether enabled server side deduplication
*/
default boolean isDeduplicationEnabled() {
return false;
}

void checkDeduplicationSnapshot();

void checkMessageExpiry();
Expand Down
Expand Up @@ -217,7 +217,7 @@ public Status getStatus() {
* returning a future to track the completion of the task
*/
public CompletableFuture<Void> checkStatus() {
boolean shouldBeEnabled = isDeduplicationEnabled();
boolean shouldBeEnabled = topic.isDeduplicationEnabled();
synchronized (this) {
if (status == Status.Recovering || status == Status.Removing) {
// If there's already a transition happening, check later for status
Expand Down Expand Up @@ -472,10 +472,6 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
}, null);
}

private boolean isDeduplicationEnabled() {
return topic.getHierarchyTopicPolicies().getDeduplicationEnabled().get();
}

/**
* Topic will call this method whenever a producer connects.
*/
Expand Down
Expand Up @@ -2040,10 +2040,6 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
return future;
}

public boolean isDeduplicationEnabled() {
return messageDeduplication.isEnabled();
}

@Override
public int getNumberOfConsumers() {
int count = 0;
Expand Down Expand Up @@ -4109,6 +4105,10 @@ public boolean isMigrated() {
return ledger.isMigrated();
}

public boolean isDeduplicationEnabled() {
return getHierarchyTopicPolicies().getDeduplicationEnabled().get();
}

public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String subName) {
return this.subscriptions.get(subName).getTransactionInPendingAckStats(txnID);
}
Expand Down Expand Up @@ -4143,4 +4143,5 @@ protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) {
}
return false;
}

}
Expand Up @@ -80,6 +80,22 @@ public boolean isCompactionEnabled() {
return !NamespaceService.isHeartbeatNamespace(TopicName.get(topic));
}

@Override
public boolean isDeduplicationEnabled() {
/*
Disable deduplication on system topic to avoid recovering deduplication WAL
(especially from offloaded topic).
Because the system topic usually is a precondition of other topics. therefore,
we should pay attention on topic loading time.

Note: If the system topic loading timeout may cause dependent topics to fail to run.

Dependency diagram: normal topic --rely on--> system topic --rely on--> deduplication recover
--may rely on--> (tiered storage)
*/
return false;
}

@Override
public boolean isEncryptionRequired() {
// System topics are only written by the broker that can't know the encryption context.
Expand Down
Expand Up @@ -37,6 +37,8 @@
import io.netty.channel.EventLoopGroup;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand All @@ -50,9 +52,11 @@
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
Expand Down Expand Up @@ -490,4 +494,32 @@ public void testMessageDeduplication() throws Exception {
messageDeduplication.purgeInactiveProducers();
assertTrue(messageDeduplication.getInactiveProducers().isEmpty());
}


@Test
public void testMessageDeduplicationShouldNotWorkForSystemTopic() throws PulsarAdminException {
final String localName = UUID.randomUUID().toString();
final String namespace = "prop/ns-abc";
final String prefix = "persistent://%s/".formatted(namespace);
final String topic = prefix + localName;
admin.topics().createNonPartitionedTopic(topic);

// broker level policies
final String eventSystemTopic = prefix + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
final Optional<Topic> optionalTopic = pulsar.getBrokerService().getTopic(eventSystemTopic, true).join();
assertTrue(optionalTopic.isPresent());
final Topic ptRef = optionalTopic.get();
assertTrue(ptRef.isSystemTopic());
assertFalse(ptRef.isDeduplicationEnabled());

// namespace level policies
admin.namespaces().setDeduplicationStatus(namespace, true);
assertTrue(ptRef.isSystemTopic());
assertFalse(ptRef.isDeduplicationEnabled());

// topic level policies
admin.topicPolicies().setDeduplicationStatus(eventSystemTopic, true);
assertTrue(ptRef.isSystemTopic());
assertFalse(ptRef.isDeduplicationEnabled());
}
}