diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java index 70d17b41f61119..3cf53fdbf02910 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api; +import java.io.Serializable; import java.net.InetSocketAddress; import java.time.Clock; import java.util.Map; @@ -34,7 +35,7 @@ */ @InterfaceAudience.Public @InterfaceStability.Stable -public interface ClientBuilder extends Cloneable { +public interface ClientBuilder extends Serializable, Cloneable { /** * Construct the final {@link PulsarClient} instance. diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java index 37184a4ea89fda..5bd7597173ea42 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.functions.api; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; @@ -194,11 +195,27 @@ default S getStateStore(String tenant, String ns, String void recordMetric(String metricName, double value); /** - * Get the pulsar client. + * Get the pre-configured pulsar client. + * + * You can use this client to access Pulsar cluster. + * The Function will be responsible for disposing this client. * * @return the instance of pulsar client */ default PulsarClient getPulsarClient() { throw new UnsupportedOperationException("not implemented"); } + + /** + * Get the pre-configured pulsar client builder. + * + * You can use this Builder to setup client to connect to the Pulsar cluster. + * But you need to close client properly after using it. + * + * @return the instance of pulsar client builder. + */ + default ClientBuilder getPulsarClientBuilder() { + throw new UnsupportedOperationException("not implemented"); + } + } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index afd57e55bcbe52..957cc448108ece 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -39,6 +39,7 @@ import lombok.ToString; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; @@ -88,6 +89,7 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable // Per Message related private Record record; + private final ClientBuilder clientBuilder; private final PulsarClient client; private final PulsarAdmin pulsarAdmin; private Map> publishProducers; @@ -132,9 +134,10 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, String[] metricsLabels, Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager, - StateManager stateManager, PulsarAdmin pulsarAdmin) { + StateManager stateManager, PulsarAdmin pulsarAdmin, ClientBuilder clientBuilder) throws PulsarClientException { this.config = config; this.logger = logger; + this.clientBuilder = clientBuilder; this.client = client; this.pulsarAdmin = pulsarAdmin; this.topicSchema = new TopicSchema(client); @@ -492,6 +495,11 @@ public PulsarClient getPulsarClient() { return client; } + @Override + public ClientBuilder getPulsarClientBuilder() { + return clientBuilder; + } + private Producer getProducer(String topicName, Schema schema) throws PulsarClientException { Producer producer; if (tlPublishProducers != null) { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java index 5501a316516aaa..d52c7dbefdb0e3 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java @@ -156,15 +156,9 @@ public static Map getProperties(Function.FunctionDetails.Compone return properties; } - - public static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig) - throws PulsarClientException { - return createPulsarClient(pulsarServiceUrl, authConfig, Optional.empty()); - } - - public static PulsarClient createPulsarClient(String pulsarServiceUrl, - AuthenticationConfig authConfig, - Optional memoryLimit) throws PulsarClientException { + public static ClientBuilder createPulsarClientBuilder(String pulsarServiceUrl, + AuthenticationConfig authConfig, + Optional memoryLimit) throws PulsarClientException { ClientBuilder clientBuilder = null; if (isNotBlank(pulsarServiceUrl)) { clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl); @@ -183,10 +177,20 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())) { clientBuilder.memoryLimit(memoryLimit.get(), SizeUnit.BYTES); } clientBuilder.ioThreads(Runtime.getRuntime().availableProcessors()); - return clientBuilder.build(); + return clientBuilder; } - log.warn("pulsarServiceUrl cannot be null"); - return null; + throw new PulsarClientException("pulsarServiceUrl cannot be null"); + } + + public static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig) + throws PulsarClientException { + return createPulsarClient(pulsarServiceUrl, authConfig, Optional.empty()); + } + + public static PulsarClient createPulsarClient(String pulsarServiceUrl, + AuthenticationConfig authConfig, + Optional memoryLimit) throws PulsarClientException { + return createPulsarClientBuilder(pulsarServiceUrl, authConfig, memoryLimit).build(); } public static PulsarAdmin createPulsarAdminClient(String pulsarWebServiceUrl, AuthenticationConfig authConfig) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 37079910c8c697..705b92033fd30a 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -39,7 +39,9 @@ import org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.LoggerConfig; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.functions.ConsumerConfig; @@ -90,7 +92,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private final InstanceConfig instanceConfig; // input topic consumer & output topic producer - private final PulsarClientImpl client; + private final ClientBuilder clientBuilder; + private PulsarClientImpl client; private final PulsarAdmin pulsarAdmin; private LogAppender logAppender; @@ -134,13 +137,15 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private ReadWriteLock statsLock = new ReentrantReadWriteLock(); public JavaInstanceRunnable(InstanceConfig instanceConfig, + ClientBuilder clientBuilder, PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, String stateStorageServiceUrl, SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, - ClassLoader functionClassLoader) { + ClassLoader functionClassLoader) throws PulsarClientException { this.instanceConfig = instanceConfig; + this.clientBuilder = clientBuilder; this.client = (PulsarClientImpl) pulsarClient; this.pulsarAdmin = pulsarAdmin; this.stateStorageServiceUrl = stateStorageServiceUrl; @@ -226,12 +231,12 @@ synchronized private void setup() throws Exception { isInitialized = true; } - ContextImpl setupContext() { + ContextImpl setupContext() throws PulsarClientException { Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger( "function-" + instanceConfig.getFunctionDetails().getName()); return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider, collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager, - pulsarAdmin); + pulsarAdmin, clientBuilder); } /** diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index 5bb61fd7bd2afb..1caab98517e1cd 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.functions.instance; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; @@ -74,13 +75,14 @@ public class ContextImplTest { private InstanceConfig config; private Logger logger; + private ClientBuilder clientBuilder; private PulsarClientImpl client; private PulsarAdmin pulsarAdmin; private ContextImpl context; private Producer producer = mock(Producer.class); @BeforeMethod - public void setup() { + public void setup() throws PulsarClientException { config = new InstanceConfig(); config.setExposePulsarAdminClientEnabled(true); FunctionDetails functionDetails = FunctionDetails.newBuilder() @@ -88,14 +90,18 @@ public void setup() { .build(); config.setFunctionDetails(functionDetails); logger = mock(Logger.class); - client = mock(PulsarClientImpl.class); pulsarAdmin = mock(PulsarAdmin.class); + + client = mock(PulsarClientImpl.class); when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client, Schema.BYTES)); when(client.createProducerAsync(any(ProducerConfigurationData.class), any(), any())) .thenReturn(CompletableFuture.completedFuture(producer)); when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty())); when(producer.sendAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null)); + clientBuilder = mock(ClientBuilder.class); + when(clientBuilder.build()).thenReturn(client); + TypedMessageBuilder messageBuilder = spy(new TypedMessageBuilderImpl(mock(ProducerBase.class), Schema.STRING)); doReturn(new CompletableFuture<>()).when(messageBuilder).sendAsync(); when(producer.newMessage()).thenReturn(messageBuilder); @@ -105,7 +111,7 @@ public void setup() { client, new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), - pulsarAdmin); + pulsarAdmin, clientBuilder); context.setCurrentMessageContext((Record) () -> null); } @@ -190,7 +196,7 @@ public void testGetPulsarAdmin() throws Exception { } @Test(expectedExceptions = IllegalStateException.class) - public void testGetPulsarAdminWithExposePulsarAdminDisabled() { + public void testGetPulsarAdminWithExposePulsarAdminDisabled() throws PulsarClientException { config.setExposePulsarAdminClientEnabled(false); context = new ContextImpl( config, @@ -198,12 +204,12 @@ public void testGetPulsarAdminWithExposePulsarAdminDisabled() { client, new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), - pulsarAdmin); + pulsarAdmin, clientBuilder); context.getPulsarAdmin(); } @Test - public void testUnsupportedExtendedSinkContext(){ + public void testUnsupportedExtendedSinkContext() throws PulsarClientException { config.setExposePulsarAdminClientEnabled(false); context = new ContextImpl( config, @@ -211,7 +217,7 @@ public void testUnsupportedExtendedSinkContext(){ client, new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), - pulsarAdmin); + pulsarAdmin, clientBuilder); try { context.seek("z", 0, Mockito.mock(MessageId.class)); Assert.fail("Expected exception"); @@ -241,7 +247,7 @@ public void testExtendedSinkContext() throws PulsarClientException { client, new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), - pulsarAdmin); + pulsarAdmin, clientBuilder); Consumer mockConsumer = Mockito.mock(Consumer.class); when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString()); context.setInputConsumers(Lists.newArrayList(mockConsumer)); @@ -272,7 +278,7 @@ public void testGetConsumer() throws PulsarClientException { client, new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), - pulsarAdmin); + pulsarAdmin, clientBuilder); Consumer mockConsumer = Mockito.mock(Consumer.class); when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString()); context.setInputConsumers(Lists.newArrayList(mockConsumer)); @@ -295,7 +301,7 @@ public void testGetConsumerMultiTopic() throws PulsarClientException { client, new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), - pulsarAdmin); + pulsarAdmin, clientBuilder); ConsumerImpl consumer1 = Mockito.mock(ConsumerImpl.class); when(consumer1.getTopic()).thenReturn(TopicName.get("first").toString()); ConsumerImpl consumer2 = Mockito.mock(ConsumerImpl.class); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java index 220802fefbef67..59a4f5b50a618d 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import lombok.Getter; import lombok.Setter; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.SerDe; @@ -37,6 +38,9 @@ import java.lang.reflect.Method; import java.util.Map; +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + public class JavaInstanceRunnableTest { static class IntegerSerDe implements SerDe { @@ -64,8 +68,10 @@ private static InstanceConfig createInstanceConfig(String outputSerde) { private JavaInstanceRunnable createRunnable(String outputSerde) throws Exception { InstanceConfig config = createInstanceConfig(outputSerde); + ClientBuilder clientBuilder = mock(ClientBuilder.class); + when(clientBuilder.build()).thenReturn(null); JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable( - config, null, null, null, null, null, null); + config, clientBuilder, null, null, null, null, null, null); return javaInstanceRunnable; } @@ -126,7 +132,7 @@ public void testStatsManagerNull() throws Exception { @Test public void testSinkConfigParsingPreservesOriginalType() throws Exception { - SinkSpecOrBuilder sinkSpec = Mockito.mock(SinkSpecOrBuilder.class); + SinkSpecOrBuilder sinkSpec = mock(SinkSpecOrBuilder.class); Mockito.when(sinkSpec.getConfigs()).thenReturn("{\"ttl\": 9223372036854775807}"); Map parsedConfig = new ObjectMapper().readValue(sinkSpec.getConfigs(), new TypeReference>() {}); @@ -136,7 +142,7 @@ public void testSinkConfigParsingPreservesOriginalType() throws Exception { @Test public void testSourceConfigParsingPreservesOriginalType() throws Exception { - SourceSpecOrBuilder sourceSpec = Mockito.mock(SourceSpecOrBuilder.class); + SourceSpecOrBuilder sourceSpec = mock(SourceSpecOrBuilder.class); Mockito.when(sourceSpec.getConfigs()).thenReturn("{\"ttl\": 9223372036854775807}"); Map parsedConfig = new ObjectMapper().readValue(sourceSpec.getConfigs(), new TypeReference>() {}); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java index e26887b13fd779..b6dd019140e76d 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java @@ -31,6 +31,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.instance.InstanceUtils; @@ -62,6 +63,7 @@ public class ThreadRuntime implements Runtime { private ThreadGroup threadGroup; private FunctionCacheManager fnCache; private String jarFile; + private ClientBuilder clientBuilder; private PulsarClient pulsarClient; private PulsarAdmin pulsarAdmin; private String stateStorageServiceUrl; @@ -74,7 +76,8 @@ public class ThreadRuntime implements Runtime { FunctionCacheManager fnCache, ThreadGroup threadGroup, String jarFile, - PulsarClient pulsarClient, + PulsarClient client, + ClientBuilder clientBuilder, PulsarAdmin pulsarAdmin, String stateStorageServiceUrl, SecretsProvider secretsProvider, @@ -89,7 +92,8 @@ public class ThreadRuntime implements Runtime { this.threadGroup = threadGroup; this.fnCache = fnCache; this.jarFile = jarFile; - this.pulsarClient = pulsarClient; + this.clientBuilder = clientBuilder; + this.pulsarClient = client; this.pulsarAdmin = pulsarAdmin; this.stateStorageServiceUrl = stateStorageServiceUrl; this.secretsProvider = secretsProvider; @@ -167,6 +171,7 @@ public void start() throws Exception { // re-initialize JavaInstanceRunnable so that variables in constructor can be re-initialized this.javaInstanceRunnable = new JavaInstanceRunnable( instanceConfig, + clientBuilder, pulsarClient, pulsarAdmin, stateStorageServiceUrl, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java index c020bf9ce9ec9d..864a067c1be2ce 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java @@ -60,6 +60,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory { @Getter private ThreadGroup threadGroup; private FunctionCacheManager fnCache; + private ClientBuilder clientBuilder; private PulsarClient pulsarClient; private PulsarAdmin pulsarAdmin; private String storageServiceUrl; @@ -101,7 +102,8 @@ private void initialize(String threadGroupName, Optional metricsDataCompletableFuture = new CompletableFuture(); metricsDataCompletableFuture.complete(javaInstanceRunnable.getMetrics()); Runtime runtime = mock(Runtime.class); @@ -214,14 +215,14 @@ public void testStatusEmpty() { } @Test - public void testMetricsEmpty() { + public void testMetricsEmpty() throws PulsarClientException { Function.FunctionDetails.Builder functionDetailsBuilder = createDefaultFunctionDetails().toBuilder(); InstanceConfig instanceConfig = new InstanceConfig(); instanceConfig.setFunctionDetails(functionDetailsBuilder.build()); instanceConfig.setMaxBufferedTuples(1024); JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable( - instanceConfig, null, null, null, null, null, null); + instanceConfig, null, null, null, null, null, null, null); CompletableFuture completableFuture = new CompletableFuture(); completableFuture.complete(javaInstanceRunnable.getMetrics()); Runtime runtime = mock(Runtime.class); diff --git a/pulsar-io/debezium/core/pom.xml b/pulsar-io/debezium/core/pom.xml index 0b928987c44b0c..e004b0f421206c 100644 --- a/pulsar-io/debezium/core/pom.xml +++ b/pulsar-io/debezium/core/pom.xml @@ -38,6 +38,12 @@ ${project.version} + + ${project.groupId} + pulsar-client-original + ${project.version} + + io.debezium debezium-core @@ -62,12 +68,6 @@ ${kafka-client.version} - - ${project.groupId} - pulsar-client-original - ${project.version} - - ${project.groupId} pulsar-broker diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java index 54b02280571997..b9074b91bc7c57 100644 --- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java +++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io.debezium; +import io.debezium.relational.history.DatabaseHistory; import java.util.Map; import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig; @@ -96,6 +97,9 @@ public void open(Map config, SourceContext sourceContext) throws setConfigIfNull(config, PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC); + config.put(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.client.builder", + SerDeUtils.serialize(sourceContext.getPulsarClientBuilder())); + super.open(config, sourceContext); } diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java index 9e9a7f14725050..be152a6da8eb2e 100644 --- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java +++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Width; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; @@ -61,26 +62,34 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory { .withValidation(Field::isRequired); public static final Field SERVICE_URL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.service.url") - .withDisplayName("Pulsar broker addresses") + .withDisplayName("Pulsar service url") .withType(Type.STRING) .withWidth(Width.LONG) .withImportance(Importance.HIGH) .withDescription("Pulsar service url") - .withValidation(Field::isRequired); + .withValidation(Field::isOptional); + + public static final Field CLIENT_BUILDER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.client.builder") + .withDisplayName("Pulsar client builder") + .withType(Type.STRING) + .withWidth(Width.LONG) + .withImportance(Importance.HIGH) + .withDescription("Pulsar client builder") + .withValidation(Field::isOptional); public static Field.Set ALL_FIELDS = Field.setOf( TOPIC, SERVICE_URL, + CLIENT_BUILDER, DatabaseHistory.NAME); private final DocumentReader reader = DocumentReader.defaultReader(); private String topicName; - private String serviceUrl; private String dbHistoryName; + private ClientBuilder clientBuilder; private volatile PulsarClient pulsarClient; private volatile Producer producer; - @Override public void configure( Configuration config, @@ -93,12 +102,24 @@ public void configure( + getClass().getSimpleName() + "; check the logs for details"); } this.topicName = config.getString(TOPIC); - this.serviceUrl = config.getString(SERVICE_URL); + + if (config.getString(CLIENT_BUILDER) == null && config.getString(SERVICE_URL) == null) { + throw new IllegalArgumentException("Neither Pulsar Service URL nor ClientBuilder provided."); + } + String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER); + this.clientBuilder = PulsarClient.builder(); + if (null != clientBuilderBase64Encoded) { + // deserialize the client builder to the same classloader + this.clientBuilder = (ClientBuilder) SerDeUtils.deserialize(clientBuilderBase64Encoded, this.clientBuilder.getClass().getClassLoader()); + } else { + this.clientBuilder.serviceUrl(config.getString(SERVICE_URL)); + } + // Copy the relevant portions of the configuration and add useful defaults ... this.dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString()); - log.info("Configure to store the debezium database history {} to pulsar topic {} at {}", - dbHistoryName, topicName, serviceUrl); + log.info("Configure to store the debezium database history {} to pulsar topic {}", + dbHistoryName, topicName); } @Override @@ -117,12 +138,9 @@ public void initializeStorage() { void setupClientIfNeeded() { if (null == this.pulsarClient) { try { - pulsarClient = PulsarClient.builder() - .serviceUrl(serviceUrl) - .build(); + pulsarClient = clientBuilder.build(); } catch (PulsarClientException e) { - throw new RuntimeException("Failed to create pulsar client to pulsar cluster at " - + serviceUrl, e); + throw new RuntimeException("Failed to create pulsar client to pulsar cluster", e); } } } @@ -137,9 +155,9 @@ void setupProducerIfNeeded() { .blockIfQueueFull(true) .create(); } catch (PulsarClientException e) { - log.error("Failed to create pulsar producer to topic '{}' at cluster '{}'", topicName, serviceUrl); + log.error("Failed to create pulsar producer to topic '{}'", topicName); throw new RuntimeException("Failed to create pulsar producer to topic '" - + topicName + "' at cluster '" + serviceUrl + "'", e); + + topicName, e); } } } @@ -258,7 +276,7 @@ public boolean storageExists() { @Override public String toString() { if (topicName != null) { - return "Pulsar topic (" + topicName + ") at " + serviceUrl; + return "Pulsar topic (" + topicName + ")"; } return "Pulsar topic"; } diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/SerDeUtils.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/SerDeUtils.java new file mode 100644 index 00000000000000..1c3897a407eba9 --- /dev/null +++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/SerDeUtils.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.debezium; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectStreamClass; +import java.util.Base64; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class SerDeUtils { + public static Object deserialize(String objectBase64Encoded, ClassLoader classLoader) { + byte[] data = Base64.getDecoder().decode(objectBase64Encoded); + try (InputStream bai = new ByteArrayInputStream(data); + PulsarClientBuilderInputStream ois = new PulsarClientBuilderInputStream(bai, classLoader)) { + return ois.readObject(); + } catch (Exception e) { + throw new RuntimeException( + "Failed to initialize the pulsar client to store debezium database history", e); + } + } + + public static String serialize(Object obj) throws Exception { + try (ByteArrayOutputStream bao = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bao)) { + oos.writeObject(obj); + oos.flush(); + byte[] data = bao.toByteArray(); + return Base64.getEncoder().encodeToString(data); + } + } + + static class PulsarClientBuilderInputStream extends ObjectInputStream { + private final ClassLoader classLoader; + public PulsarClientBuilderInputStream(InputStream in, ClassLoader ldr) throws IOException { + super(in); + this.classLoader = ldr; + } + + protected Class resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException { + try { + return Class.forName(desc.getName(), true, classLoader); + } catch (Exception ex) { + log.warn("PulsarClientBuilderInputStream resolveClass failed {} {}", desc.getName(), ex); + } + return super.resolveClass(desc); + } + } +} diff --git a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java index ba3bc6a1951d04..04334da5e435bd 100644 --- a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java +++ b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java @@ -30,9 +30,16 @@ import io.debezium.relational.history.DatabaseHistoryListener; import io.debezium.text.ParsingException; import io.debezium.util.Collect; + +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.util.Base64; import java.util.Map; + +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -67,15 +74,27 @@ protected void cleanup() throws Exception { super.internalCleanup(); } - private void testHistoryTopicContent(boolean skipUnparseableDDL) { + private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWithClientBuilder) throws Exception { + Configuration.Builder configBuidler = Configuration.create() + .with(PulsarDatabaseHistory.TOPIC, topicName) + .with(DatabaseHistory.NAME, "my-db-history") + .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL); + + if (testWithClientBuilder) { + ClientBuilder builder = PulsarClient.builder().serviceUrl(brokerUrl.toString()); + ByteArrayOutputStream bao = new ByteArrayOutputStream(); + try (ObjectOutputStream oos = new ObjectOutputStream(bao)) { + oos.writeObject(builder); + oos.flush(); + byte[] data = bao.toByteArray(); + configBuidler.with(PulsarDatabaseHistory.CLIENT_BUILDER, Base64.getEncoder().encodeToString(data)); + } + } else { + configBuidler.with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString()); + } + // Start up the history ... - Configuration config = Configuration.create() - .with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString()) - .with(PulsarDatabaseHistory.TOPIC, topicName) - .with(DatabaseHistory.NAME, "my-db-history") - .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL) - .build(); - history.configure(config, null, DatabaseHistoryListener.NOOP, true); + history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true); history.start(); // Should be able to call start more than once ... @@ -134,7 +153,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL) { // Stop the history (which should stop the producer) ... history.stop(); history = new PulsarDatabaseHistory(); - history.configure(config, null, DatabaseHistoryListener.NOOP, true); + history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true); // no need to start // Recover from the very beginning to just past the first change ... @@ -170,7 +189,7 @@ protected void setLogPosition(int index) { @Test public void shouldStartWithEmptyTopicAndStoreDataAndRecoverAllState() throws Exception { // Create the empty topic ... - testHistoryTopicContent(false); + testHistoryTopicContent(false, true); } @Test @@ -187,7 +206,7 @@ public void shouldIgnoreUnparseableMessages() throws Exception { producer.send("{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP TABLE foo;\"}"); } - testHistoryTopicContent(true); + testHistoryTopicContent(true, true); } @Test(expectedExceptions = ParsingException.class) @@ -196,14 +215,14 @@ public void shouldStopOnUnparseableSQL() throws Exception { producer.send("{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP TABLE foo;\"}"); } - testHistoryTopicContent(false); + testHistoryTopicContent(false, false); } @Test - public void testExists() { + public void testExists() throws Exception { // happy path - testHistoryTopicContent(true); + testHistoryTopicContent(true, false); assertTrue(history.exists()); // Set history to use dummy topic