From a220559ae77f793dee819f3e815927c806078d91 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov <8622884+dlg99@users.noreply.github.com> Date: Sat, 23 Oct 2021 21:34:38 -0700 Subject: [PATCH] Stop OffsetStore when stopping the connector (#12457) ### Motivation Source connectors based on KCA (all debezium ones) don't stop properly on error / don't restart. https://github.com/apache/pulsar/pull/12441 fixes one problem, this PR fixes another: ofsetStore is not closed on connector stop() and producer/consumer aren't closed too, preventing the connector from shutting down. ### Modifications Closing offset store on connector stop. --- .../io/kafka/connect/AbstractKafkaConnectSource.java | 6 ++++++ .../io/kafka/connect/PulsarOffsetBackingStore.java | 10 ++++++++++ 2 files changed, 16 insertions(+) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java index 65c0c5c821940c..4612633677b1a6 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java @@ -179,6 +179,12 @@ public synchronized Record read() throws Exception { public void close() { if (sourceTask != null) { sourceTask.stop(); + sourceTask = null; + } + + if (offsetStore != null) { + offsetStore.stop(); + offsetStore = null; } } diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java index b1338f837bb49e..495c8b9c194c03 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java @@ -158,12 +158,19 @@ public void start() { @Override public void stop() { + log.info("Stopping PulsarOffsetBackingStore"); if (null != producer) { + try { + producer.flush(); + } catch (PulsarClientException pce) { + log.warn("Failed to flush the producer", pce); + } try { producer.close(); } catch (PulsarClientException e) { log.warn("Failed to close producer", e); } + producer = null; } if (null != reader) { try { @@ -171,7 +178,10 @@ public void stop() { } catch (IOException e) { log.warn("Failed to close reader", e); } + reader = null; } + + // do not close the client, it is provided by the sink context } @Override