Skip to content

Commit

Permalink
[hotfix] Replace deprecated DescribeTopicsResult#all, KafkaConsumer#poll and ContainerState#getContainerIpAddress
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin committed Oct 25, 2022
1 parent 8e16cc8 commit 123bdae
Show file tree
Hide file tree
Showing 10 changed files with 19 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public KinesaliteContainer(DockerImageName imageName) {

/** Returns the endpoint url to access the container from outside the docker network. */
public String getContainerEndpointUrl() {
    // The scraped diff kept both the old and new return lines; only the updated call
    // survives: getHost() replaces the deprecated ContainerState#getContainerIpAddress().
    return String.format(URL_FORMAT, getHost(), getMappedPort(PORT));
}

/** Returns the endpoint url to access the host from inside the docker network. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ static Map<String, TopicDescription> getAllTopicMetadata(AdminClient adminClient
static Map<String, TopicDescription> getTopicMetadata(
AdminClient adminClient, Set<String> topicNames) {
try {
return adminClient.describeTopics(topicNames).all().get();
return adminClient.describeTopics(topicNames).allTopicNames().get();
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to get metadata for topics %s.", topicNames), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import javax.annotation.Nonnull;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -258,7 +259,7 @@ public void run() {
// over
if (records == null) {
try {
records = consumer.poll(pollTimeout);
records = consumer.poll(Duration.ofMillis(pollTimeout));
} catch (WakeupException we) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed

protected Map<String, TopicDescription> getTopicMetadata(List<String> topics) {
try {
return kafkaAdminClient.describeTopics(topics).all().get();
return kafkaAdminClient.describeTopics(topics).allTopicNames().get();
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to get metadata for topics %s.", topics), e);
Expand All @@ -210,7 +210,7 @@ protected Map<String, TopicDescription> getTopicMetadata(List<String> topics) {

private boolean topicExists(String topic) {
try {
kafkaAdminClient.describeTopics(Arrays.asList(topic)).all().get();
kafkaAdminClient.describeTopics(Arrays.asList(topic)).allTopicNames().get();
return true;
} catch (Exception e) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,10 @@ private KafkaPartitionDataWriter scaleOutTopic(String topicName) throws Exceptio
final Set<String> topics = adminClient.listTopics().names().get();
if (topics.contains(topicName)) {
final Map<String, TopicDescription> topicDescriptions =
adminClient.describeTopics(Collections.singletonList(topicName)).all().get();
adminClient
.describeTopics(Collections.singletonList(topicName))
.allTopicNames()
.get();
final int numPartitions = topicDescriptions.get(topicName).partitions().size();
LOG.info("Creating partition {} for topic '{}'", numPartitions + 1, topicName);
adminClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private void assertRecord(String topicName, String expectedKey, String expectedV
kafkaConsumer.subscribe(Collections.singletonList(topicName));
ConsumerRecords<String, String> records = ConsumerRecords.empty();
while (records.isEmpty()) {
records = kafkaConsumer.poll(10000);
records = kafkaConsumer.poll(Duration.ofMillis(10000));
}

ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public void createTestTopic(
topicDescriptions =
adminClient
.describeTopics(Collections.singleton(topic))
.all()
.allTopicNames()
.get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("Exception caught when describing Kafka topics", e);
Expand Down Expand Up @@ -339,7 +339,10 @@ public void stopBroker(int brokerId) throws Exception {
/**
 * Returns the broker id of the current leader of partition 0 of the given topic.
 *
 * @param topic topic whose partition-0 leader should be looked up
 * @return the leader broker's id
 * @throws Exception if the admin client lookup fails
 */
public int getLeaderToShutDown(String topic) throws Exception {
    try (final AdminClient client = AdminClient.create(getStandardProperties())) {
        // allTopicNames() replaces the deprecated DescribeTopicsResult#all();
        // the leftover pre-change statement using all() has been removed.
        TopicDescription result =
                client.describeTopics(Collections.singleton(topic))
                        .allTopicNames()
                        .get()
                        .get(topic);
        return result.partitions().get(0).leader().id();
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ private Map<String, TopicDescription> describeExternalTopics() {
.map(TopicListing::name)
.collect(Collectors.toList());

return adminClient.describeTopics(topics).all().get();
return adminClient.describeTopics(topics).allTopicNames().get();
} catch (Exception e) {
throw new RuntimeException("Failed to list Kafka topics", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,6 @@ protected void configure() {
}

/** Returns the schema registry URL reachable from outside the docker network. */
public String getSchemaRegistryUrl() {
    // Duplicate diff line removed: getHost() replaces the deprecated
    // ContainerState#getContainerIpAddress(), making the old return unreachable.
    return "http://" + getHost() + ":" + getMappedPort(8082);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public AmazonS3 getClient() {
}

/** Returns the HTTP endpoint of the container, reachable from the host. */
private String getHttpEndpoint() {
    // Duplicate diff line removed: getHost() replaces the deprecated
    // ContainerState#getContainerIpAddress().
    return String.format("http://%s:%s", getHost(), getMappedPort(DEFAULT_PORT));
}

/**
Expand Down

0 comments on commit 123bdae

Please sign in to comment.