Skip to content

Commit

Permalink
[FLINK-30056][Connectors/Kafka] Make polling for metadata no more tha…
Browse files Browse the repository at this point in the history
…n specified timeout by using new Consumer#poll(Duration)
  • Loading branch information
snuyanzin committed Nov 23, 2022
1 parent 7828d77 commit 22e4694
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public void run() {
// over
if (records == null) {
try {
records = consumer.poll(pollTimeout);
records = consumer.poll(Duration.ofMillis(pollTimeout));
} catch (WakeupException we) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private void assertRecord(String topicName, String expectedKey, String expectedV
kafkaConsumer.subscribe(Collections.singletonList(topicName));
ConsumerRecords<String, String> records = ConsumerRecords.empty();
while (records.isEmpty()) {
records = kafkaConsumer.poll(10000);
records = kafkaConsumer.poll(Duration.ofMillis(10000));
}

ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
Expand Down

0 comments on commit 22e4694

Please sign in to comment.