Skip to content

Commit

Permalink
Make client-admin-api to use interfaces with builders instead of POJOs (
Browse files Browse the repository at this point in the history
#10818)

### Motivation

Instead of using POJOs types in the Java client-admin API, use interfaces 

### Modifications
 * Converted more POJOs into interfaces
 * Added builders to construct instances of the interfaces without directly using the implementation classes

Note: for easier reviewing, there are 2 commits in this PR: 
 * f14ce73 includes the changes to production code
 * 23b6f74 changes to the unit tests
  • Loading branch information
merlimat committed Jun 6, 2021
1 parent bd568bc commit da71ec2
Show file tree
Hide file tree
Showing 302 changed files with 6,239 additions and 3,563 deletions.
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -104,8 +105,12 @@ protected void internalSetUpForNamespace() throws Exception {
admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrlTls.toString())
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).allowTlsInsecureConnection(false)
.authentication(AuthenticationTls.class.getName(), authParams).build());
admin.clusters().createCluster(clusterName, new ClusterDataImpl(brokerUrl.toString(), brokerUrlTls.toString(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()));
admin.clusters().createCluster(clusterName, ClusterData.builder()
.serviceUrl(brokerUrl.toString())
.serviceUrlTls(brokerUrlTls.toString())
.brokerServiceUrl(pulsar.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar.getBrokerServiceUrlTls())
.build());
admin.tenants().createTenant("my-property",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
admin.namespaces().createNamespace("my-property/my-ns");
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TenantOperation;
Expand Down Expand Up @@ -78,7 +79,7 @@ default CompletableFuture<Boolean> isSuperUser(String role, ServiceConfiguration
* @return a CompletableFuture containing a boolean in which true means the role is an admin user
* and false if it is not
*/
default CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfoImpl tenantInfo,
default CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo,
AuthenticationDataSource authenticationData) {
return CompletableFuture.completedFuture(role != null && tenantInfo.getAdminRoles() != null && tenantInfo.getAdminRoles().contains(role) ? true : false);
}
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
Expand Down Expand Up @@ -80,7 +81,7 @@ public CompletableFuture<Boolean> isSuperUser(String user, AuthenticationDataSou
return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured"));
}

public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfoImpl tenantInfo,
public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo,
AuthenticationDataSource authenticationData) {
if (provider != null) {
return provider.isTenantAdmin(tenant, role, tenantInfo, authenticationData);
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
Expand Down Expand Up @@ -118,7 +119,8 @@ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String ro
if (isNotBlank(subscription)) {
// validate if role is authorize to access subscription. (skip validatation if authorization
// list is empty)
Set<String> roles = policies.get().auth_policies.subscription_auth_roles.get(subscription);
Set<String> roles = policies.get().auth_policies
.getSubscriptionAuthentication().get(subscription);
if (roles != null && !roles.isEmpty() && !roles.contains(role)) {
log.warn("[{}] is not authorized to subscribe on {}-{}", role, topicName, subscription);
PulsarServerException ex = new PulsarServerException(
Expand Down Expand Up @@ -242,7 +244,8 @@ private CompletableFuture<Boolean> allowTheSpecifiedActionOpsAsync(NamespaceName
log.debug("Policies node couldn't be found for namespace : {}", namespaceName);
}
} else {
Map<String, Set<AuthAction>> namespaceRoles = policies.get().auth_policies.namespace_auth;
Map<String, Set<AuthAction>> namespaceRoles = policies.get()
.auth_policies.getNamespaceAuthentication();
Set<AuthAction> namespaceActions = namespaceRoles.get(role);
if (namespaceActions != null && namespaceActions.contains(authAction)) {
// The role has namespace level permission
Expand Down Expand Up @@ -294,7 +297,7 @@ public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespaceName,
final String policiesPath = String.format("/%s/%s/%s", "admin", POLICIES, namespaceName.toString());
try {
pulsarResources.getNamespaceResources().set(policiesPath, (policies)->{
policies.auth_policies.namespace_auth.put(role, actions);
policies.auth_policies.getNamespaceAuthentication().put(role, actions);
return policies;
});
log.info("[{}] Successfully granted access for role {}: {} - namespace {}", role, role, actions,
Expand Down Expand Up @@ -344,15 +347,15 @@ private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName
Policies policies = pulsarResources.getNamespaceResources().get(policiesPath)
.orElseThrow(() -> new NotFoundException(policiesPath + " not found"));
if (remove) {
if (policies.auth_policies.subscription_auth_roles.get(subscriptionName) != null) {
policies.auth_policies.subscription_auth_roles.get(subscriptionName).removeAll(roles);
if (policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName) != null) {
policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName).removeAll(roles);
}else {
log.info("[{}] Couldn't find role {} while revoking for sub = {}", namespace, subscriptionName, roles);
result.completeExceptionally(new IllegalArgumentException("couldn't find subscription"));
return result;
}
} else {
policies.auth_policies.subscription_auth_roles.put(subscriptionName, roles);
policies.auth_policies.getSubscriptionAuthentication().put(subscriptionName, roles);
}
pulsarResources.getNamespaceResources().set(policiesPath, (data)->policies);

Expand Down Expand Up @@ -400,15 +403,16 @@ public CompletableFuture<Boolean> checkPermission(TopicName topicName, String ro
log.debug("Policies node couldn't be found for topic : {}", topicName);
}
} else {
Map<String, Set<AuthAction>> namespaceRoles = policies.get().auth_policies.namespace_auth;
Map<String, Set<AuthAction>> namespaceRoles = policies.get().auth_policies
.getNamespaceAuthentication();
Set<AuthAction> namespaceActions = namespaceRoles.get(role);
if (namespaceActions != null && namespaceActions.contains(action)) {
// The role has namespace level permission
permissionFuture.complete(true);
return;
}

Map<String, Set<AuthAction>> topicRoles = policies.get().auth_policies.destination_auth
Map<String, Set<AuthAction>> topicRoles = policies.get().auth_policies.getTopicAuthentication()
.get(topicName.toString());
if (topicRoles != null && role != null) {
// Topic has custom policy
Expand Down Expand Up @@ -440,7 +444,8 @@ public CompletableFuture<Boolean> checkPermission(TopicName topicName, String ro
// We can also check the permission of partitioned topic.
// For https://github.com/apache/pulsar/issues/10300
if (topicName.isPartitioned()) {
topicRoles = policies.get().auth_policies.destination_auth.get(topicName.getPartitionedTopicName());
topicRoles = policies.get().auth_policies
.getTopicAuthentication().get(topicName.getPartitionedTopicName());
if (topicRoles != null) {
// Topic has custom policy
Set<AuthAction> topicActions = topicRoles.get(role);
Expand Down Expand Up @@ -634,7 +639,7 @@ public CompletableFuture<Boolean> validateTenantAdminAccess(String tenantName,
return CompletableFuture.completedFuture(true);
} else {
try {
TenantInfoImpl tenantInfo = pulsarResources.getTenantResources()
TenantInfo tenantInfo = pulsarResources.getTenantResources()
.get(path(POLICIES, tenantName))
.orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Tenant does not exist"));
return isTenantAdmin(tenantName, role, tenantInfo, authData);
Expand Down
Expand Up @@ -22,19 +22,20 @@
import java.util.Set;
import lombok.Getter;

import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

public class ClusterResources extends BaseResources<ClusterDataImpl> {
public class ClusterResources extends BaseResources<ClusterData> {

public static final String CLUSTERS_ROOT = "/admin/clusters";
@Getter
private FailureDomainResources failureDomainResources;

public ClusterResources(MetadataStoreExtended store, int operationTimeoutSec) {
super(store, ClusterDataImpl.class, operationTimeoutSec);
super(store, ClusterData.class, operationTimeoutSec);
this.failureDomainResources = new FailureDomainResources(store, FailureDomainImpl.class, operationTimeoutSec);
}

Expand Down
Expand Up @@ -18,11 +18,12 @@
*/
package org.apache.pulsar.broker.resources;

import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

public class TenantResources extends BaseResources<TenantInfoImpl> {
public class TenantResources extends BaseResources<TenantInfo> {
public TenantResources(MetadataStoreExtended store, int operationTimeoutSec) {
super(store, TenantInfoImpl.class, operationTimeoutSec);
super(store, TenantInfo.class, operationTimeoutSec);
}
}
Expand Up @@ -36,7 +36,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
Expand Down Expand Up @@ -236,15 +236,18 @@ public static void main(String[] args) throws Exception {

createMetadataNode(configStore, "/admin/clusters", new byte[0]);

ClusterDataImpl clusterData = new ClusterDataImpl(arguments.clusterWebServiceUrl,
arguments.clusterWebServiceUrlTls, arguments.clusterBrokerServiceUrl,
arguments.clusterBrokerServiceUrlTls);
ClusterData clusterData = ClusterData.builder()
.serviceUrl(arguments.clusterWebServiceUrl)
.serviceUrlTls(arguments.clusterWebServiceUrlTls)
.brokerServiceUrl(arguments.clusterBrokerServiceUrl)
.brokerServiceUrlTls(arguments.clusterBrokerServiceUrlTls)
.build();
byte[] clusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(clusterData);

createMetadataNode(configStore, "/admin/clusters/" + arguments.cluster, clusterDataJson);

// Create marker for "global" cluster
ClusterDataImpl globalClusterData = new ClusterDataImpl(null, null);
ClusterData globalClusterData = ClusterData.builder().build();
byte[] globalClusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(globalClusterData);

createMetadataNode(configStore, "/admin/clusters/global", globalClusterDataJson);
Expand Down
Expand Up @@ -34,7 +34,8 @@
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
Expand Down Expand Up @@ -305,8 +306,10 @@ public void start() throws Exception {
webServiceUrl.toString()).authentication(
config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters()).build();
ClusterDataImpl clusterData =
new ClusterDataImpl(webServiceUrl.toString(), null, brokerServiceUrl, null);
ClusterData clusterData = ClusterData.builder()
.serviceUrl(webServiceUrl.toString())
.brokerServiceUrl(brokerServiceUrl)
.build();
createSampleNameSpace(clusterData, cluster);
} else {
URL webServiceUrlTls = new URL(
Expand All @@ -333,8 +336,10 @@ public void start() throws Exception {
}

admin = builder.build();
ClusterDataImpl clusterData = new ClusterDataImpl(null, webServiceUrlTls.toString(),
null, brokerServiceUrlTls);
ClusterData clusterData = ClusterData.builder()
.serviceUrlTls(webServiceUrlTls.toString())
.brokerServiceUrlTls(brokerServiceUrlTls)
.build();
createSampleNameSpace(clusterData, cluster);
}

Expand All @@ -355,7 +360,10 @@ private void createNameSpace(String cluster, String publicTenant, String default
try {
if (!admin.tenants().getTenants().contains(publicTenant)) {
admin.tenants().createTenant(publicTenant,
new TenantInfoImpl(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster)));
TenantInfo.builder()
.adminRoles(Sets.newHashSet(config.getSuperUserRoles()))
.allowedClusters(Sets.newHashSet(cluster))
.build());
}
if (!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) {
admin.namespaces().createNamespace(defaultNamespace);
Expand All @@ -367,7 +375,7 @@ private void createNameSpace(String cluster, String publicTenant, String default
}
}

private void createSampleNameSpace(ClusterDataImpl clusterData, String cluster) {
private void createSampleNameSpace(ClusterData clusterData, String cluster) {
// Create a sample namespace
final String tenant = "sample";
final String globalCluster = "global";
Expand All @@ -383,7 +391,7 @@ private void createSampleNameSpace(ClusterDataImpl clusterData, String cluster)
try {
admin.clusters().getCluster(globalCluster);
} catch (PulsarAdminException.NotFoundException ex) {
admin.clusters().createCluster(globalCluster, new ClusterDataImpl(null, null));
admin.clusters().createCluster(globalCluster, ClusterData.builder().build());
}

if (!admin.tenants().getTenants().contains(tenant)) {
Expand Down
Expand Up @@ -729,8 +729,12 @@ public Boolean get() {
this.brokerServiceUrlTls = brokerUrlTls(config);

if (null != this.webSocketService) {
ClusterDataImpl clusterData =
new ClusterDataImpl(webServiceAddress, webServiceAddressTls, brokerServiceUrl, brokerServiceUrlTls);
ClusterDataImpl clusterData = ClusterDataImpl.builder()
.serviceUrl(webServiceAddress)
.serviceUrlTls(webServiceAddressTls)
.brokerServiceUrl(brokerServiceUrl)
.brokerServiceUrlTls(brokerServiceUrlTls)
.build();
this.webSocketService.setLocalCluster(clusterData);
}

Expand Down

0 comments on commit da71ec2

Please sign in to comment.