diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java index 4df0603f8acfc..3f1edef8ec9dd 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java @@ -176,6 +176,12 @@ public synchronized Record read() throws Exception { public void close() { if (sourceTask != null) { sourceTask.stop(); + sourceTask = null; + } + + if (offsetStore != null) { + offsetStore.stop(); + offsetStore = null; } } diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java index b63dd0fb0f0d8..ceeb09f1c3fbc 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java @@ -169,12 +169,19 @@ public void start() { @Override public void stop() { + log.info("Stopping PulsarOffsetBackingStore"); if (null != producer) { + try { + producer.flush(); + } catch (PulsarClientException pce) { + log.warn("Failed to flush the producer", pce); + } try { producer.close(); } catch (PulsarClientException e) { log.warn("Failed to close producer", e); } + producer = null; } if (null != reader) { try { @@ -182,6 +189,7 @@ public void stop() { } catch (IOException e) { log.warn("Failed to close reader", e); } + reader = null; } if (null != client) { try {