Skip to content

Commit

Permalink
Fix using partitioned topic name to get Policy (#11294)
Browse files Browse the repository at this point in the history
### Motivation
In the master branch, the REST API no longer allows the topic name of the partition to be used to set the topic policy, but there are still many places where it will be used internally.

Suppose we set a Topic policy for `persistent://tenant/namespace/topic`
However, the policy cannot be obtained through `persistent://tenant/namespace/topic-partition-0`, which causes the policy to become invalid.

For example:PersistentTopic.checkSubscriptionTypesEnable

### Modifications
Convert the name in SystemTopicBasedTopicPoliciesService
  • Loading branch information
315157973 committed Jul 14, 2021
1 parent 5ea2e77 commit 35d29b9
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 29 deletions.
Expand Up @@ -1303,12 +1303,8 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
OffloadPoliciesImpl topicLevelOffloadPolicies = null;

if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
TopicName cloneTopicName = topicName;
if (topicName.isPartitioned()) {
cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
}
try {
TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(cloneTopicName);
TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
if (topicPolicies != null) {
persistencePolicies = topicPolicies.getPersistence();
retentionPolicies = topicPolicies.getRetentionPolicies();
Expand Down Expand Up @@ -2573,12 +2569,8 @@ public Optional<TopicPolicies> getTopicPolicies(TopicName topicName) {
if (!pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
return Optional.empty();
}
TopicName cloneTopicName = topicName;
if (topicName.isPartitioned()) {
cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
}
try {
return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(cloneTopicName));
return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName));
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.debug("Topic {} policies have not been initialized yet.", topicName.getPartitionedTopicName());
return Optional.empty();
Expand Down
Expand Up @@ -91,7 +91,7 @@ public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, Top
.domain(topicName.getDomain().toString())
.tenant(topicName.getTenant())
.namespace(topicName.getNamespaceObject().getLocalName())
.topic(topicName.getLocalName())
.topic(TopicName.get(topicName.getPartitionedTopicName()).getLocalName())
.policies(policies)
.build())
.build()).whenComplete(((messageId, e) -> {
Expand Down Expand Up @@ -147,7 +147,7 @@ public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesC
&& !policyCacheInitMap.get(topicName.getNamespaceObject())) {
throw new TopicPoliciesCacheNotInitException();
}
return policiesCache.get(topicName);
return policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
}

@Override
Expand Down
Expand Up @@ -18,8 +18,23 @@
*/
package org.apache.pulsar.broker.admin;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
Expand All @@ -42,6 +57,7 @@
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand All @@ -61,23 +77,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

@Slf4j
@Test(groups = "broker")
public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
Expand Down Expand Up @@ -2144,6 +2143,30 @@ public void testAutoCreationDisabled() throws Exception {
assertNull(admin.topics().getMessageTTL(topic));
}

@Test
public void testSubscriptionTypesWithPartitionedTopic() throws Exception {
final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
admin.topics().createPartitionedTopic(topic, 1);
pulsarClient.newConsumer().topic(topic).subscriptionName("test").subscribe().close();
Awaitility.await()
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
Set<SubscriptionType> subscriptionTypeSet = new HashSet<>();
subscriptionTypeSet.add(SubscriptionType.Key_Shared);
admin.topics().setSubscriptionTypesEnabled(topic, subscriptionTypeSet);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getSubscriptionTypesEnabled(topic)));

PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
.getTopicReference(TopicName.get(topic).getPartition(0).toString()).get();
Set<String> old = new HashSet<>(pulsar.getConfiguration().getSubscriptionTypesEnabled());
try {
pulsar.getConfiguration().getSubscriptionTypesEnabled().clear();
assertTrue(persistentTopic.checkSubscriptionTypesEnable(CommandSubscribe.SubType.Key_Shared));
} finally {
//restore
pulsar.getConfiguration().getSubscriptionTypesEnabled().addAll(old);
}
}

@Test(timeOut = 30000)
public void testSubscriptionTypesEnabled() throws Exception {
final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
Expand Down

0 comments on commit 35d29b9

Please sign in to comment.