diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 6b07d5cff3eb7..099f5b380f68b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -533,6 +533,7 @@ public CompletableFuture allowNamespaceOperationAsync(NamespaceName nam break; case GET_TOPICS: case UNSUBSCRIBE: + case CLEAR_BACKLOG: isAuthorizedFuture = allowConsumeOpsAsync(namespaceName, role, authData); break; default: diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java index 6fe7a9d15b6c7..b0c9a093cfed5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java @@ -293,7 +293,7 @@ public void testSubscriberPermission() throws Exception { Sets.newHashSet(AuthAction.consume, AuthAction.packages)); // now, subscriptionRole have consume authorization on namespace, so it will successfully unsubscribe namespace - superAdmin.namespaces().unsubscribeNamespaceBundle(namespace, "0x00000000_0xffffffff", subscriptionName2); + sub1Admin.namespaces().unsubscribeNamespaceBundle(namespace, "0x00000000_0xffffffff", subscriptionName2); subscriptions = sub1Admin.topics().getSubscriptions(topicName); assertEquals(subscriptions.size(), 1); List topics = sub1Admin.namespaces().getTopics(namespace); @@ -358,6 +358,82 @@ public void testSubscriberPermission() throws Exception { log.info("-- Exiting {} test --", methodName); } + @Test + public void testClearBacklogPermission() throws Exception { + log.info("-- Starting {} test --", methodName); + + conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName()); + setup(); + + final String subscriptionRole = "sub-role"; + final String subscriptionName = "sub1"; + final String namespace = "my-property/my-ns-sub-auth"; + final String topicName = "persistent://" + namespace + "/my-topic"; + Authentication adminAuthentication = new ClientAuthentication("superUser"); + + clientAuthProviderSupportedRoles.add(subscriptionRole); + + @Cleanup + PulsarAdmin superAdmin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) + .authentication(adminAuthentication).build()); + + Authentication subAdminAuthentication = new ClientAuthentication(subscriptionRole); + @Cleanup + PulsarAdmin sub1Admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) + .authentication(subAdminAuthentication).build()); + + superAdmin.clusters().createCluster("test", + ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); + superAdmin.tenants().createTenant("my-property", + new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test"))); + superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); + superAdmin.topics().createPartitionedTopic(topicName, 1); + + // grant topic consume&produce authorization to the subscriptionRole + superAdmin.topics().grantPermission(topicName, subscriptionRole, + Sets.newHashSet(AuthAction.produce, AuthAction.consume)); + replacePulsarClient(PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .authentication(subAdminAuthentication)); + + @Cleanup + Producer batchProducer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .create(); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer().topic(topicName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName(subscriptionName) + .subscribe(); + + CompletableFuture completableFuture = new CompletableFuture<>(); + for (int i = 0; i < 10; i++) { + completableFuture = batchProducer.sendAsync("a".getBytes()); + } + completableFuture.get(); + assertEquals(sub1Admin.topics().getStats(topicName + "-partition-0").getSubscriptions() + .get(subscriptionName).getMsgBacklog(), 10); + + // subscriptionRole doesn't have namespace-level authorization, so it will fail to clear backlog + try { + sub1Admin.namespaces().clearNamespaceBundleBacklog(namespace, "0x00000000_0xffffffff"); + fail("should have failed with authorization exception"); + } catch (Exception e) { + assertTrue(e.getMessage().startsWith( + "Unauthorized to validateNamespaceOperation for operation [CLEAR_BACKLOG]")); + } + + superAdmin.namespaces().grantPermissionOnNamespace(namespace, subscriptionRole, + Sets.newHashSet(AuthAction.consume)); + // now, subscriptionRole have consume authorization on namespace, so it will successfully clear backlog + sub1Admin.namespaces().clearNamespaceBundleBacklog(namespace, "0x00000000_0xffffffff"); + assertEquals(sub1Admin.topics().getStats(topicName + "-partition-0").getSubscriptions() + .get(subscriptionName).getMsgBacklog(), 0); + + log.info("-- Exiting {} test --", methodName); + } + @Test public void testSubscriptionPrefixAuthorization() throws Exception { log.info("-- Starting {} test --", methodName);