KCA doesn't handle unchecked ConnectException/KafkaException for the task, it may lead to the connector hanging (apache#12441)

(cherry picked from commit 34f237b)
dlg99 authored and nicoloboschi committed Oct 22, 2021
1 parent f34a324 commit 8dea755
Showing 3 changed files with 194 additions and 3 deletions.
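For context, the hang this commit fixes: the offset-flush path completed flushFuture only on success or on a checked InterruptedException, so an unchecked ConnectException/KafkaException thrown by the task escaped the handler and left the future incomplete, blocking the next read() forever. A minimal sketch of that pre-fix failure mode, with simplified, hypothetical names (not the exact Pulsar code):

    // Pre-fix sketch (simplified, hypothetical names)
    void flushOffsets(org.apache.kafka.connect.source.SourceTask task,
                      java.util.concurrent.CompletableFuture<Void> flushFuture) {
        try {
            task.commit();               // may throw an unchecked ConnectException
            flushFuture.complete(null);  // never reached when commit() throws unchecked
        } catch (InterruptedException e) {
            // the only failure handled before this commit
            flushFuture.completeExceptionally(e);
        }
        // an unchecked KafkaException propagates out of this method, and any
        // reader blocked on flushFuture.get() waits forever -> connector hang
    }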
@@ -18,6 +18,15 @@
*/
package org.apache.pulsar.io.kafka.connect;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.runtime.TaskConfig;
@@ -34,7 +43,6 @@
import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

@@ -155,6 +163,7 @@ public synchronized Record<T> read() throws Exception {
} catch (ExecutionException ex) {
// log the error, continue execution
log.error("execution exception while get flushFuture", ex);
throw new Exception("Flush failed", ex.getCause());
} finally {
flushFuture = null;
currentBatch = null;
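The added throw changes read()'s contract: a failed offset flush is no longer just logged and swallowed, it now fails the read so the caller can surface the error. A hedged caller-side sketch (hypothetical poll loop; the source and record types are taken from the test added in this commit):

    // Caller-side sketch (hypothetical poll loop)
    void pollOnce(org.apache.pulsar.io.kafka.connect.KafkaConnectSource source) throws Exception {
        // Before this commit a failed flush was only logged inside read();
        // now it propagates, so callers see the failure instead of a hang.
        org.apache.pulsar.functions.api.Record<org.apache.pulsar.common.schema.KeyValue<byte[], byte[]>>
                record = source.read();   // throws Exception("Flush failed", cause) on flush failure
        record.ack();                     // schedules the offset flush for this record
    }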
@@ -174,7 +183,6 @@ public void close() {

private static Map<String, String> PROPERTIES = Collections.emptyMap();
private static Optional<Long> RECORD_SEQUENCE = Optional.empty();
private static long FLUSH_TIMEOUT_MS = 60000;

public abstract class AbstractKafkaSourceRecord<T> implements Record {
@Getter
@@ -235,8 +243,15 @@ private void completedFlushOffset(Throwable error, Void result) {
flushFuture.complete(null);
} catch (InterruptedException exception) {
log.warn("Flush of {} offsets interrupted, cancelling", this);
Thread.currentThread().interrupt();
offsetWriter.cancelFlush();
flushFuture.completeExceptionally(new Exception("Failed to commit offsets"));
flushFuture.completeExceptionally(new Exception("Failed to commit offsets", exception));
} catch (Throwable t) {
// SourceTask can throw unchecked ConnectException/KafkaException.
// Make sure the future is cancelled in that case
log.warn("Flush of {} offsets failed, cancelling", this);
offsetWriter.cancelFlush();
flushFuture.completeExceptionally(new Exception("Failed to commit offsets", t));
}
}
}
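Putting the hunk above back together: the fixed flush path now distinguishes three outcomes — success, interruption (with the interrupt flag restored), and any other Throwable — and in every failure case it cancels the pending offset flush and fails the future so blocked readers wake up. A condensed sketch, with the surrounding method shape assumed and names taken from the hunk:

    // Post-fix sketch, condensed from the hunk above (method shape assumed)
    try {
        task.commit();
        flushFuture.complete(null);
    } catch (InterruptedException exception) {
        log.warn("Flush of {} offsets interrupted, cancelling", this);
        Thread.currentThread().interrupt();   // restore the interrupt flag
        offsetWriter.cancelFlush();
        flushFuture.completeExceptionally(new Exception("Failed to commit offsets", exception));
    } catch (Throwable t) {
        // SourceTask can throw unchecked ConnectException/KafkaException;
        // fail the future so a blocked read() sees the error instead of hanging
        log.warn("Flush of {} offsets failed, cancelling", this);
        offsetWriter.cancelFlush();
        flushFuture.completeExceptionally(new Exception("Failed to commit offsets", t));
    }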
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.io.kafka.connect;

import org.apache.kafka.connect.file.FileStreamSourceTask;

public class ErrFileStreamSourceTask extends FileStreamSourceTask {

@Override
public void commit() throws InterruptedException {
throw new org.apache.kafka.connect.errors.ConnectException("blah");
}

}
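This stub fails every offset flush on purpose. The detail that makes it a good regression test: ConnectException extends KafkaException, which extends RuntimeException, so the override compiles and throws even though commit() only declares InterruptedException — exactly the unchecked path the adaptor previously missed. A tiny, hypothetical demonstration of the point:

    // ConnectException is unchecked: no throws clause needed, so a caller
    // that only catches InterruptedException never sees it
    RuntimeException e = new org.apache.kafka.connect.errors.ConnectException("blah");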
@@ -0,0 +1,145 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.kafka.connect;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SourceContext;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.io.File;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

/**
* Test the implementation of {@link KafkaConnectSource}.
*/
@Slf4j
public class KafkaConnectSourceErrTest extends ProducerConsumerBase {

private Map<String, Object> config = new HashMap<>();
private String offsetTopicName;
// The topic to publish data to, for kafkaSource
private String topicName;
private KafkaConnectSource kafkaConnectSource;
private File tempFile;
private SourceContext context;
private PulsarClient client;

@BeforeMethod
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();

config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.pulsar.io.kafka.connect.ErrFileStreamSourceTask");
config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");

this.offsetTopicName = "persistent://my-property/my-ns/kafka-connect-source-offset";
config.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, offsetTopicName);

this.topicName = "persistent://my-property/my-ns/kafka-connect-source";
config.put(FileStreamSourceConnector.TOPIC_CONFIG, topicName);
tempFile = File.createTempFile("some-file-name", null);
config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsoluteFile().toString());
config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE));

this.context = mock(SourceContext.class);
this.client = PulsarClient.builder()
.serviceUrl(brokerUrl.toString())
.build();
when(context.getPulsarClient()).thenReturn(this.client);
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
if (this.client != null) {
this.client.close();
}
tempFile.delete();
super.internalCleanup();
}

@Test
public void testOpenAndRead() throws Exception {
kafkaConnectSource = new KafkaConnectSource();
kafkaConnectSource.open(config, context);

// use FileStreamSourceConnector; each line is a record and needs a "\n" at the end.
OutputStream os = Files.newOutputStream(tempFile.toPath());

String line1 = "This is the first line\n";
os.write(line1.getBytes());
os.flush();
log.info("write 2 lines.");

String line2 = "This is the second line\n";
os.write(line2.getBytes());
os.flush();

log.info("finish write, will read 2 lines");

// Note: FileStreamSourceTask reads the whole line as the value, and sets the key to null.
Record<KeyValue<byte[], byte[]>> record = kafkaConnectSource.read();
String readBack1 = new String(record.getValue().getValue());
assertTrue(line1.contains(readBack1));
assertNull(record.getValue().getKey());
log.info("read line1: {}", readBack1);
record.ack();

record = kafkaConnectSource.read();
String readBack2 = new String(record.getValue().getValue());
assertTrue(line2.contains(readBack2));
assertNull(record.getValue().getKey());
assertTrue(record.getPartitionId().isPresent());
assertFalse(record.getPartitionIndex().isPresent());
log.info("read line2: {}", readBack2);
record.ack();

String line3 = "This is the 3rd line\n";
os.write(line3.getBytes());
os.flush();

try {
kafkaConnectSource.read();
fail("expected exception");
} catch (Exception e) {
log.info("got exception", e);
assertTrue(e.getCause().getCause() instanceof org.apache.kafka.connect.errors.ConnectException);
}
}
}
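The double getCause() in the final assertion mirrors the wrapping added earlier in this commit: read() throws Exception("Flush failed", cause), whose cause is the Exception("Failed to commit offsets", t) completed on the flush future, and t is the original ConnectException("blah") thrown by ErrFileStreamSourceTask. Unwrapped step by step:

    // Cause chain behind assertTrue(e.getCause().getCause() instanceof ConnectException)
    Throwable flushFailed  = e;                       // Exception("Flush failed", ...), thrown by read()
    Throwable commitFailed = flushFailed.getCause();  // Exception("Failed to commit offsets", ...)
    Throwable original     = commitFailed.getCause(); // ConnectException("blah") from the stub task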
