Skip to content

Commit

Permalink
add configuration allowOverrideEntryFilters, default to false
Browse files Browse the repository at this point in the history
  • Loading branch information
gavingaozhangmin committed Aug 29, 2022
1 parent 7121fe8 commit d7d9e53
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 5 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,9 @@ entryFilterNames=
# The directory for all the entry filter implementations
entryFiltersDirectory=

# Whether allow topic level entry filters policies overrides broker configuration.
allowOverrideEntryFilters=false

# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=50000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private String entryFiltersDirectory = "";

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
doc = "Whether allow topic level entry filters policies overrides broker configuration."
)
private boolean allowOverrideEntryFilters = false;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Whether to use streaming read dispatcher. Currently is in preview and can be changed "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,9 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater"));
this.authorizationService = new AuthorizationService(
pulsar.getConfiguration(), pulsar().getPulsarResources());
if (!pulsar.getConfiguration().getEntryFilterNames().isEmpty()) {
this.entryFilters = EntryFilterProvider.createEntryFilters(pulsar.getConfiguration());
}

pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
Expand Down Expand Up @@ -751,6 +754,17 @@ public CompletableFuture<Void> closeAsync() {
}
});

//close entry filters
if (entryFilters != null) {
entryFilters.forEach((name, filter) -> {
try {
filter.close();
} catch (Exception e) {
log.warn("Error shutting down entry filter {}", name, e);
}
});
}

CompletableFuture<CompletableFuture<Void>> cancellableDownstreamFutureReference = new CompletableFuture<>();
log.info("Event loops shutting down gracefully...");
List<CompletableFuture<?>> shutdownEventLoops = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableMap;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.service.plugin.FilterContext;
Expand All @@ -40,12 +41,19 @@ public class EntryFilterSupport {
public EntryFilterSupport(Subscription subscription) {
this.subscription = subscription;
if (subscription != null && subscription.getTopic() != null) {
ImmutableMap<String, EntryFilterWithClassLoader> entryFiltersMap =
subscription.getTopic().getEntryFilters();
if (entryFiltersMap != null) {
this.entryFilters = subscription.getTopic().getEntryFilters().values().asList();
if (MapUtils.isNotEmpty(subscription.getTopic()
.getBrokerService().getEntryFilters())
&& !subscription.getTopic().getBrokerService().pulsar()
.getConfiguration().isAllowOverrideEntryFilters()) {
this.entryFilters = subscription.getTopic().getBrokerService().getEntryFilters().values().asList();
} else {
this.entryFilters = ImmutableList.of();
ImmutableMap<String, EntryFilterWithClassLoader> entryFiltersMap =
subscription.getTopic().getEntryFilters();
if (entryFiltersMap != null) {
this.entryFilters = subscription.getTopic().getEntryFilters().values().asList();
} else {
this.entryFilters = ImmutableList.of();
}
}
this.filterContext = new FilterContext();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,27 @@ public static ImmutableMap<String, EntryFilterWithClassLoader> createEntryFilter
return builder.build();
}

public static ImmutableMap<String, EntryFilterWithClassLoader> createEntryFilters(
ServiceConfiguration conf) throws IOException {
EntryFilterDefinitions definitions = searchForEntryFilters(conf.getEntryFiltersDirectory(),
conf.getNarExtractionDirectory());
ImmutableMap.Builder<String, EntryFilterWithClassLoader> builder = ImmutableMap.builder();
for (String filterName : conf.getEntryFilterNames()) {
EntryFilterMetaData metaData = definitions.getFilters().get(filterName);
if (null == metaData) {
throw new RuntimeException("No entry filter is found for name `" + filterName
+ "`. Available entry filters are : " + definitions.getFilters());
}
EntryFilterWithClassLoader filter;
filter = load(metaData, conf.getNarExtractionDirectory());
if (filter != null) {
builder.put(filterName, filter);
}
log.info("Successfully loaded entry filter for name `{}`", filterName);
}
return builder.build();
}

private static EntryFilterDefinitions searchForEntryFilters(String entryFiltersDirectory,
String narExtractionDirectory)
throws IOException {
Expand Down

0 comments on commit d7d9e53

Please sign in to comment.