Skip to content

Commit

Permalink
[Broker] Remove tenant admin verification when listing partitioned to…
Browse files Browse the repository at this point in the history
…pics (apache#13138)

* [Broker] Remove tenant permission verification when list partitioned-topic

* Improve test

* Update annotation
  • Loading branch information
yuruguo authored and fangxiaobing committed Dec 19, 2021
1 parent 3cbe7d8 commit ddf6312
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 6 deletions.
Expand Up @@ -171,7 +171,6 @@ protected List<String> internalGetList() {
}

protected List<String> internalGetPartitionedTopicList() {
validateAdminAccessForTenant(namespaceName.getTenant());
validateNamespaceOperation(namespaceName, NamespaceOperation.GET_TOPICS);
// Validate that namespace exists, throws 404 if it doesn't exist
try {
Expand Down
Expand Up @@ -69,7 +69,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@Path("/{property}/{cluster}/{namespace}")
@ApiOperation(hidden = true, value = "Get the list of topics under a namespace.",
response = String.class, responseContainer = "List")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin or consume permission on namespace"),
@ApiResponse(code = 404, message = "Namespace doesn't exist")})
public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
Expand All @@ -87,7 +87,7 @@ public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("pr
@Path("/{property}/{cluster}/{namespace}/partitioned")
@ApiOperation(hidden = true, value = "Get the list of partitioned topics under a namespace.",
response = String.class, responseContainer = "List")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin or consume permission on namespace"),
@ApiResponse(code = 404, message = "Namespace doesn't exist")})
public List<String> getPartitionedTopicList(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
Expand Down
Expand Up @@ -85,7 +85,7 @@ public class PersistentTopics extends PersistentTopicsBase {
response = String.class, responseContainer = "List")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 403, message = "Don't have admin or consume permission on namespace"),
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error")})
Expand All @@ -111,7 +111,7 @@ public void getList(
response = String.class, responseContainer = "List")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 403, message = "Don't have admin or consume permission on namespace"),
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error")})
Expand Down
Expand Up @@ -24,6 +24,7 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
Expand Down Expand Up @@ -365,6 +366,7 @@ public void testClearBacklogPermission() throws Exception {
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
setup();

final String tenantRole = "tenant-role";
final String subscriptionRole = "sub-role";
final String subscriptionName = "sub1";
final String namespace = "my-property/my-ns-sub-auth";
Expand All @@ -377,6 +379,11 @@ public void testClearBacklogPermission() throws Exception {
PulsarAdmin superAdmin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(adminAuthentication).build());

Authentication tenantAdminAuthentication = new ClientAuthentication(tenantRole);
@Cleanup
PulsarAdmin tenantAdmin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(tenantAdminAuthentication).build());

Authentication subAdminAuthentication = new ClientAuthentication(subscriptionRole);
@Cleanup
PulsarAdmin sub1Admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
Expand All @@ -385,9 +392,11 @@ public void testClearBacklogPermission() throws Exception {
superAdmin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
superAdmin.tenants().createTenant("my-property",
new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test")));
new TenantInfoImpl(Sets.newHashSet(tenantRole), Sets.newHashSet("test")));
superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
superAdmin.topics().createPartitionedTopic(topicName, 1);
assertEquals(tenantAdmin.topics().getPartitionedTopicList(namespace),
Lists.newArrayList(topicName));

// grant topic consume&produce authorization to the subscriptionRole
superAdmin.topics().grantPermission(topicName, subscriptionRole,
Expand Down Expand Up @@ -416,6 +425,13 @@ public void testClearBacklogPermission() throws Exception {
.get(subscriptionName).getMsgBacklog(), 10);

// subscriptionRole doesn't have namespace-level authorization, so it will fail to clear backlog
try {
sub1Admin.topics().getPartitionedTopicList(namespace);
fail("should have failed with authorization exception");
} catch (Exception e) {
assertTrue(e.getMessage().startsWith(
"Unauthorized to validateNamespaceOperation for operation [GET_TOPICS]"));
}
try {
sub1Admin.namespaces().clearNamespaceBundleBacklog(namespace, "0x00000000_0xffffffff");
fail("should have failed with authorization exception");
Expand All @@ -427,6 +443,8 @@ public void testClearBacklogPermission() throws Exception {
superAdmin.namespaces().grantPermissionOnNamespace(namespace, subscriptionRole,
Sets.newHashSet(AuthAction.consume));
// now, subscriptionRole have consume authorization on namespace, so it will successfully clear backlog
assertEquals(sub1Admin.topics().getPartitionedTopicList(namespace),
Lists.newArrayList(topicName));
sub1Admin.namespaces().clearNamespaceBundleBacklog(namespace, "0x00000000_0xffffffff");
assertEquals(sub1Admin.topics().getStats(topicName + "-partition-0").getSubscriptions()
.get(subscriptionName).getMsgBacklog(), 0);
Expand Down

0 comments on commit ddf6312

Please sign in to comment.