Skip to content

Commit

Permalink
fix non-persistent topic get partitioned metadata error on discovery (#…
Browse files Browse the repository at this point in the history
…10806)

Fixes #10443

### Motivation
fix non-persistent topic get partitioned metadata error if using discovery




(cherry picked from commit 8599229)
  • Loading branch information
aloyszhang authored and codelipenghui committed Jun 25, 2021
1 parent d809104 commit 50da9b2
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(Discover
try {
checkAuthorization(service, topicName, role, authenticationData);
final String path = path(PARTITIONED_TOPIC_PATH_ZNODE,
topicName.getNamespaceObject().toString(), "persistent", topicName.getEncodedLocalName());
topicName.getNamespaceObject().toString(), topicName.getDomain().value(), topicName.getEncodedLocalName());
// gets the number of partitions from the zk cache
pulsarResources.getNamespaceResources().getPartitionedTopicResources().getAsync(path)
.thenAccept(metadata -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,11 @@ protected void simulateStoreError(String string, Code sessionexpired) {
&& path.equals("/admin/partitioned-topics/test/local/ns/persistent/my-topic-2");
});
}

protected void simulateStoreErrorForNonPersistentTopic(String string, Code sessionexpired) {
mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET
&& path.equals("/admin/partitioned-topics/test/local/ns/non-persistent/my-topic-2");
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,27 @@ public void testGetPartitionsMetadata() throws Exception {
}
}

@Test
public void testGetPartitionsMetadataForNonPersistentTopic() throws Exception {
TopicName topic1 = TopicName.get("non-persistent://test/local/ns/my-topic-1");

PartitionedTopicMetadata m = service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topic1, "role", null)
.get();
assertEquals(m.partitions, 0);

// Simulate ZK error
simulateStoreErrorForNonPersistentTopic("/admin/partitioned-topics/test/local/ns/non-persistent/my-topic-2", Code.SESSIONEXPIRED);
TopicName topic2 = TopicName.get("non-persistent://test/local/ns/my-topic-2");
CompletableFuture<PartitionedTopicMetadata> future = service.getDiscoveryProvider()
.getPartitionedTopicMetadata(service, topic2, "role", null);
try {
future.get();
fail("Partition metadata lookup should have failed");
} catch (ExecutionException e) {
assertEquals(e.getCause().getClass(), MetadataStoreException.class);
}
}

/**
* It verifies: client connects to Discovery-service and receives discovery response successfully.
*
Expand Down

0 comments on commit 50da9b2

Please sign in to comment.