Skip to content

Commit

Permalink
[Authorization] Support UNSUBSCRIBE namespace op after enable auth (#…
Browse files Browse the repository at this point in the history
…12742)

### Motivation
Currently, we can `unsubscribe` the given subscription on all topics on a namespace through `bin/pulsar-admin namespaces unsubscribe -s sub tn1/ns1`. However, role(not super-user or administrator) with `consume` auth action for namespace cannot perform `unsubscribe` operation when enable auth.

The root of the problem is that `PulsarAuthorizationProvider` lacks support for namespace operation `UNSUBSCRIBE` when verifying the role's authorization, code as below:
https://github.com/apache/pulsar/blob/8cae63557a318240e95697f382b4f61c22b70d64/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java#L1667-L1669
https://github.com/apache/pulsar/blob/8cae63557a318240e95697f382b4f61c22b70d64/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java#L522-L536

The purpose of this PR is to support that role with `consume` namespace authorization could `unsubscribe` subscriptions on a namespace.

(cherry picked from commit 8926631)
  • Loading branch information
yuruguo authored and codelipenghui committed Nov 18, 2021
1 parent 0d2022d commit 2ab4dec
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName nam
isAuthorizedFuture = allowTheSpecifiedActionOpsAsync(namespaceName, role, authData, AuthAction.packages);
break;
case GET_TOPICS:
case UNSUBSCRIBE:
isAuthorizedFuture = allowConsumeOpsAsync(namespaceName, role, authData);
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@

import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -51,6 +54,7 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TenantOperation;
Expand Down Expand Up @@ -174,6 +178,7 @@ public void testSubscriberPermission() throws Exception {
final String tenantRole = "tenant-role";
final String subscriptionRole = "sub1-role";
final String subscriptionName = "sub1";
final String subscriptionName2 = "sub2";
final String namespace = "my-property/my-ns-sub-auth";
final String topicName = "persistent://" + namespace + "/my-topic";
Authentication adminAuthentication = new ClientAuthentication("superUser");
Expand Down Expand Up @@ -201,7 +206,18 @@ public void testSubscriberPermission() throws Exception {
superAdmin.tenants().createTenant("my-property",
new TenantInfoImpl(Sets.newHashSet(tenantRole), Sets.newHashSet("test")));
superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
tenantAdmin.namespaces().grantPermissionOnNamespace(namespace, subscriptionRole,

// subscriptionRole doesn't have topic-level authorization, so it will fail to get topic stats-internal info
try {
sub1Admin.topics().getInternalStats(topicName, true);
fail("should have failed with authorization exception");
} catch (Exception e) {
assertTrue(e.getMessage().startsWith(
"Unauthorized to validateTopicOperation for operation [GET_STATS]"));
}

// grant topic consume authorization to the subscriptionRole
tenantAdmin.topics().grantPermission(topicName, subscriptionRole,
Collections.singleton(AuthAction.consume));

replacePulsarClient(PulsarClient.builder()
Expand All @@ -211,7 +227,17 @@ public void testSubscriberPermission() throws Exception {
// (1) Create subscription name
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.subscribe();
Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName2)
.subscribe();
consumer.close();
consumer2.close();

List<String> subscriptions = sub1Admin.topics().getSubscriptions(topicName);
assertEquals(subscriptions.size(), 2);

// now, subscriptionRole have consume authorization on topic, so it will successfully get topic internal stats
PersistentTopicInternalStats internalStats = superAdmin.topics().getInternalStats(topicName, true);
assertNotNull(internalStats);

// verify tenant is able to perform all subscription-admin api
tenantAdmin.topics().skipAllMessages(topicName, subscriptionName);
Expand All @@ -227,10 +253,24 @@ public void testSubscriberPermission() throws Exception {
tenantAdmin.topics().resetCursor(topicName, subscriptionName, 10);
tenantAdmin.topics().resetCursor(topicName, subscriptionName, MessageId.earliest);

// subscriptionRole doesn't have namespace-level authorization, so it will fail to unsubscribe namespace
try {
sub1Admin.namespaces().unsubscribeNamespace(namespace, subscriptionName2);
fail("should have failed with authorization exception");
} catch (Exception e) {
assertTrue(e.getMessage().startsWith(
"Unauthorized to validateNamespaceOperation for operation [UNSUBSCRIBE]"));
}

// grant namespace-level authorization to the subscriptionRole
tenantAdmin.namespaces().grantPermissionOnNamespace(namespace, subscriptionRole,
Collections.singleton(AuthAction.consume));

// now, subscriptionRole have consume authorization on namespace, so it will successfully unsubscribe namespace
superAdmin.namespaces().unsubscribeNamespaceBundle(namespace, "0x00000000_0xffffffff", subscriptionName2);
subscriptions = sub1Admin.topics().getSubscriptions(topicName);
assertEquals(subscriptions.size(), 1);

// subscriptionRole has namespace-level authorization
sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);

Expand Down

This file was deleted.

0 comments on commit 2ab4dec

Please sign in to comment.