Skip to content

Commit

Permalink
[Authorization] Support CLEAR_BACKLOG namespace op after enable auth (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
yuruguo committed Nov 26, 2021
1 parent 877bf3a commit 64af8df
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 1 deletion.
Expand Up @@ -533,6 +533,7 @@ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName nam
break;
case GET_TOPICS:
case UNSUBSCRIBE:
case CLEAR_BACKLOG:
isAuthorizedFuture = allowConsumeOpsAsync(namespaceName, role, authData);
break;
default:
Expand Down
Expand Up @@ -293,7 +293,7 @@ public void testSubscriberPermission() throws Exception {
Sets.newHashSet(AuthAction.consume, AuthAction.packages));

// now, subscriptionRole have consume authorization on namespace, so it will successfully unsubscribe namespace
superAdmin.namespaces().unsubscribeNamespaceBundle(namespace, "0x00000000_0xffffffff", subscriptionName2);
sub1Admin.namespaces().unsubscribeNamespaceBundle(namespace, "0x00000000_0xffffffff", subscriptionName2);
subscriptions = sub1Admin.topics().getSubscriptions(topicName);
assertEquals(subscriptions.size(), 1);
List<String> topics = sub1Admin.namespaces().getTopics(namespace);
Expand Down Expand Up @@ -358,6 +358,82 @@ public void testSubscriberPermission() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testClearBacklogPermission() throws Exception {
log.info("-- Starting {} test --", methodName);

conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
setup();

final String subscriptionRole = "sub-role";
final String subscriptionName = "sub1";
final String namespace = "my-property/my-ns-sub-auth";
final String topicName = "persistent://" + namespace + "/my-topic";
Authentication adminAuthentication = new ClientAuthentication("superUser");

clientAuthProviderSupportedRoles.add(subscriptionRole);

@Cleanup
PulsarAdmin superAdmin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(adminAuthentication).build());

Authentication subAdminAuthentication = new ClientAuthentication(subscriptionRole);
@Cleanup
PulsarAdmin sub1Admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(subAdminAuthentication).build());

superAdmin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
superAdmin.tenants().createTenant("my-property",
new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test")));
superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
superAdmin.topics().createPartitionedTopic(topicName, 1);

// grant topic consume&produce authorization to the subscriptionRole
superAdmin.topics().grantPermission(topicName, subscriptionRole,
Sets.newHashSet(AuthAction.produce, AuthAction.consume));
replacePulsarClient(PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.authentication(subAdminAuthentication));

@Cleanup
Producer<byte[]> batchProducer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.create();

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName(subscriptionName)
.subscribe();

CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
for (int i = 0; i < 10; i++) {
completableFuture = batchProducer.sendAsync("a".getBytes());
}
completableFuture.get();
assertEquals(sub1Admin.topics().getStats(topicName + "-partition-0").getSubscriptions()
.get(subscriptionName).getMsgBacklog(), 10);

// subscriptionRole doesn't have namespace-level authorization, so it will fail to clear backlog
try {
sub1Admin.namespaces().clearNamespaceBundleBacklog(namespace, "0x00000000_0xffffffff");
fail("should have failed with authorization exception");
} catch (Exception e) {
assertTrue(e.getMessage().startsWith(
"Unauthorized to validateNamespaceOperation for operation [CLEAR_BACKLOG]"));
}

superAdmin.namespaces().grantPermissionOnNamespace(namespace, subscriptionRole,
Sets.newHashSet(AuthAction.consume));
// now, subscriptionRole have consume authorization on namespace, so it will successfully clear backlog
sub1Admin.namespaces().clearNamespaceBundleBacklog(namespace, "0x00000000_0xffffffff");
assertEquals(sub1Admin.topics().getStats(topicName + "-partition-0").getSubscriptions()
.get(subscriptionName).getMsgBacklog(), 0);

log.info("-- Exiting {} test --", methodName);
}

@Test
public void testSubscriptionPrefixAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);
Expand Down

0 comments on commit 64af8df

Please sign in to comment.