From a7cea97a9cfe941deab46ba18d46ed326cf94092 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Thu, 21 Oct 2021 17:50:53 -0700 Subject: [PATCH 1/2] Stop OffsetStore when stopping the connector --- .../connect/AbstractKafkaConnectSource.java | 6 ++++++ .../kafka/connect/PulsarOffsetBackingStore.java | 16 ++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java index 3901c5f3d176b..5bd85a00f1762 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java @@ -180,6 +180,12 @@ public synchronized Record read() throws Exception { public void close() { if (sourceTask != null) { sourceTask.stop(); + sourceTask = null; + } + + if (offsetStore != null) { + offsetStore.stop(); + offsetStore = null; } } diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java index b1338f837bb49..352e065c8b21e 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java @@ -158,12 +158,19 @@ public void start() { @Override public void stop() { + log.info("Stopping PulsarOffsetBackingStore"); if (null != producer) { + try { + producer.flush(); + } catch (PulsarClientException pce) { + log.warn("Failed to flush the producer", pce); + } try { producer.close(); } catch (PulsarClientException e) { log.warn("Failed to close producer", e); } + producer = null; } if (null != reader) { try { @@ -171,6 +178,15 @@ public void stop() { } catch (IOException e) { log.warn("Failed to close reader", e); } + reader = null; + } + if (null != client) { + try { + client.close(); + } catch (IOException e) { + log.warn("Failed to close client", e); + } + client = null; } } From 7a6ff6b699952779da6f9f5d0c2f26f3659bb89c Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Fri, 22 Oct 2021 08:45:06 -0700 Subject: [PATCH 2/2] don't close the client in version after 2.8(2.9?) as it is passed through the context --- .../io/kafka/connect/PulsarOffsetBackingStore.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java index 352e065c8b21e..495c8b9c194c0 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java @@ -180,14 +180,8 @@ public void stop() { } reader = null; } - if (null != client) { - try { - client.close(); - } catch (IOException e) { - log.warn("Failed to close client", e); - } - client = null; - } + + // do not close the client, it is provided by the sink context } @Override