Skip to content

Commit

Permalink
[Functions] Support KEY_BASED batch builder for Java based functions …
Browse files Browse the repository at this point in the history
…and sources (#11706)

* [Functions] Support KEY_BASED batch builder for Java based functions and sources

* Include batchBuilder in ProducerSpec -> ProducerConfig.ProducerConfigBuilder conversion

* Support setting batch builder for sources with "--batch-builder KEY_BASED" argument
  • Loading branch information
lhotari committed Aug 25, 2021
1 parent 7de9992 commit b923af1
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -146,14 +147,22 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
boolean useThreadLocalProducers = false;
if (config.getFunctionDetails().getSink().getProducerSpec() != null) {
if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages() != 0) {
this.producerBuilder.maxPendingMessages(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages());
Function.ProducerSpec producerSpec = config.getFunctionDetails().getSink().getProducerSpec();
if (producerSpec != null) {
if (producerSpec.getMaxPendingMessages() != 0) {
this.producerBuilder.maxPendingMessages(producerSpec.getMaxPendingMessages());
}
if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) {
this.producerBuilder.maxPendingMessagesAcrossPartitions(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions());
if (producerSpec.getMaxPendingMessagesAcrossPartitions() != 0) {
this.producerBuilder.maxPendingMessagesAcrossPartitions(producerSpec.getMaxPendingMessagesAcrossPartitions());
}
useThreadLocalProducers = config.getFunctionDetails().getSink().getProducerSpec().getUseThreadLocalProducers();
if (producerSpec.getBatchBuilder() != null) {
if (producerSpec.getBatchBuilder().equals("KEY_BASED")) {
this.producerBuilder.batcherBuilder(BatcherBuilder.KEY_BASED);
} else {
this.producerBuilder.batcherBuilder(BatcherBuilder.DEFAULT);
}
}
useThreadLocalProducers = producerSpec.getUseThreadLocalProducers();
}
if (useThreadLocalProducers) {
tlPublishProducers = new ThreadLocal<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,7 @@ private void setupOutput(ContextImpl contextImpl) throws Exception {
ProducerConfig.ProducerConfigBuilder builder = ProducerConfig.builder()
.maxPendingMessages(conf.getMaxPendingMessages())
.maxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions())
.batchBuilder(conf.getBatchBuilder())
.useThreadLocalProducers(conf.getUseThreadLocalProducers())
.cryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec()));
pulsarSinkConfig.setProducerConfig(builder.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,13 @@ public static FunctionDetails convert(SourceConfig sourceConfig, ExtractedSource
sinkSpecBuilder.setProducerSpec(pbldr.build());
}

if (sourceConfig.getBatchBuilder() != null) {
Function.ProducerSpec.Builder builder = sinkSpecBuilder.getProducerSpec() != null
? sinkSpecBuilder.getProducerSpec().toBuilder()
: Function.ProducerSpec.newBuilder();
sinkSpecBuilder.setProducerSpec(builder.setBatchBuilder(sourceConfig.getBatchBuilder()).build());
}

sinkSpecBuilder.setForwardSourceMessageProperty(true);

functionDetailsBuilder.setSink(sinkSpecBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,37 @@ public void testValidateConfig() throws IOException {
assertTrue(e.getMessage().contains("Could not validate source config: Field 'configParameter' cannot be null!"));
}

@Test
public void testSupportsBatchBuilderWhenProducerConfigIsNull() {
SourceConfig sourceConfig = createSourceConfig();
sourceConfig.setProducerConfig(null);
sourceConfig.setBatchBuilder("KEY_BASED");
Function.FunctionDetails functionDetails =
SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED");
}

@Test
public void testSupportsBatchBuilderWhenProducerConfigExists() {
SourceConfig sourceConfig = createSourceConfig();
sourceConfig.setBatchBuilder("KEY_BASED");
sourceConfig.getProducerConfig().setMaxPendingMessages(123456);
Function.FunctionDetails functionDetails =
SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED");
assertEquals(functionDetails.getSink().getProducerSpec().getMaxPendingMessages(), 123456);
}

@Test
public void testSupportsBatchBuilderDefinedInProducerConfigWhenTopLevelBatchBuilderIsUndefined() {
SourceConfig sourceConfig = createSourceConfig();
sourceConfig.setBatchBuilder(null);
sourceConfig.getProducerConfig().setBatchBuilder("KEY_BASED");
Function.FunctionDetails functionDetails =
SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED");
}

private SourceConfig createSourceConfigWithBatch() {
SourceConfig sourceConfig = createSourceConfig();
BatchSourceConfig batchSourceConfig = createBatchSourceConfig();
Expand Down

0 comments on commit b923af1

Please sign in to comment.