Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #5395][broker] Implement AutoTopicCreation by namespace override #6471

Merged
merged 22 commits into from
Mar 26, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
9e6ee25
Add allowAutoTopicCreation option to policies and CLI
klevy-toast Feb 12, 2020
f60eeae
Auto-create by namespace MVP
klevy-toast Feb 12, 2020
e960426
Refactor to allow partitioned topics, added tests
klevy-toast Feb 22, 2020
bb50b3b
Merge branch 'master' into feature/autocreate-by-namespace
klevy-toast Feb 22, 2020
0a92e59
Cleanup commented code, imports
klevy-toast Feb 22, 2020
8bd458f
Tweak override validation
klevy-toast Feb 22, 2020
49f443c
More cleanup, documentation, tests
klevy-toast Feb 22, 2020
0752ccf
Add default partition type
klevy-toast Feb 27, 2020
ee3a3dc
Add license to new pulsar-common files
klevy-toast Feb 27, 2020
454b8c0
Remove null check that could cause silent failure
klevy-toast Mar 3, 2020
4f31397
Switch back to synchronous policy retreival
klevy-toast Mar 3, 2020
d5aa45f
Merge branch 'master' into feature/autocreate-by-namespace
klevy-toast Mar 4, 2020
bee6179
Merge branch 'master' into feature/autocreate-by-namespace
klevy-toast Mar 9, 2020
423562b
Make API async, renaming
klevy-toast Mar 10, 2020
163811a
Add clause to resume async response on exceptions
klevy-toast Mar 16, 2020
5897a8d
Merge branch 'master' into feature/autocreate-by-namespace
klevy-toast Mar 17, 2020
9957a3b
Remove override from method names
klevy-toast Mar 17, 2020
84f4e4a
Merge branch 'master' into feature/autocreate-by-namespace
klevy-toast Mar 20, 2020
102ccfe
Fix typo
klevy-toast Mar 20, 2020
bafdb70
Merge branch 'master' into feature/autocreate-by-namespace
klevy-toast Mar 23, 2020
221ddab
Merge branch 'master' into feature/autocreate-by-namespace
sijie Mar 25, 2020
f237b22
Fix pulsar-client-admin style
klevy-toast Mar 25, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.configuration.Category;
import org.apache.pulsar.common.configuration.FieldContext;
Expand Down Expand Up @@ -1532,18 +1533,4 @@ public int getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds() {
return brokerDeleteInactiveTopicsMaxInactiveDurationSeconds;
}
}

enum TopicType {
PARTITIONED("partitioned"),
NON_PARTITIONED("non-partitioned");
private String type;

TopicType(String type) {
this.type = type;
}

public String toString() {
return type;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
Expand Down Expand Up @@ -554,6 +555,105 @@ protected void internalSetNamespaceMessageTTL(int messageTTL) {
}
}

protected void internalSetAutoTopicCreationOverride(AsyncResponse asyncResponse, AutoTopicCreationOverride autoTopicCreationOverride) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();

if (!AutoTopicCreationOverride.isValidOverride(autoTopicCreationOverride)) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid configuration for autoTopicCreationOverride");
}
Comment on lines +558 to +563
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd better complete the asyncResponse when there are RunTimeException occurs. Otherwise, the client just can get a 500 response but can't get any error messages.


// Force to read the data s.t. the watch to the cache content is setup.
policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
policies -> {
if (policies.isPresent()) {
Entry<Policies, Stat> policiesNode = policies.get();
policiesNode.getKey().autoTopicCreationOverride = autoTopicCreationOverride;
try {
// Write back the new policies into zookeeper
globalZk().setData(path(POLICIES, namespaceName.toString()),
jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully {} on namespace {}", clientAppId(),
autoTopicCreationOverride.allowAutoTopicCreation ? "enabled" : "disabled", namespaceName);
return null;
} catch (KeeperException.NoNodeException e) {
log.error("[{}] Failed to modify autoTopicCreation status for namespace {}: does not exist", clientAppId(),
namespaceName);
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
return null;
} catch (KeeperException.BadVersionException e) {
log.error(
"[{}] Failed to modify autoTopicCreation status on namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());

asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
return null;
} catch (Exception e) {
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
}
} else {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
return null;
}
}
).exceptionally(e -> {
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
});
}

protected void internalRemoveAutoTopicCreationOverride(AsyncResponse asyncResponse) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
Comment on lines +610 to +611
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above comment.


// Force to read the data s.t. the watch to the cache content is setup.
policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
policies -> {
if (policies.isPresent()) {
Entry<Policies, Stat> policiesNode = policies.get();
policiesNode.getKey().autoTopicCreationOverride = null;
try {
// Write back the new policies into zookeeper
globalZk().setData(path(POLICIES, namespaceName.toString()),
jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully removed override on namespace {}", clientAppId(), namespaceName);
return null;
} catch (KeeperException.NoNodeException e) {
log.error("[{}] Failed to modify autoTopicCreation status for namespace {}: does not exist", clientAppId(),
namespaceName);
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
return null;
} catch (KeeperException.BadVersionException e) {
log.error(
"[{}] Failed to modify autoTopicCreation status on namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());

asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
return null;
} catch (Exception e) {
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
}
} else {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
return null;
}
}
).exceptionally(e -> {
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
});
}

protected void internalModifyDeduplication(boolean enableDeduplication) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
Expand All @@ -574,17 +674,17 @@ protected void internalModifyDeduplication(boolean enableDeduplication) {
log.info("[{}] Successfully {} on namespace {}", clientAppId(),
enableDeduplication ? "enabled" : "disabled", namespaceName);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to modify deplication status for namespace {}: does not exist", clientAppId(),
log.warn("[{}] Failed to modify deduplication status for namespace {}: does not exist", clientAppId(),
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
"[{}] Failed to modify deplication status on namespace {} expected policy node version={} : concurrent modification",
"[{}] Failed to modify deduplication status on namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());

throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to modify deplication status on namespace {}", clientAppId(), namespaceName, e);
log.error("[{}] Failed to modify deduplication status on namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
Expand Down Expand Up @@ -298,6 +299,31 @@ public void modifyDeduplication(@PathParam("tenant") String tenant, @PathParam("
internalModifyDeduplication(enableDeduplication);
}

@POST
@Path("/{tenant}/{namespace}/autoTopicCreationOverride")
@ApiOperation(value = "Override broker's allowAutoTopicCreation setting for a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 400, message = "Invalid autoTopicCreation override") })
public void setAutoTopicCreationOverride(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
AutoTopicCreationOverride autoTopicCreationOverride) {
validateNamespaceName(tenant, namespace);
internalSetAutoTopicCreationOverride(asyncResponse, autoTopicCreationOverride);
}

@DELETE
@Path("/{tenant}/{namespace}/utoTopicCreationOverride")
@ApiOperation(value = "Remove override of broker's allowAutoTopicCreation in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public void removeAutoTopicCreationOverride(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalRemoveAutoTopicCreationOverride(asyncResponse);
}

@GET
@Path("/{tenant}/{namespace}/bundles")
@ApiOperation(value = "Get the bundles split data.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
Expand All @@ -130,6 +131,7 @@
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -657,7 +659,7 @@ public CompletableFuture<Optional<Topic>> getTopicIfExists(final String topic) {
}

public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
return getTopic(topic, pulsar.getConfiguration().isAllowAutoTopicCreation()).thenApply(Optional::get);
return getTopic(topic, isAllowAutoTopicCreation(topic)).thenApply(Optional::get);
}

public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean createIfMissing) {
Expand Down Expand Up @@ -1820,8 +1822,8 @@ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata
// If topic is already exist, creating partitioned topic is not allowed.
if (metadata.partitions == 0
&& !topicExists
&& pulsar.getConfiguration().isAllowAutoTopicCreation()
&& pulsar.getConfiguration().isDefaultTopicTypePartitioned()) {
&& pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
&& pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName);
} else {
return CompletableFuture.completedFuture(metadata);
Expand All @@ -1832,7 +1834,7 @@ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata

@SuppressWarnings("deprecation")
private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName) {
int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName);
checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0");

PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
Expand Down Expand Up @@ -2064,4 +2066,53 @@ long getCurrentMessagePublishBufferSize() {
foreachProducer(producer -> currentMessagePublishBufferBytes.addAndGet(producer.getCnx().getMessagePublishBufferSize()));
return currentMessagePublishBufferBytes.get();
}

public boolean isAllowAutoTopicCreation(final String topic) {
TopicName topicName = TopicName.get(topic);
return isAllowAutoTopicCreation(topicName);
}

public boolean isAllowAutoTopicCreation(final TopicName topicName) {
AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
if (autoTopicCreationOverride != null) {
return autoTopicCreationOverride.allowAutoTopicCreation;
} else {
return pulsar.getConfiguration().isAllowAutoTopicCreation();
}
}

public boolean isDefaultTopicTypePartitioned(final TopicName topicName) {
AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
if (autoTopicCreationOverride != null) {
return TopicType.PARTITIONED.toString().equals(autoTopicCreationOverride.topicType);
} else {
return pulsar.getConfiguration().isDefaultTopicTypePartitioned();
}
}

public int getDefaultNumPartitions(final TopicName topicName) {
AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
if (autoTopicCreationOverride != null) {
return autoTopicCreationOverride.defaultNumPartitions;
} else {
return pulsar.getConfiguration().getDefaultNumPartitions();
}
}

private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) {
try {
Optional<Policies> policies = pulsar.getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, topicName.getNamespace()));
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
return policies.get().autoTopicCreationOverride;
}
} catch (Throwable t) {
// Ignoring since if we don't have policies, we fallback on the default
log.warn("Got exception when reading autoTopicCreateOverride policy for {}: {};", topicName, t.getMessage(), t);
return null;
}
log.warn("No autoTopicCreateOverride policy found for {}", topicName);
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
}

boolean createTopicIfDoesNotExist = forceTopicCreation
&& service.pulsar().getConfig().isAllowAutoTopicCreation();
&& service.isAllowAutoTopicCreation(topicName.toString());

service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
.thenCompose(optTopic -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ protected void resetConfig() {
this.conf.setDefaultNumberOfNamespaceBundles(1);
this.conf.setZookeeperServers("localhost:2181");
this.conf.setConfigurationStoreServers("localhost:3181");
this.conf.setAllowAutoTopicCreationType("non-persistent");
this.conf.setAllowAutoTopicCreationType("non-partitioned");
this.conf.setBrokerServicePort(Optional.of(0));
this.conf.setBrokerServicePortTls(Optional.of(0));
this.conf.setWebServicePort(Optional.of(0));
Expand Down