Skip to content

Commit

Permalink
When the Replicator is enabled, no managedLedger is created when upda…
Browse files Browse the repository at this point in the history
…ting the number of partitions (#10910)

Fixes #10673

### Motivation
When updating the number of partitions, we need to update the data in two places in zk:
```
/admin/partitioned-topics
/managed-ledgers/
```

Now we only update the number of partitions in `/admin/partitioned-topics`, so if we do not create a Producer or Consumer, the data obtained in another cluster will be incorrect

### Modifications
1)Try to create managedLedger when updating the number of partitions
2)Ensure that the number of partitions in `/admin/partitioned-topics` is updated every time
  • Loading branch information
315157973 committed Jun 17, 2021
1 parent baedfac commit 202da11
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 1 deletion.
Expand Up @@ -422,6 +422,7 @@ protected void internalUpdatePartitionedTopic(int numPartitions,
throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate cluster list");
}
try {
tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
createSubscriptions(topicName, numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
Expand All @@ -435,7 +436,7 @@ protected void internalUpdatePartitionedTopic(int numPartitions,
if (!updateLocalTopicOnly) {
CompletableFuture<Void> updatePartition = new CompletableFuture<>();
final String path = ZkAdminPaths.partitionedTopicPath(topicName);
updatePartitionInOtherCluster(numPartitions, clusters).thenAccept((res) -> {
updatePartitionInOtherCluster(numPartitions, clusters).thenRun(() -> {
try {
namespaceResources().getPartitionedTopicResources().setAsync(path, (p) -> {
return new PartitionedTopicMetadata(numPartitions);
Expand Down
Expand Up @@ -36,6 +36,7 @@
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -803,6 +804,42 @@ public void verifyChecksumAfterReplication() throws Exception {
reader2.closeAsync().get();
}

@Test
public void testReplicatorWithPartitionedTopic() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
final String persistentTopicName = "persistent://" + namespace + "/partTopic" + UUID.randomUUID();

admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
// Create partitioned-topic from R1
admin1.topics().createPartitionedTopic(persistentTopicName, 3);
// List partitioned topics from R2
Awaitility.await().untilAsserted(() -> assertNotNull(admin2.topics().getPartitionedTopicList(namespace)));
Awaitility.await().untilAsserted(() -> assertEquals(
admin2.topics().getPartitionedTopicList(namespace).get(0), persistentTopicName));
assertEquals(admin1.topics().getList(namespace).size(), 3);
// List partitioned topics from R3
Awaitility.await().untilAsserted(() -> assertNotNull(admin3.topics().getPartitionedTopicList(namespace)));
Awaitility.await().untilAsserted(() -> assertEquals(
admin3.topics().getPartitionedTopicList(namespace).get(0), persistentTopicName));
// Update partitioned topic from R2
admin2.topics().updatePartitionedTopic(persistentTopicName, 5);
assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 5);
assertEquals(admin2.topics().getList(namespace).size(), 5);
// Update partitioned topic from R3
admin3.topics().updatePartitionedTopic(persistentTopicName, 5);
assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 5);
assertEquals(admin3.topics().getList(namespace).size(), 5);
// Update partitioned topic from R1
admin1.topics().updatePartitionedTopic(persistentTopicName, 6);
assertEquals(admin1.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6);
assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6);
assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6);
assertEquals(admin1.topics().getList(namespace).size(), 6);
assertEquals(admin2.topics().getList(namespace).size(), 6);
assertEquals(admin3.topics().getList(namespace).size(), 6);
}

/**
* It verifies that broker should not start replicator for partitioned-topic (topic without -partition postfix)
*
Expand Down

0 comments on commit 202da11

Please sign in to comment.