From 8dea755e3f3e88edbdf60fc005c2c8b2ce58ae40 Mon Sep 17 00:00:00 2001
From: Andrey Yegorov <8622884+dlg99@users.noreply.github.com>
Date: Fri, 22 Oct 2021 03:01:42 -0700
Subject: [PATCH] KCA doesn't handle unchecked ConnectException/KafkaException
 for the task; it may lead to the connector hanging (#12441)

(cherry picked from commit 34f237b3d6ba7c51902fd9574e62f242431db313)
---
 .../connect/AbstractKafkaConnectSource.java   |  21 ++-
 .../connect/ErrFileStreamSourceTask.java      |  31 ++++
 .../connect/KafkaConnectSourceErrTest.java    | 145 ++++++++++++++++++
 3 files changed, 194 insertions(+), 3 deletions(-)
 create mode 100644 pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/ErrFileStreamSourceTask.java
 create mode 100644 pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceErrTest.java

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index 1fee13b64862c..110e21de1b8c6 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -18,6 +18,15 @@
  */
 package org.apache.pulsar.io.kafka.connect;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.runtime.TaskConfig;
@@ -34,7 +43,6 @@
 import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
 import org.apache.pulsar.kafka.shade.io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
 
-import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -155,6 +163,7 @@ public synchronized Record<T> read() throws Exception {
         } catch (ExecutionException ex) {
             // log the error, continue execution
             log.error("execution exception while get flushFuture", ex);
+            throw new Exception("Flush failed", ex.getCause());
         } finally {
             flushFuture = null;
             currentBatch = null;
@@ -174,7 +183,6 @@ public void close() {
 
     private static Map<String, String> PROPERTIES = Collections.emptyMap();
     private static Optional<Long> RECORD_SEQUENCE = Optional.empty();
-    private static long FLUSH_TIMEOUT_MS = 60000;
 
     public abstract class AbstractKafkaSourceRecord<T> implements Record<T> {
         @Getter
@@ -235,8 +243,15 @@ private void completedFlushOffset(Throwable error, Void result) {
                     flushFuture.complete(null);
                 } catch (InterruptedException exception) {
                     log.warn("Flush of {} offsets interrupted, cancelling", this);
+                    Thread.currentThread().interrupt();
                     offsetWriter.cancelFlush();
-                    flushFuture.completeExceptionally(new Exception("Failed to commit offsets"));
+                    flushFuture.completeExceptionally(new Exception("Failed to commit offsets", exception));
+                } catch (Throwable t) {
+                    // SourceTask can throw unchecked ConnectException/KafkaException.
+                    // Make sure the future is cancelled in that case
+                    log.warn("Flush of {} offsets failed, cancelling", this);
+                    offsetWriter.cancelFlush();
+                    flushFuture.completeExceptionally(new Exception("Failed to commit offsets", t));
                 }
             }
         }
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/ErrFileStreamSourceTask.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/ErrFileStreamSourceTask.java
new file mode 100644
index 0000000000000..d17f32cfc47a0
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/ErrFileStreamSourceTask.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import org.apache.kafka.connect.file.FileStreamSourceTask;
+
+public class ErrFileStreamSourceTask extends FileStreamSourceTask {
+
+    @Override
+    public void commit() throws InterruptedException {
+        throw new org.apache.kafka.connect.errors.ConnectException("blah");
+    }
+
+}
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceErrTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceErrTest.java
new file mode 100644
index 0000000000000..cc04706f3ee45
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceErrTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka.connect;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.connect.file.FileStreamSourceConnector;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SourceContext;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Test the implementation of {@link KafkaConnectSource}.
+ */
+@Slf4j
+public class KafkaConnectSourceErrTest extends ProducerConsumerBase {
+
+    private Map<String, Object> config = new HashMap<>();
+    private String offsetTopicName;
+    // The topic to publish data to, for kafkaSource
+    private String topicName;
+    private KafkaConnectSource kafkaConnectSource;
+    private File tempFile;
+    private SourceContext context;
+    private PulsarClient client;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.pulsar.io.kafka.connect.ErrFileStreamSourceTask");
+        config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+
+        this.offsetTopicName = "persistent://my-property/my-ns/kafka-connect-source-offset";
+        config.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, offsetTopicName);
+
+        this.topicName = "persistent://my-property/my-ns/kafka-connect-source";
+        config.put(FileStreamSourceConnector.TOPIC_CONFIG, topicName);
+        tempFile = File.createTempFile("some-file-name", null);
+        config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsoluteFile().toString());
+        config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE));
+
+        this.context = mock(SourceContext.class);
+        this.client = PulsarClient.builder()
+                .serviceUrl(brokerUrl.toString())
+                .build();
+        when(context.getPulsarClient()).thenReturn(this.client);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        if (this.client != null) {
+            this.client.close();
+        }
+        tempFile.delete();
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testOpenAndRead() throws Exception {
+        kafkaConnectSource = new KafkaConnectSource();
+        kafkaConnectSource.open(config, context);
+
+        // use FileStreamSourceConnector; each line is a record, and a "\n" is needed at the end of each record.
+        OutputStream os = Files.newOutputStream(tempFile.toPath());
+
+        String line1 = "This is the first line\n";
+        os.write(line1.getBytes());
+        os.flush();
+        log.info("write 2 lines.");
+
+        String line2 = "This is the second line\n";
+        os.write(line2.getBytes());
+        os.flush();
+
+        log.info("finish write, will read 2 lines");
+
+        // Note: FileStreamSourceTask reads the whole line as the value, and sets the key to null.
+        Record<KeyValue<byte[], byte[]>> record = kafkaConnectSource.read();
+        String readBack1 = new String(record.getValue().getValue());
+        assertTrue(line1.contains(readBack1));
+        assertNull(record.getValue().getKey());
+        log.info("read line1: {}", readBack1);
+        record.ack();
+
+        record = kafkaConnectSource.read();
+        String readBack2 = new String(record.getValue().getValue());
+        assertTrue(line2.contains(readBack2));
+        assertNull(record.getValue().getKey());
+        assertTrue(record.getPartitionId().isPresent());
+        assertFalse(record.getPartitionIndex().isPresent());
+        log.info("read line2: {}", readBack2);
+        record.ack();
+
+        String line3 = "This is the 3rd line\n";
+        os.write(line3.getBytes());
+        os.flush();
+
+        try {
+            kafkaConnectSource.read();
+            fail("expected exception");
+        } catch (Exception e) {
+            log.info("got exception", e);
+            assertTrue(e.getCause().getCause() instanceof org.apache.kafka.connect.errors.ConnectException);
+        }
+    }
+}
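
Why the hang happened: SourceTask.commit() is declared to throw only
InterruptedException, so completedFlushOffset() originally caught just that.
An unchecked ConnectException/KafkaException escaped the callback,
flushFuture was never completed, and the next read() blocked forever waiting
on it. A minimal standalone sketch of the pattern the fix applies follows;
the names (FlushCallbackSketch, Task, completedFlush) are hypothetical, not
the Pulsar classes:

    import java.util.concurrent.CompletableFuture;

    class FlushCallbackSketch {
        private final CompletableFuture<Void> flushFuture = new CompletableFuture<>();

        interface Task {
            // Like SourceTask.commit(): only InterruptedException is declared,
            // but unchecked exceptions can still be thrown.
            void commit() throws InterruptedException;
        }

        void completedFlush(Task task) {
            try {
                task.commit();
                flushFuture.complete(null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                flushFuture.completeExceptionally(new Exception("Failed to commit offsets", e));
            } catch (Throwable t) {
                // Without this catch, an unchecked exception leaves flushFuture
                // incomplete and whoever waits on it blocks forever.
                flushFuture.completeExceptionally(new Exception("Failed to commit offsets", t));
            }
        }
    }

The Thread.currentThread().interrupt() call the patch adds restores the
thread's interrupt status, which catching InterruptedException clears.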
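
How the failure surfaces to the caller: read() waits on flushFuture and, on
ExecutionException, rethrows the cause wrapped once more as
Exception("Flush failed", ...). The task's original ConnectException
therefore sits two levels down the cause chain, which is exactly what
KafkaConnectSourceErrTest asserts via e.getCause().getCause(). A standalone
sketch of that unwrapping; RuntimeException stands in for ConnectException
so the sketch needs no Kafka dependency:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    class CauseChainSketch {
        public static void main(String[] args) {
            CompletableFuture<Void> flushFuture = new CompletableFuture<>();

            // What completedFlushOffset() does on an unchecked failure:
            RuntimeException taskFailure = new RuntimeException("blah");
            flushFuture.completeExceptionally(new Exception("Failed to commit offsets", taskFailure));

            // What read() does while waiting for the flush:
            try {
                try {
                    flushFuture.get();
                } catch (ExecutionException ex) {
                    throw new Exception("Flush failed", ex.getCause());
                }
            } catch (Exception e) {
                // Two wrappers deep, as the test asserts:
                System.out.println(e.getCause().getCause() == taskFailure); // prints true
            }
        }
    }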