Skip to content

Commit

Permalink
Change Kafkaconnect tests to use common collector and start collector…
Browse files Browse the repository at this point in the history
… before the job (#1367)

GitOrigin-RevId: 4e08299caab7fc4cc7007ae8af26578105eb8ce0
  • Loading branch information
orcunc authored and actions-user committed Apr 24, 2024
1 parent c8533cf commit 91a8ce2
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 64 deletions.
Expand Up @@ -22,7 +22,6 @@
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.metrics.collectors.MetricsCollector;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.SimpleTestInClusterSupport;
Expand Down Expand Up @@ -51,7 +50,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.io.File;
import java.io.Serializable;
import java.net.URISyntaxException;
Expand All @@ -63,13 +61,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.hazelcast.jet.Util.entry;
import static com.hazelcast.jet.config.ProcessingGuarantee.AT_LEAST_ONCE;
Expand All @@ -79,8 +71,6 @@
import static com.hazelcast.jet.kafka.connect.KafkaConnectSources.connect;
import static com.hazelcast.jet.pipeline.test.AssertionSinks.assertCollectedEventually;
import static com.hazelcast.test.OverridePropertyRule.set;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.kafka.connect.data.Values.convertToString;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -246,22 +236,20 @@ public void test_reading_and_writing_to_map() throws URISyntaxException {
Config config = smallInstanceConfig();
config.getJetConfig().setResourceUploadEnabled(true);
HazelcastInstance hz = createHazelcastInstance(config);
Job job = hz.getJet().newJob(pipeline, jobConfig);
MetricsRegistry metricsRegistry = getNode(hz).nodeEngine.getMetricsRegistry();

var collector = new KafkaMetricsCollector();
ScheduledExecutorService es = Executors.newScheduledThreadPool(1);
try {
es.scheduleWithFixedDelay(() -> metricsRegistry.collect(collector), 10, 10, MILLISECONDS);
job.join();
fail("Job should have completed with an AssertionCompletedException, but completed normally");
} catch (CompletionException e) {
String errorMsg = e.getCause().getMessage();
assertTrue("Job was expected to complete with AssertionCompletedException, but completed with: "
+ e.getCause(), errorMsg.contains(AssertionCompletedException.class.getName()));
assertTrueEventually(() -> assertThat(collector.getSourceRecordPollTotal()).isGreaterThan(ITEM_COUNT));
} finally {
shutdownAndAwaitTermination(es);
HazelcastInstance[] instances = {hz};
// Create the collector before the job to make sure it is scheduled
try (var collectors = new MultiNodeMetricsCollector<>(instances, new KafkaMetricsCollector())) {
try {
Job job = hz.getJet().newJob(pipeline, jobConfig);
job.join();
fail("Job should have completed with an AssertionCompletedException, but completed normally");
} catch (CompletionException e) {
String errorMsg = e.getCause().getMessage();
assertTrue("Job was expected to complete with AssertionCompletedException, but completed with: "
+ e.getCause(), errorMsg.contains(AssertionCompletedException.class.getName()));
assertTrueEventually(() -> assertThat(collectors.collector().getSourceRecordPollTotal()).isGreaterThan(ITEM_COUNT));
}
}
}

Expand Down Expand Up @@ -293,36 +281,27 @@ public void test_scaling() throws URISyntaxException {
config.getJetConfig().setResourceUploadEnabled(true);
HazelcastInstance[] hazelcastInstances = createHazelcastInstances(config, instanceCount);

HazelcastInstance hazelcastInstance = hazelcastInstances[0];
Job job = hazelcastInstance.getJet().newJob(pipeline, jobConfig);
var collectors = new MultiNodeMetricsCollector<>(hazelcastInstances, new KafkaMetricsCollector());
try {
job.join();
fail("Job should have completed with an AssertionCompletedException, but completed normally");
} catch (CompletionException e) {
String errorMsg = e.getCause().getMessage();
assertTrue("Job was expected to complete with AssertionCompletedException, but completed with: "
+ e.getCause(), errorMsg.contains(AssertionCompletedException.class.getName()));

assertTrueEventually(() -> {
assertThat(collectors.collector().getSourceRecordPollTotal()).isGreaterThan(ITEM_COUNT);
});
} finally {
collectors.close();
// Create the collector before the job to make sure it is scheduled
try (var collectors = new MultiNodeMetricsCollector<>(hazelcastInstances, new KafkaMetricsCollector())) {
try {
HazelcastInstance hazelcastInstance = hazelcastInstances[0];
Job job = hazelcastInstance.getJet().newJob(pipeline, jobConfig);
job.join();
fail("Job should have completed with an AssertionCompletedException, but completed normally");
} catch (CompletionException e) {
String errorMsg = e.getCause().getMessage();
assertTrue("Job was expected to complete with AssertionCompletedException, but completed with: "
+ e.getCause(), errorMsg.contains(AssertionCompletedException.class.getName()));

assertTrueEventually(() -> assertThat(collectors.collector().getSourceRecordPollTotal()).isGreaterThan(ITEM_COUNT));
}
}
}

private static String getTaskId(Order order) {
return order.headers.get("task.id");
}

@Nonnull
private static Map<String, List<Order>> groupByTaskId(List<Order> list) {
return list.stream()
.collect(Collectors.groupingBy(KafkaConnectIT::getTaskId,
Collectors.mapping(Function.identity(), toList())));
}

@Test
public void test_snapshotting() throws URISyntaxException {
Config config = smallInstanceConfig();
Expand Down Expand Up @@ -405,17 +384,6 @@ public void test_snapshotting() throws URISyntaxException {
}
}

private void shutdownAndAwaitTermination(ExecutorService executorService) {
try {
executorService.shutdown();
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
// Cancel currently executing tasks
executorService.shutdownNow();
}
} catch (InterruptedException ignored) {
}
}

private URL getDataGenConnectorURL() throws URISyntaxException {
ClassLoader classLoader = getClass().getClassLoader();
final String CONNECTOR_FILE_PATH = "confluentinc-kafka-connect-datagen-0.6.0.zip";
Expand Down
Expand Up @@ -20,31 +20,40 @@

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static com.hazelcast.jet.core.JetTestSupport.getNode;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

class MultiNodeMetricsCollector<T extends MetricsCollector> {
class MultiNodeMetricsCollector<T extends MetricsCollector> implements AutoCloseable {

private final ScheduledExecutorService scheduler;
private final T collector;

MultiNodeMetricsCollector(HazelcastInstance[] instances, T collector) {
this.scheduler = Executors.newScheduledThreadPool(instances.length);
this.collector = collector;

for (var inst : instances) {
var registry = getNode(inst).nodeEngine.getMetricsRegistry();
scheduler.scheduleAtFixedRate(() -> registry.collect(collector), 20, 3, MILLISECONDS);
// Schedule immediately
scheduler.scheduleAtFixedRate(() -> registry.collect(collector), 0, 3, MILLISECONDS);
}

this.collector = collector;
}

T collector() {
return collector;
}

@Override
public void close() {
scheduler.shutdownNow();
try {
scheduler.shutdown();
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
// Cancel currently executing tasks
scheduler.shutdownNow();
}
} catch (InterruptedException ignored) {
}
}
}

0 comments on commit 91a8ce2

Please sign in to comment.