Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Default for spring.kafka.listener.missing-topics-fatal is no longer aligned with Spring Kafka's default #20917

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -52,6 +52,7 @@
* @author Stephane Nicoll
* @author Artem Bilan
* @author Nakul Mishra
* @author Dhiren Mathur
* @since 1.5.0
*/
@ConfigurationProperties(prefix = "spring.kafka")
Expand Down Expand Up @@ -906,7 +907,7 @@ public enum Type {
* Whether the container should fail to start if at least one of the configured
* topics are not present on the broker.
*/
private boolean missingTopicsFatal = true;
private boolean missingTopicsFatal = false;

public Type getType() {
return this.type;
Expand Down
Expand Up @@ -87,6 +87,7 @@
* @author Stephane Nicoll
* @author Eddú Meléndez
* @author Nakul Mishra
* @author Dhiren Mathur
*/
class KafkaAutoConfigurationTests {

Expand Down Expand Up @@ -370,7 +371,7 @@ void listenerProperties() {
"spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch",
"spring.kafka.listener.idle-event-interval=1s", "spring.kafka.listener.monitor-interval=45",
"spring.kafka.listener.log-container-config=true",
"spring.kafka.listener.missing-topics-fatal=false", "spring.kafka.jaas.enabled=true",
"spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true",
"spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo",
"spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true")
.run((context) -> {
Expand All @@ -395,7 +396,7 @@ void listenerProperties() {
assertThat(containerProperties.getIdleEventInterval()).isEqualTo(1000L);
assertThat(containerProperties.getMonitorInterval()).isEqualTo(45);
assertThat(containerProperties.isLogContainerConfig()).isTrue();
assertThat(containerProperties.isMissingTopicsFatal()).isFalse();
assertThat(containerProperties.isMissingTopicsFatal()).isTrue();
assertThat(kafkaListenerContainerFactory).extracting("concurrency").isEqualTo(3);
assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue();
assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)).hasSize(1);
Expand Down Expand Up @@ -586,6 +587,17 @@ void testConcurrentKafkaListenerContainerFactoryWithCustomConsumerFactory() {
});
}

@Test
void testConcurrentKafkaListenerContainerFactoryMatchesDefaults() {
Listener listenerProperties = new KafkaProperties().getListener();
this.contextRunner.withUserConfiguration(ConsumerFactoryConfiguration.class).run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(kafkaListenerContainerFactory.getContainerProperties().isMissingTopicsFatal())
.isEqualTo(listenerProperties.isMissingTopicsFatal());
});
}

@Test
void specificSecurityProtocolOverridesCommonSecurityProtocol() {
this.contextRunner.withPropertyValues("spring.kafka.security.protocol=SSL",
Expand Down