Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature][broker] Allow to configure the entry filters per namespace and per topic #17153

Merged
merged 13 commits into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,9 @@ entryFilterNames=
# The directory for all the entry filter implementations
entryFiltersDirectory=

# Whether allow topic level entry filters policies overrides broker configuration.
allowOverrideEntryFilters=false

# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=50000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private String entryFiltersDirectory = "";

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "Whether allow topic level entry filters policies overrides broker configuration."
)
private boolean allowOverrideEntryFilters = false;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Whether to use streaming read dispatcher. Currently is in preview and can be changed "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
Expand Down Expand Up @@ -2722,5 +2723,14 @@ protected void internalScanOffloadedLedgers(OffloaderObjectsScannerUtils.Scanner

}

protected CompletableFuture<Void> internalSetEntryFiltersPerTopicAsync(EntryFilters entryFilters) {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.entryFilters = entryFilters;
return policies;
}));
}

private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
Expand Down Expand Up @@ -5335,4 +5336,50 @@ protected CompletableFuture<Void> internalSetSchemaValidationEnforced(boolean sc
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}

protected CompletableFuture<EntryFilters> internalGetEntryFilters(boolean applied, boolean isGlobal) {
return validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, PolicyOperation.READ)
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getEntryFilters)
.orElseGet(() -> {
if (applied) {
EntryFilters entryFilters = getNamespacePolicies(namespaceName).entryFilters;
if (entryFilters == null) {
return new EntryFilters(String.join(",",
pulsar().getConfiguration().getEntryFilterNames()));
}
return entryFilters;
}
return null;
})));

}

protected CompletableFuture<Void> internalSetEntryFilters(EntryFilters entryFilters,
boolean isGlobal) {

return validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setEntryFilters(entryFilters);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService()
.updateTopicPoliciesAsync(topicName, topicPolicies);
}));
}

protected CompletableFuture<Void> internalRemoveEntryFilters(boolean isGlobal) {
return validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
.thenCompose(__ ->
getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
if (!op.isPresent()) {
return CompletableFuture.completedFuture(null);
}
op.get().setEntryFilters(null);
op.get().setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
Expand Down Expand Up @@ -2658,5 +2659,66 @@ public void finished(int total, int errors, int unknown) throws Exception {
}
}

@GET
@Path("/{tenant}/{namespace}/entryFilters")
@ApiOperation(value = "Get maxConsumersPerSubscription config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public void getEntryFiltersPerTopic(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ENTRY_FILTERS, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(polices -> asyncResponse.resume(polices.entryFilters))
gaozhangmin marked this conversation as resolved.
Show resolved Hide resolved
.exceptionally(ex -> {
log.error("[{}] Failed to get entry filters config on namespace {}: {} ",
clientAppId(), namespaceName, ex.getCause().getMessage(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
@Path("/{tenant}/{namespace}/entryFilters")
@ApiOperation(value = "Set entry filters for namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")})
public void setEntryFiltersPerTopic(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@ApiParam(value = "entry filters", required = true)
EntryFilters entryFilters) {
validateNamespaceName(tenant, namespace);
internalSetEntryFiltersPerTopicAsync(entryFilters)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("Failed to set entry filters for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/entryFilters")
@ApiOperation(value = "Remove entry filters for namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Invalid TTL")})
public void removeNamespaceEntryFilters(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalSetEntryFiltersPerTopicAsync(null)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("Failed to remove entry filters for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}



private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
Expand All @@ -53,6 +54,7 @@
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicOperation;
Expand Down Expand Up @@ -509,6 +511,96 @@ protected void validateAdminOperationOnTopic(TopicName topicName, boolean author
validateTopicOwnership(topicName, authoritative);
}

@GET
@Path("/{tenant}/{namespace}/{topic}/entryFilters")
@ApiOperation(value = "Get entry filters for a topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenants or Namespace doesn't exist") })
public void getEntryFilters(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") @DefaultValue("false") boolean applied,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Whether leader broker redirected this call to this "
+ "broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalGetEntryFilters(applied, isGlobal))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
handleTopicPolicyException("getEntryFilters", ex, asyncResponse);
return null;
});
}

@POST
@Path("/{tenant}/{namespace}/{topic}/entryFilters")
@ApiOperation(value = "Set entry filters for specified topic")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void setEntryFilters(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Whether leader broker redirected this "
+ "call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Enable sub types for the specified topic")
EntryFilters entryFilters) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetEntryFilters(entryFilters, isGlobal))
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("setEntryFilters", ex, asyncResponse);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/entryFilters")
@ApiOperation(value = "Remove entry filters for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeEntryFilters(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Whether leader broker redirected this"
+ "call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalRemoveEntryFilters(isGlobal))
.thenRun(() -> {
log.info(
"[{}] Successfully remove entry filters: tenant={}, namespace={}, topic={}, isGlobal={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName(),
isGlobal);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
handleTopicPolicyException("removeEntryFilters", ex, asyncResponse);
return null;
});
}

private Topic getTopicReference(TopicName topicName) {
try {
return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
Expand Down