forked from testcontainers/testcontainers-java
-
Notifications
You must be signed in to change notification settings - Fork 0
/
KafkaContainerTest.java
122 lines (101 loc) · 4.3 KB
/
KafkaContainerTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package org.testcontainers.containers;
import com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
import java.time.Duration;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;
/**
 * Integration tests for {@link KafkaContainer}.
 *
 * <p>Each test starts the container(s) it needs, then delegates to
 * {@link #testKafkaFunctionality(String)} to verify that a record can be produced to and
 * consumed from the broker reachable at the container's bootstrap servers.
 */
public class KafkaContainerTest {

    /**
     * Starts a {@link KafkaContainer} created with its no-argument constructor and runs the
     * produce/consume round trip against it.
     */
    @Test
    public void testUsage() throws Exception {
        try (KafkaContainer kafka = new KafkaContainer()) {
            kafka.start();
            testKafkaFunctionality(kafka.getBootstrapServers());
        }
    }

    /**
     * Runs Kafka against a separately started ZooKeeper container that is attached to the
     * network returned by {@link KafkaContainer#getNetwork()}.
     *
     * @deprecated the {@link Network} should be set explicitly with {@link KafkaContainer#withNetwork(Network)}.
     */
    @Test
    @Deprecated
    public void testExternalZookeeperWithKafkaNetwork() throws Exception {
        try (
            KafkaContainer kafka = new KafkaContainer()
                .withExternalZookeeper("zookeeper:2181");

            // Wildcard type instead of the raw type keeps generic checking on the fluent chain.
            // The "zookeeper" alias is the host name Kafka was given in withExternalZookeeper.
            GenericContainer<?> zookeeper = new GenericContainer<>("confluentinc/cp-zookeeper:4.0.0")
                .withNetwork(kafka.getNetwork())
                .withNetworkAliases("zookeeper")
                .withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
        ) {
            // ZooKeeper is started first, before the broker that was pointed at it.
            zookeeper.start();
            kafka.start();

            testKafkaFunctionality(kafka.getBootstrapServers());
        }
    }

    /**
     * Runs Kafka against a separately started ZooKeeper container, with both containers attached
     * to a {@link Network} created by the test itself.
     */
    @Test
    public void testExternalZookeeperWithExternalNetwork() throws Exception {
        try (
            // Declared first, so try-with-resources closes it last (after both containers).
            Network network = Network.newNetwork();

            KafkaContainer kafka = new KafkaContainer()
                .withNetwork(network)
                .withExternalZookeeper("zookeeper:2181");

            // Wildcard type instead of the raw type keeps generic checking on the fluent chain.
            // The "zookeeper" alias is the host name Kafka was given in withExternalZookeeper.
            GenericContainer<?> zookeeper = new GenericContainer<>("confluentinc/cp-zookeeper:4.0.0")
                .withNetwork(network)
                .withNetworkAliases("zookeeper")
                .withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
        ) {
            // ZooKeeper is started first, before the broker that was pointed at it.
            zookeeper.start();
            kafka.start();

            testKafkaFunctionality(kafka.getBootstrapServers());
        }
    }

    /**
     * Sends a single record to the topic {@code "messages"} and asserts that a consumer
     * subscribed to that topic receives exactly that record.
     *
     * @param bootstrapServers value used for the {@code bootstrap.servers} setting of both the
     *                         producer and the consumer
     * @throws Exception if sending the record fails or is interrupted; a timeout of the retry
     *                   loop or a failed assertion also propagates to the caller
     */
    protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
        try (
            KafkaProducer<String, String> producer = new KafkaProducer<>(
                ImmutableMap.of(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                    ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()
                ),
                new StringSerializer(),
                new StringSerializer()
            );

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
                ImmutableMap.of(
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                    // Random group id, so every invocation uses a consumer group of its own.
                    ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(),
                    // NOTE(review): presumably needed so the record sent below is still seen
                    // even if the consumer's partition assignment completes after the send.
                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
                ),
                new StringDeserializer(),
                new StringDeserializer()
            );
        ) {
            String topicName = "messages";
            consumer.subscribe(Arrays.asList(topicName));

            // get() blocks until the send future completes, so the send has finished
            // before polling starts.
            producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();

            // Poll repeatedly (for at most 10 seconds) until a non-empty batch arrives.
            Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                if (records.isEmpty()) {
                    // Nothing delivered yet: report "not done" so the poll is retried.
                    return false;
                }

                // The first non-empty batch must contain exactly the record sent above.
                assertThat(records)
                    .hasSize(1)
                    .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
                    .containsExactly(tuple(topicName, "testcontainers", "rulezzz"));

                return true;
            });

            consumer.unsubscribe();
        }
    }
}