Skip to content

Commit

Permalink
[improve][broker] Copy subscription properties during updating the to…
Browse files Browse the repository at this point in the history
…pic partition number. (#19223)

(cherry picked from commit 253e3e4)
  • Loading branch information
mattisonchao authored and liangyepianzhou committed Feb 7, 2023
1 parent d1e4008 commit 05cbbfd
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4257,7 +4257,8 @@ private CompletableFuture<Void> createSubscriptions(TopicName topicName, int num
final String topicNamePartition = topicName.getPartition(i).toString();
CompletableFuture<Void> future = new CompletableFuture<>();
admin.topics().createSubscriptionAsync(topicNamePartition,
subscription, MessageId.latest, replicated).whenComplete((__, ex) -> {
subscription, MessageId.latest, replicated, ss.getSubscriptionProperties())
.whenComplete((__, ex) -> {
if (ex == null) {
future.complete(null);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.Collections;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
Expand All @@ -35,7 +38,9 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand All @@ -61,6 +66,9 @@ public void setup() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("use"));
admin.tenants().createTenant("prop-xyz", tenantInfo);
admin.namespaces().createNamespace("prop-xyz/use/ns1");

// Setup v2 namespaces
setupDefaultTenantAndNamespace();
}

@AfterMethod(alwaysRun = true)
Expand Down Expand Up @@ -89,7 +97,7 @@ public void testIncrementPartitionsOfTopic() throws Exception {
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 1);

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(partitionedTopicName).subscriptionName("sub-1")
.subscribe();
.subscribe();

admin.topics().updatePartitionedTopic(partitionedTopicName, 2);
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2);
Expand All @@ -106,6 +114,36 @@ public void testIncrementPartitionsOfTopic() throws Exception {
consumer.close();
}

@Test
public void testIncrementPartitionsOfTopicWithSubscriptionProperties() throws Exception {
final String partitionedTopicName = UUID.randomUUID()
+ "-testIncrementPartitionsOfTopicWithSubscriptionProperties";

admin.topics().createPartitionedTopic(partitionedTopicName, 1);
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 1);

Map<String, String> properties = new HashMap<>();
properties.put("method", "testIncrementPartitionsOfTopic");

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(partitionedTopicName)
.subscriptionName("sub-1")
.subscriptionProperties(properties)
.subscribe();

admin.topics().updatePartitionedTopic(partitionedTopicName, 20);
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20);

assertEquals(admin.topics().getSubscriptions(
TopicName.get(partitionedTopicName).getPartition(15).toString()), List.of("sub-1"));
TopicStats stats = admin.topics()
.getStats(TopicName.get(partitionedTopicName).getPartition(15).toString());
Map<String, String> subscriptionProperties = stats.getSubscriptions()
.get("sub-1").getSubscriptionProperties();
Assert.assertEquals(properties, subscriptionProperties);
}

@Test
public void testIncrementPartitionsWithNoSubscriptions() throws Exception {
final String partitionedTopicName =
Expand Down

0 comments on commit 05cbbfd

Please sign in to comment.