Skip to content

Commit

Permalink
[Issue apache#5395][broker] Implement AutoTopicCreation by namespace …
Browse files Browse the repository at this point in the history
…override (apache#6471)

Fixes apache#5395 

### Motivation

This change introduces a new namespace policy `autoTopicCreationOverride`, which will enable an override of broker `autoTopicCreation` settings on the namespace level. You may keep `autoTopicCreation` disabled for the broker and allow it on a specific namespace using this feature.

### Modifications

- Add new namespace policy: `autoTopicCreationOverride` and associated API / CLI interface for setting and removing. Defaults to non-partitioned type, but also allows partitioned topics.
- Modifies BrokerService: when checking `autoTopicCreation` configuration, the broker first retrieves namespace policies from zookeeper. If the `autoTopicCreationOverride` policy exists for that namespace then it uses those settings. If not, falls back to broker configuration.
- Slight refactor to move `TopicType` enum to pulsar-common and add `autoTopicCreationOverride` to pulsar-common.
  • Loading branch information
klevy-toast authored and huangdx0726 committed Aug 24, 2020
1 parent f0792a5 commit 5822cc0
Show file tree
Hide file tree
Showing 15 changed files with 723 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.configuration.Category;
import org.apache.pulsar.common.configuration.FieldContext;
Expand Down Expand Up @@ -1538,18 +1539,4 @@ public int getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds() {
return brokerDeleteInactiveTopicsMaxInactiveDurationSeconds;
}
}

enum TopicType {
PARTITIONED("partitioned"),
NON_PARTITIONED("non-partitioned");
private String type;

TopicType(String type) {
this.type = type;
}

public String toString() {
return type;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
Expand Down Expand Up @@ -553,6 +554,105 @@ protected void internalSetNamespaceMessageTTL(int messageTTL) {
}
}

protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, AutoTopicCreationOverride autoTopicCreationOverride) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();

if (!AutoTopicCreationOverride.isValidOverride(autoTopicCreationOverride)) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid configuration for autoTopicCreationOverride");
}

// Force to read the data s.t. the watch to the cache content is setup.
policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
policies -> {
if (policies.isPresent()) {
Entry<Policies, Stat> policiesNode = policies.get();
policiesNode.getKey().autoTopicCreationOverride = autoTopicCreationOverride;
try {
// Write back the new policies into zookeeper
globalZk().setData(path(POLICIES, namespaceName.toString()),
jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully {} on namespace {}", clientAppId(),
autoTopicCreationOverride.allowAutoTopicCreation ? "enabled" : "disabled", namespaceName);
return null;
} catch (KeeperException.NoNodeException e) {
log.error("[{}] Failed to modify autoTopicCreation status for namespace {}: does not exist", clientAppId(),
namespaceName);
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
return null;
} catch (KeeperException.BadVersionException e) {
log.error(
"[{}] Failed to modify autoTopicCreation status on namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());

asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
return null;
} catch (Exception e) {
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
}
} else {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
return null;
}
}
).exceptionally(e -> {
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
});
}

protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();

// Force to read the data s.t. the watch to the cache content is setup.
policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
policies -> {
if (policies.isPresent()) {
Entry<Policies, Stat> policiesNode = policies.get();
policiesNode.getKey().autoTopicCreationOverride = null;
try {
// Write back the new policies into zookeeper
globalZk().setData(path(POLICIES, namespaceName.toString()),
jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully removed override on namespace {}", clientAppId(), namespaceName);
return null;
} catch (KeeperException.NoNodeException e) {
log.error("[{}] Failed to modify autoTopicCreation status for namespace {}: does not exist", clientAppId(),
namespaceName);
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
return null;
} catch (KeeperException.BadVersionException e) {
log.error(
"[{}] Failed to modify autoTopicCreation status on namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());

asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
return null;
} catch (Exception e) {
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
}
} else {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
return null;
}
}
).exceptionally(e -> {
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
});
}

protected void internalModifyDeduplication(boolean enableDeduplication) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
Expand All @@ -573,17 +673,17 @@ protected void internalModifyDeduplication(boolean enableDeduplication) {
log.info("[{}] Successfully {} on namespace {}", clientAppId(),
enableDeduplication ? "enabled" : "disabled", namespaceName);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to modify deplication status for namespace {}: does not exist", clientAppId(),
log.warn("[{}] Failed to modify deduplication status for namespace {}: does not exist", clientAppId(),
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
"[{}] Failed to modify deplication status on namespace {} expected policy node version={} : concurrent modification",
"[{}] Failed to modify deduplication status on namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());

throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to modify deplication status on namespace {}", clientAppId(), namespaceName, e);
log.error("[{}] Failed to modify deduplication status on namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
Expand Down Expand Up @@ -299,6 +300,43 @@ public void modifyDeduplication(@PathParam("tenant") String tenant, @PathParam("
internalModifyDeduplication(enableDeduplication);
}

@POST
@Path("/{tenant}/{namespace}/autoTopicCreation")
@ApiOperation(value = "Override broker's allowAutoTopicCreation setting for a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 400, message = "Invalid autoTopicCreation override") })
public void setAutoTopicCreation(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
AutoTopicCreationOverride autoTopicCreationOverride) {
try {
validateNamespaceName(tenant, namespace);
internalSetAutoTopicCreation(asyncResponse, autoTopicCreationOverride);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e ) {
asyncResponse.resume(new RestException(e));
}
}

@DELETE
@Path("/{tenant}/{namespace}/autoTopicCreation")
@ApiOperation(value = "Remove override of broker's allowAutoTopicCreation in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
try {
validateNamespaceName(tenant, namespace);
internalRemoveAutoTopicCreation(asyncResponse);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@GET
@Path("/{tenant}/{namespace}/bundles")
@ApiOperation(value = "Get the bundles split data.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
Expand All @@ -131,6 +132,7 @@
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -658,7 +660,7 @@ public CompletableFuture<Optional<Topic>> getTopicIfExists(final String topic) {
}

public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
return getTopic(topic, pulsar.getConfiguration().isAllowAutoTopicCreation()).thenApply(Optional::get);
return getTopic(topic, isAllowAutoTopicCreation(topic)).thenApply(Optional::get);
}

public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean createIfMissing) {
Expand Down Expand Up @@ -1846,8 +1848,8 @@ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata
// If topic is already exist, creating partitioned topic is not allowed.
if (metadata.partitions == 0
&& !topicExists
&& pulsar.getConfiguration().isAllowAutoTopicCreation()
&& pulsar.getConfiguration().isDefaultTopicTypePartitioned()) {
&& pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
&& pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName);
} else {
return CompletableFuture.completedFuture(metadata);
Expand All @@ -1858,7 +1860,7 @@ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata

@SuppressWarnings("deprecation")
private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName) {
int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName);
checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0");

PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
Expand Down Expand Up @@ -2091,4 +2093,53 @@ long getCurrentMessagePublishBufferSize() {
foreachProducer(producer -> currentMessagePublishBufferBytes.addAndGet(producer.getCnx().getMessagePublishBufferSize()));
return currentMessagePublishBufferBytes.get();
}

public boolean isAllowAutoTopicCreation(final String topic) {
TopicName topicName = TopicName.get(topic);
return isAllowAutoTopicCreation(topicName);
}

public boolean isAllowAutoTopicCreation(final TopicName topicName) {
AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
if (autoTopicCreationOverride != null) {
return autoTopicCreationOverride.allowAutoTopicCreation;
} else {
return pulsar.getConfiguration().isAllowAutoTopicCreation();
}
}

public boolean isDefaultTopicTypePartitioned(final TopicName topicName) {
AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
if (autoTopicCreationOverride != null) {
return TopicType.PARTITIONED.toString().equals(autoTopicCreationOverride.topicType);
} else {
return pulsar.getConfiguration().isDefaultTopicTypePartitioned();
}
}

public int getDefaultNumPartitions(final TopicName topicName) {
AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
if (autoTopicCreationOverride != null) {
return autoTopicCreationOverride.defaultNumPartitions;
} else {
return pulsar.getConfiguration().getDefaultNumPartitions();
}
}

private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) {
try {
Optional<Policies> policies = pulsar.getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, topicName.getNamespace()));
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
return policies.get().autoTopicCreationOverride;
}
} catch (Throwable t) {
// Ignoring since if we don't have policies, we fallback on the default
log.warn("Got exception when reading autoTopicCreateOverride policy for {}: {};", topicName, t.getMessage(), t);
return null;
}
log.warn("No autoTopicCreateOverride policy found for {}", topicName);
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
}

boolean createTopicIfDoesNotExist = forceTopicCreation
&& service.pulsar().getConfig().isAllowAutoTopicCreation();
&& service.isAllowAutoTopicCreation(topicName.toString());

service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
.thenCompose(optTopic -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ protected void resetConfig() {
this.conf.setDefaultNumberOfNamespaceBundles(1);
this.conf.setZookeeperServers("localhost:2181");
this.conf.setConfigurationStoreServers("localhost:3181");
this.conf.setAllowAutoTopicCreationType("non-persistent");
this.conf.setAllowAutoTopicCreationType("non-partitioned");
this.conf.setBrokerServicePort(Optional.of(0));
this.conf.setBrokerServicePortTls(Optional.of(0));
this.conf.setWebServicePort(Optional.of(0));
Expand Down

0 comments on commit 5822cc0

Please sign in to comment.