Skip to content

Commit

Permalink
[IO] Pass client builder to debezium database history (apache#11293)
Browse files Browse the repository at this point in the history
an alternative approach for apache#11251
  • Loading branch information
sijie authored and ciaocloud committed Oct 16, 2021
1 parent 1d31fef commit 04cc940
Show file tree
Hide file tree
Showing 16 changed files with 255 additions and 85 deletions.
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import java.io.Serializable;
import java.net.InetSocketAddress;
import java.time.Clock;
import java.util.Map;
Expand All @@ -34,7 +35,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface ClientBuilder extends Cloneable {
public interface ClientBuilder extends Serializable, Cloneable {

/**
* Construct the final {@link PulsarClient} instance.
Expand Down
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.api;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
Expand Down Expand Up @@ -194,11 +195,27 @@ default <S extends StateStore> S getStateStore(String tenant, String ns, String
void recordMetric(String metricName, double value);

/**
* Get the pulsar client.
* Get the pre-configured pulsar client.
*
* You can use this client to access Pulsar cluster.
* The Function will be responsible for disposing this client.
*
* @return the instance of pulsar client
*/
default PulsarClient getPulsarClient() {
throw new UnsupportedOperationException("not implemented");
}

/**
* Get the pre-configured pulsar client builder.
*
* You can use this Builder to setup client to connect to the Pulsar cluster.
* But you need to close client properly after using it.
*
* @return the instance of pulsar client builder.
*/
default ClientBuilder getPulsarClientBuilder() {
throw new UnsupportedOperationException("not implemented");
}

}
Expand Up @@ -39,6 +39,7 @@
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
Expand Down Expand Up @@ -88,6 +89,7 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
// Per Message related
private Record<?> record;

private final ClientBuilder clientBuilder;
private final PulsarClient client;
private final PulsarAdmin pulsarAdmin;
private Map<String, Producer<?>> publishProducers;
Expand Down Expand Up @@ -132,9 +134,10 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, String[] metricsLabels,
Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager,
StateManager stateManager, PulsarAdmin pulsarAdmin) {
StateManager stateManager, PulsarAdmin pulsarAdmin, ClientBuilder clientBuilder) throws PulsarClientException {
this.config = config;
this.logger = logger;
this.clientBuilder = clientBuilder;
this.client = client;
this.pulsarAdmin = pulsarAdmin;
this.topicSchema = new TopicSchema(client);
Expand Down Expand Up @@ -492,6 +495,11 @@ public PulsarClient getPulsarClient() {
return client;
}

@Override
public ClientBuilder getPulsarClientBuilder() {
return clientBuilder;
}

private <O> Producer<O> getProducer(String topicName, Schema<O> schema) throws PulsarClientException {
Producer<O> producer;
if (tlPublishProducers != null) {
Expand Down
Expand Up @@ -156,15 +156,9 @@ public static Map<String, String> getProperties(Function.FunctionDetails.Compone
return properties;
}


public static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig)
throws PulsarClientException {
return createPulsarClient(pulsarServiceUrl, authConfig, Optional.empty());
}

public static PulsarClient createPulsarClient(String pulsarServiceUrl,
AuthenticationConfig authConfig,
Optional<Long> memoryLimit) throws PulsarClientException {
public static ClientBuilder createPulsarClientBuilder(String pulsarServiceUrl,
AuthenticationConfig authConfig,
Optional<Long> memoryLimit) throws PulsarClientException {
ClientBuilder clientBuilder = null;
if (isNotBlank(pulsarServiceUrl)) {
clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
Expand All @@ -183,10 +177,20 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())) {
clientBuilder.memoryLimit(memoryLimit.get(), SizeUnit.BYTES);
}
clientBuilder.ioThreads(Runtime.getRuntime().availableProcessors());
return clientBuilder.build();
return clientBuilder;
}
log.warn("pulsarServiceUrl cannot be null");
return null;
throw new PulsarClientException("pulsarServiceUrl cannot be null");
}

public static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig)
throws PulsarClientException {
return createPulsarClient(pulsarServiceUrl, authConfig, Optional.empty());
}

public static PulsarClient createPulsarClient(String pulsarServiceUrl,
AuthenticationConfig authConfig,
Optional<Long> memoryLimit) throws PulsarClientException {
return createPulsarClientBuilder(pulsarServiceUrl, authConfig, memoryLimit).build();
}

public static PulsarAdmin createPulsarAdminClient(String pulsarWebServiceUrl, AuthenticationConfig authConfig)
Expand Down
Expand Up @@ -39,7 +39,9 @@
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.ConsumerConfig;
Expand Down Expand Up @@ -90,7 +92,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private final InstanceConfig instanceConfig;

// input topic consumer & output topic producer
private final PulsarClientImpl client;
private final ClientBuilder clientBuilder;
private PulsarClientImpl client;
private final PulsarAdmin pulsarAdmin;

private LogAppender logAppender;
Expand Down Expand Up @@ -134,13 +137,15 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private ReadWriteLock statsLock = new ReentrantReadWriteLock();

public JavaInstanceRunnable(InstanceConfig instanceConfig,
ClientBuilder clientBuilder,
PulsarClient pulsarClient,
PulsarAdmin pulsarAdmin,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
FunctionCollectorRegistry collectorRegistry,
ClassLoader functionClassLoader) {
ClassLoader functionClassLoader) throws PulsarClientException {
this.instanceConfig = instanceConfig;
this.clientBuilder = clientBuilder;
this.client = (PulsarClientImpl) pulsarClient;
this.pulsarAdmin = pulsarAdmin;
this.stateStorageServiceUrl = stateStorageServiceUrl;
Expand Down Expand Up @@ -226,12 +231,12 @@ synchronized private void setup() throws Exception {
isInitialized = true;
}

ContextImpl setupContext() {
ContextImpl setupContext() throws PulsarClientException {
Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager,
pulsarAdmin);
pulsarAdmin, clientBuilder);
}

/**
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.functions.instance;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
Expand Down Expand Up @@ -74,28 +75,33 @@ public class ContextImplTest {

private InstanceConfig config;
private Logger logger;
private ClientBuilder clientBuilder;
private PulsarClientImpl client;
private PulsarAdmin pulsarAdmin;
private ContextImpl context;
private Producer producer = mock(Producer.class);

@BeforeMethod
public void setup() {
public void setup() throws PulsarClientException {
config = new InstanceConfig();
config.setExposePulsarAdminClientEnabled(true);
FunctionDetails functionDetails = FunctionDetails.newBuilder()
.setUserConfig("")
.build();
config.setFunctionDetails(functionDetails);
logger = mock(Logger.class);
client = mock(PulsarClientImpl.class);
pulsarAdmin = mock(PulsarAdmin.class);

client = mock(PulsarClientImpl.class);
when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client, Schema.BYTES));
when(client.createProducerAsync(any(ProducerConfigurationData.class), any(), any()))
.thenReturn(CompletableFuture.completedFuture(producer));
when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
when(producer.sendAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null));

clientBuilder = mock(ClientBuilder.class);
when(clientBuilder.build()).thenReturn(client);

TypedMessageBuilder messageBuilder = spy(new TypedMessageBuilderImpl(mock(ProducerBase.class), Schema.STRING));
doReturn(new CompletableFuture<>()).when(messageBuilder).sendAsync();
when(producer.newMessage()).thenReturn(messageBuilder);
Expand All @@ -105,7 +111,7 @@ public void setup() {
client,
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin);
pulsarAdmin, clientBuilder);
context.setCurrentMessageContext((Record<String>) () -> null);
}

Expand Down Expand Up @@ -190,28 +196,28 @@ public void testGetPulsarAdmin() throws Exception {
}

@Test(expectedExceptions = IllegalStateException.class)
public void testGetPulsarAdminWithExposePulsarAdminDisabled() {
public void testGetPulsarAdminWithExposePulsarAdminDisabled() throws PulsarClientException {
config.setExposePulsarAdminClientEnabled(false);
context = new ContextImpl(
config,
logger,
client,
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin);
pulsarAdmin, clientBuilder);
context.getPulsarAdmin();
}

@Test
public void testUnsupportedExtendedSinkContext(){
public void testUnsupportedExtendedSinkContext() throws PulsarClientException {
config.setExposePulsarAdminClientEnabled(false);
context = new ContextImpl(
config,
logger,
client,
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin);
pulsarAdmin, clientBuilder);
try {
context.seek("z", 0, Mockito.mock(MessageId.class));
Assert.fail("Expected exception");
Expand Down Expand Up @@ -241,7 +247,7 @@ public void testExtendedSinkContext() throws PulsarClientException {
client,
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin);
pulsarAdmin, clientBuilder);
Consumer<?> mockConsumer = Mockito.mock(Consumer.class);
when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString());
context.setInputConsumers(Lists.newArrayList(mockConsumer));
Expand Down Expand Up @@ -272,7 +278,7 @@ public void testGetConsumer() throws PulsarClientException {
client,
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin);
pulsarAdmin, clientBuilder);
Consumer<?> mockConsumer = Mockito.mock(Consumer.class);
when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString());
context.setInputConsumers(Lists.newArrayList(mockConsumer));
Expand All @@ -295,7 +301,7 @@ public void testGetConsumerMultiTopic() throws PulsarClientException {
client,
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin);
pulsarAdmin, clientBuilder);
ConsumerImpl<?> consumer1 = Mockito.mock(ConsumerImpl.class);
when(consumer1.getTopic()).thenReturn(TopicName.get("first").toString());
ConsumerImpl<?> consumer2 = Mockito.mock(ConsumerImpl.class);
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.SerDe;
Expand All @@ -37,6 +38,9 @@
import java.lang.reflect.Method;
import java.util.Map;

import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;

public class JavaInstanceRunnableTest {

static class IntegerSerDe implements SerDe<Integer> {
Expand Down Expand Up @@ -64,8 +68,10 @@ private static InstanceConfig createInstanceConfig(String outputSerde) {

private JavaInstanceRunnable createRunnable(String outputSerde) throws Exception {
InstanceConfig config = createInstanceConfig(outputSerde);
ClientBuilder clientBuilder = mock(ClientBuilder.class);
when(clientBuilder.build()).thenReturn(null);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
config, null, null, null, null, null, null);
config, clientBuilder, null, null, null, null, null, null);
return javaInstanceRunnable;
}

Expand Down Expand Up @@ -126,7 +132,7 @@ public void testStatsManagerNull() throws Exception {

@Test
public void testSinkConfigParsingPreservesOriginalType() throws Exception {
SinkSpecOrBuilder sinkSpec = Mockito.mock(SinkSpecOrBuilder.class);
SinkSpecOrBuilder sinkSpec = mock(SinkSpecOrBuilder.class);
Mockito.when(sinkSpec.getConfigs()).thenReturn("{\"ttl\": 9223372036854775807}");
Map<String, Object> parsedConfig =
new ObjectMapper().readValue(sinkSpec.getConfigs(), new TypeReference<Map<String, Object>>() {});
Expand All @@ -136,7 +142,7 @@ public void testSinkConfigParsingPreservesOriginalType() throws Exception {

@Test
public void testSourceConfigParsingPreservesOriginalType() throws Exception {
SourceSpecOrBuilder sourceSpec = Mockito.mock(SourceSpecOrBuilder.class);
SourceSpecOrBuilder sourceSpec = mock(SourceSpecOrBuilder.class);
Mockito.when(sourceSpec.getConfigs()).thenReturn("{\"ttl\": 9223372036854775807}");
Map<String, Object> parsedConfig =
new ObjectMapper().readValue(sourceSpec.getConfigs(), new TypeReference<Map<String, Object>>() {});
Expand Down
Expand Up @@ -31,6 +31,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class ThreadRuntime implements Runtime {
private ThreadGroup threadGroup;
private FunctionCacheManager fnCache;
private String jarFile;
private ClientBuilder clientBuilder;
private PulsarClient pulsarClient;
private PulsarAdmin pulsarAdmin;
private String stateStorageServiceUrl;
Expand All @@ -74,7 +76,8 @@ public class ThreadRuntime implements Runtime {
FunctionCacheManager fnCache,
ThreadGroup threadGroup,
String jarFile,
PulsarClient pulsarClient,
PulsarClient client,
ClientBuilder clientBuilder,
PulsarAdmin pulsarAdmin,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
Expand All @@ -89,7 +92,8 @@ public class ThreadRuntime implements Runtime {
this.threadGroup = threadGroup;
this.fnCache = fnCache;
this.jarFile = jarFile;
this.pulsarClient = pulsarClient;
this.clientBuilder = clientBuilder;
this.pulsarClient = client;
this.pulsarAdmin = pulsarAdmin;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.secretsProvider = secretsProvider;
Expand Down Expand Up @@ -167,6 +171,7 @@ public void start() throws Exception {
// re-initialize JavaInstanceRunnable so that variables in constructor can be re-initialized
this.javaInstanceRunnable = new JavaInstanceRunnable(
instanceConfig,
clientBuilder,
pulsarClient,
pulsarAdmin,
stateStorageServiceUrl,
Expand Down

0 comments on commit 04cc940

Please sign in to comment.