Skip to content

Commit

Permalink
[hotfix] Replace deprecated DescribeTopicsResult#all, KafkaConsumer#p…
Browse files Browse the repository at this point in the history
…oll and ContainerState#getContainerIpAddress
  • Loading branch information
snuyanzin committed Nov 14, 2022
1 parent 3c9a394 commit 4c3b6c7
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ static Map<String, TopicDescription> getAllTopicMetadata(AdminClient adminClient
static Map<String, TopicDescription> getTopicMetadata(
AdminClient adminClient, Set<String> topicNames) {
try {
return adminClient.describeTopics(topicNames).all().get();
return adminClient.describeTopics(topicNames).allTopicNames().get();
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to get metadata for topics %s.", topicNames), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import javax.annotation.Nonnull;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -258,7 +259,7 @@ public void run() {
// over
if (records == null) {
try {
records = consumer.poll(pollTimeout);
records = consumer.poll(Duration.ofMillis(pollTimeout));
} catch (WakeupException we) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed

protected Map<String, TopicDescription> getTopicMetadata(List<String> topics) {
try {
return kafkaAdminClient.describeTopics(topics).all().get();
return kafkaAdminClient.describeTopics(topics).allTopicNames().get();
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to get metadata for topics %s.", topics), e);
Expand All @@ -202,7 +202,7 @@ protected Map<String, TopicDescription> getTopicMetadata(List<String> topics) {

private boolean topicExists(String topic) {
try {
kafkaAdminClient.describeTopics(Arrays.asList(topic)).all().get();
kafkaAdminClient.describeTopics(Arrays.asList(topic)).allTopicNames().get();
return true;
} catch (Exception e) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,10 @@ private KafkaPartitionDataWriter scaleOutTopic(String topicName) throws Exceptio
final Set<String> topics = adminClient.listTopics().names().get();
if (topics.contains(topicName)) {
final Map<String, TopicDescription> topicDescriptions =
adminClient.describeTopics(Collections.singletonList(topicName)).all().get();
adminClient
.describeTopics(Collections.singletonList(topicName))
.allTopicNames()
.get();
final int numPartitions = topicDescriptions.get(topicName).partitions().size();
LOG.info("Creating partition {} for topic '{}'", numPartitions + 1, topicName);
adminClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private void assertRecord(String topicName, String expectedKey, String expectedV
kafkaConsumer.subscribe(Collections.singletonList(topicName));
ConsumerRecords<String, String> records = ConsumerRecords.empty();
while (records.isEmpty()) {
records = kafkaConsumer.poll(10000);
records = kafkaConsumer.poll(Duration.ofMillis(10000));
}

ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public void createTestTopic(
topicDescriptions =
adminClient
.describeTopics(Collections.singleton(topic))
.all()
.allTopicNames()
.get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("Exception caught when describing Kafka topics", e);
Expand Down Expand Up @@ -331,7 +331,10 @@ public void stopBroker(int brokerId) throws Exception {
public int getLeaderToShutDown(String topic) throws Exception {
try (final AdminClient client = AdminClient.create(getStandardProperties())) {
TopicDescription result =
client.describeTopics(Collections.singleton(topic)).all().get().get(topic);
client.describeTopics(Collections.singleton(topic))
.allTopicNames()
.get()
.get(topic);
return result.partitions().get(0).leader().id();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private Map<String, TopicDescription> describeExternalTopics() {
.map(TopicListing::name)
.collect(Collectors.toList());

return adminClient.describeTopics(topics).all().get();
return adminClient.describeTopics(topics).allTopicNames().get();
} catch (Exception e) {
throw new RuntimeException("Failed to list Kafka topics", e);
}
Expand Down

0 comments on commit 4c3b6c7

Please sign in to comment.