Add auto-configuration for Reactor Kafka #30567
base: main
Conversation
I see you auto-configure only options. Why are there no auto-configured beans for `KafkaReceiver` and `KafkaSender`? Or even their respective Spring for Apache Kafka wrappers, `ReactiveKafkaConsumerTemplate` and `ReactiveKafkaProducerTemplate`?

Thanks for the contribution anyway!
```java
else {
	Pattern subscribePattern = this.reactiveKafkaProperties.getReceiver().getSubscribePattern();
	if (subscribePattern != null) {
		receiverOptions = receiverOptions.subscription(subscribePattern);
```
Consider using a `PropertyMapper` for this kind of complex configuration-property mapping.
@artembilan Great idea. As an alternative, I could do something like:

```java
ReactiveKafkaProperties.Receiver receiverProperties = this.reactiveKafkaProperties.getReceiver();
return receiverOptions
		.atmostOnceCommitAheadSize(receiverProperties.getAtmostOnceCommitAheadSize())
		.maxDeferredCommits(receiverProperties.getMaxDeferredCommits())
		.maxCommitAttempts(receiverProperties.getMaxCommitAttempts())
		.commitBatchSize(receiverProperties.getCommitBatchSize())
		.closeTimeout(receiverProperties.getCloseTimeout())
		.pollTimeout(receiverProperties.getPollTimeout())
		.subscription(receiverProperties.getSubscribePattern());
```

But this means I would no longer check for nulls, and I'd set the `ReceiverOptions` unconditionally. Do you think the null (and integer sanity value) checks are really necessary here? I actually think we can give up on them.
Sorry for the delay.

I don't think it is OK to propagate them blindly. I might not be interested in a subscription, so I won't provide `receiverProperties.getSubscribePattern()`, and we would fail here on the `Objects.requireNonNull(pattern)` call.

Therefore `PropertyMapper` is the better way to accept only those properties which are expected and valid.
👍
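For reference, the null-guarding that `PropertyMapper` formalizes can be sketched in plain Java. Everything below is a hypothetical stand-in: the `Options` record mimics the immutable builder style of `ReceiverOptions`, and `mapWhenNonNull` mimics the effect of `PropertyMapper.get().alwaysApplyingWhenNonNull()`; neither is a real Spring Boot or reactor-kafka type.

```java
import java.time.Duration;
import java.util.function.Function;

public class PropertyMapperSketch {

	// Hypothetical immutable stand-in for ReceiverOptions: each "setter"
	// returns a new instance instead of mutating the current one.
	record Options(Duration pollTimeout, Integer commitBatchSize) {
		Options withPollTimeout(Duration timeout) {
			return new Options(timeout, commitBatchSize);
		}
		Options withCommitBatchSize(int size) {
			return new Options(pollTimeout, size);
		}
	}

	// Apply the setter only when the configured value is non-null, so an
	// unset property never reaches the options object at all.
	static <T, V> T mapWhenNonNull(T target, V value, Function<T, Function<V, T>> setter) {
		return (value != null) ? setter.apply(target).apply(value) : target;
	}

	public static void main(String[] args) {
		Options options = new Options(null, null);
		Duration configuredPoll = Duration.ofMillis(200); // user set this property
		Integer configuredBatch = null;                   // user left this one unset

		options = mapWhenNonNull(options, configuredPoll, o -> o::withPollTimeout);
		options = mapWhenNonNull(options, configuredBatch, o -> o::withCommitBatchSize);

		System.out.println(options.pollTimeout());     // PT0.2S
		System.out.println(options.commitBatchSize()); // null
	}
}
```

The point is only that values the user never configured are never pushed into the options, so internal `Objects.requireNonNull` guards such as the one on the subscription pattern are never triggered.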
```java
 * the underlying Kafka consumer is not thread-safe, long poll intervals may delay
 * commits and other operations invoked using
 * {@link KafkaReceiver#doOnConsumer(java.util.function.Function)}. Very short
 * timeouts may reduce batching and increase load on the broker.
```
The descriptions of these configuration properties must be short and simple. They also cannot read like method Javadoc: we must not use any Javadoc tags or style in these descriptions. As written, this description doesn't actually say what the property sets.

See the docs for more info: https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#features.developing-auto-configuration.custom-starter.configuration-keys
@artembilan That's actually a great idea. I've auto-configured only the options basically because that's what we agreed on in the issue. It would be pretty easy to migrate this to an auto-configuration for the templates. Would you suggest still keeping the auto-configuration for the options (in case people would like to customize things)?
Sure! I will leave that decision to @garyrussell, since it looks like he is exactly the one who advised auto-configuring only the options.
I suggested configuring just the options because that is the biggest pain point - Boot already has mechanisms to configure the properties, which are identical to spring-kafka's. I don't see much benefit in creating the sender and receiver because they can't be reused, and it only takes one more line for the user to write, so it doesn't save a lot of boilerplate. In most cases:

```java
KafkaReceiver.create(options)
		.receive()
		. ...
		.subscribe()
```

The idea here was to start with the minimal auto-config and have the community request more as time goes by.
We really need to pull `Producer`, `Consumer`, etc. in `KafkaProperties` out to their own class(es) and use them in both places; these represent native Kafka properties.
```java
@ConditionalOnMissingBean(ReceiverOptions.class)
public <K, V> ReceiverOptions<K, V> receiverOptions() {
	Map<String, Object> properties = this.kafkaProperties.buildConsumerProperties();
	properties.putAll(this.reactiveKafkaProperties.buildReceiverProperties());
```
What is the purpose of this? It is adding reactor-kafka properties to the Kafka receiver properties; Kafka knows nothing about these properties.
```java
@ConditionalOnMissingBean(SenderOptions.class)
public <K, V> SenderOptions<K, V> senderOptions() {
	Map<String, Object> properties = this.kafkaProperties.buildProducerProperties();
	properties.putAll(this.reactiveKafkaProperties.buildSenderProperties());
```
Ditto - we shouldn't be adding these; they are reactor-kafka specific properties.
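One way to read this concern: the reactor-kafka-specific settings should stay out of the map handed to the Kafka client, which warns about (and ignores) unknown keys. A minimal, self-contained illustration of such a split; the key names and the `REACTOR_ONLY` set are purely illustrative and are not real reactor-kafka property keys (reactor-kafka settings are actually builder methods on the options, not string properties):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class ReceiverPropertySplit {

	// Illustrative-only set of keys understood by reactor-kafka rather than
	// by the Kafka client itself (not real reactor-kafka property names).
	static final Set<String> REACTOR_ONLY = Set.of("commit.batch.size", "max.commit.attempts");

	// Keep only the native Kafka client properties.
	static Map<String, Object> nativeProperties(Map<String, Object> all) {
		Map<String, Object> result = new LinkedHashMap<>(all);
		result.keySet().removeAll(REACTOR_ONLY);
		return result;
	}

	public static void main(String[] args) {
		Map<String, Object> all = new LinkedHashMap<>();
		all.put("bootstrap.servers", "localhost:9092"); // native Kafka property
		all.put("commit.batch.size", 100);              // reactor-kafka concept

		Map<String, Object> nativeProps = nativeProperties(all);
		System.out.println(nativeProps.containsKey("commit.batch.size")); // false
		System.out.println(nativeProps.get("bootstrap.servers"));         // localhost:9092
	}
}
```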
```java
	this.maxDeferredCommits = maxDeferredCommits;
}

public Map<String, Object> buildProperties() {
```
We don't need any of these `build*Properties()` methods; all the Kafka-specific properties are already handled by parts of `KafkaProperties`.
Would you like to do this in this PR or in another one? I think it would be better to get it done in another PR, given what it changes. Also, @artembilan & @garyrussell, I've resolved your notes; you're welcome to review the PR once more.
Isn't that going to be a breaking change? If so, such a fix cannot be applied to current Spring Boot. Am I missing anything else, @garyrussell?
Otherwise this is OK with me.
Thanks
```java
 */
@AutoConfiguration
@ConditionalOnClass({ KafkaReceiver.class, KafkaSender.class })
@ConditionalOnBean(KafkaProperties.class)
```
I guess something like `@AutoConfigureAfter(KafkaAutoConfiguration.class)` is better; otherwise we cannot predict the order of the auto-configurations.
Good idea
Why would it be a breaking change? The Boot team does not consider ...
@garyrussell the only change there is to make is to move the ... I wonder if it also means moving ...
I would say so, yes. But I don't know enough about Boot's property documentation to say whether such a change would break things, so we need input from the Boot team.
@garyrussell I think this would only break native usage of the ...
As I said, this has missed the 2.7.x train anyway, so there is now plenty of time to resolve this.
@garyrussell Ok. I'll add the commits for the classes split to this PR in the following days.
Thanks for the PR, @almogtavor. A few general comments:
I think things would benefit from taking a small step back and considering exactly how we want the receiver and sender options to be configured. Once that's been established, we can then map that into properties classes and auto-configuration.
If we decided to "share" the consumer/producer properties, they would be moved out to separate classes, and then each instance var to the ...
@wilkinsona I will reply with what I see regarding the overlap of the properties, and also step back and think about how we want to configure this thing.
So, the pattern is that all of the properties in the `ReceiverOptions.properties` map, plus the key and value serializer properties, are passed into the consumer:

```java
public <K, V> Consumer<K, V> createConsumer(ReceiverOptions<K, V> config) {
	return new KafkaConsumer<>(config.consumerProperties(),
			config.keyDeserializer(),
			config.valueDeserializer());
}
```

These are the only "true" overlapping ones. All of the other options are used outside of the ... I am guessing they chose not to keep the key/value consumer property map entry and the first-class key/value serde properties in sync, since they are both passed into the `KafkaConsumer` constructor. We could do the same, or sync them if we want.

**Possible direction**

To summarize what I think I am hearing, and also what I think would be a good direction:
A tradeoff of the split approach is that the consumer properties need to be mapped multiple times if someone is ... An example YAML would look like:

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest
      auto-commit-interval: 5s
    reactive:
      consumer:
        auto-offset-reset: latest
        auto-commit-interval: 3s
```

Thoughts?
```
@@ -188,6 +188,7 @@ dependencies {
	optional("org.thymeleaf.extras:thymeleaf-extras-java8time")
	optional("org.thymeleaf.extras:thymeleaf-extras-springsecurity6")
	optional("redis.clients:jedis")
	optional("io.projectreactor.kafka:reactor-kafka")
```
[NIT] WDYT about moving the dependency into alpha order next to the other `io.projectreactor` entries, or alternatively closer to `spring-kafka`? It seems out of place here.
```java
@@ -0,0 +1,37 @@
/*
 * Copyright 2012-2020 the original author or authors.
```
[NIT] Copyrights need updating.
```java
 * {@code reactor.kafka.receiver.internals.DefaultKafkaReceiver} beans.
 *
 * @author Almog Tavor
 * @since 2.7.0
```
[NIT] This ship has sailed :) - the `@since` needs to be updated.
```java
import org.springframework.context.annotation.Bean;

/**
 * {@link EnableAutoConfiguration Auto-configuration} for the Reactive client of Apache
```
[NIT] We have somewhat inconsistent descriptions of the "thing" we are doing. We call it several things: "Reactive client of Apache Kafka" here and slightly different things elsewhere in this PR - maybe consolidate on one? I like "ReactorKafka" personally.
```java
class ReactiveKafkaAutoConfigurationTests {

	private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
			.withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class))
```
This can be simplified into a single `withConfiguration` by passing both auto-configs into `AutoConfigurations.of`. That is the typical pattern I see in the project.
```java
 * @since 2.7.0
 */
@AutoConfiguration
@ConditionalOnClass({ KafkaReceiver.class, KafkaSender.class })
```
I think it would be more direct to guard on the thing we are auto-configuring (`ReceiverOptions`/`SenderOptions`) rather than something that happens to use them and is in the same library.
```java
 * @since 2.7.0
 */
@FunctionalInterface
public interface DefaultKafkaReceiverCustomizer {
```
Since we are only auto-configuring the `ReceiverOptions`/`SenderOptions`, I think we should drop these customizers from this proposal. They are also not currently being called or tested.

Instead, we could add the `ReceiverOptionsCustomizer`/`SenderOptionsCustomizer` called out in spring-cloud/spring-cloud-stream#2296. We could also just add the options customizers in a separate PR under the SCS ticket.

If we do include any customizers in this PR, though, they will need to be invoked in an ordered manner in the auto-configuration and have tests for this.
```java
 * Additional properties, common to producers and consumers, used to configure the
 * client.
 */
private final Map<String, String> properties = new HashMap<>();
```
These extra "properties" are not needed as we do not do anything w/ them.
```java
Map<String, Object> properties = this.kafkaProperties.buildConsumerProperties();
ReceiverOptions<K, V> receiverOptions = ReceiverOptions.create(properties);
ReactiveKafkaProperties.Receiver receiverProperties = this.reactiveKafkaProperties.getReceiver();
receiverOptions = setPropertyWhenGreaterThanZero(receiverProperties.getAtmostOnceCommitAheadSize(),
```
[NIT-PREFERENCE] I would prefer to use the `PropertyMapper` API inline, without the convenience methods below. There are many examples that do this same thing, such as `CassandraAutoConfiguration` and `JettyWebServerFactoryCustomizer`.
```java
}

@Bean
@ConditionalOnMissingBean(SenderOptions.class)
```
The bean type is redundant here as the return type of the method is the default.
Thanks for sharing your thoughts, @onobc.
I'm not too comfortable with that. Once Boot surfaces the properties, IDEs will offer auto-completion for them, which increases the chances of two similarly named properties causing confusion for users and being difficult to use as a result. If the two properties serve the same purpose, I think they should be consolidated into a single property. If the two properties serve different purposes, I think they should be renamed. In either case, I suspect some changes in Spring Kafka and/or Reactor Kafka will be necessary so that the property or properties map onto similar settings.
@wilkinsona that makes sense. So a deeper level of understanding of these "overlapping" properties is required to decide what to do for each one (consolidate, rename, leave alone, etc.). I will try to take a scan today and get a list of these properties so we can start discussing. Other than the few questionable properties, WDYT of the other points?
I think ... Let's continue the discussion on #29080 rather than here, please. Once it's reached a conclusion, we can decide whether this PR is roughly applicable or whether a different approach is needed.
Just for another data point - see #17420 and the discussion on the linked PR. The number of Kafka properties is overwhelming; when we first added auto-config, we picked a subset of properties to be first-class, as discussed here: https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.additional-properties Specifically, ...

We have added others over time at user request (such as isolation level on that linked PR). Some kind of code generation of the properties from the kafka-clients ...
Some notes:

- I've decided to place the reactive auto-configuration classes under the `kafka` hierarchy and not `reactor.kafka`, since there is no other component whose reactive equivalent has its own folder. This can also make things easier when a so-called "common Kafka auto-configurations superclass" is implemented, and easier to maintain when changes are needed, since both auto-configurations will sit in the same place.
- `ReactiveKafkaProperties` has a static `Properties` class of its own.
- Since `bootstrapServers` are defined in `KafkaProperties`, `ReactiveKafkaProperties` is annotated with `@ConditionalOnBean(KafkaProperties.class)`.
- I call `ReceiverOptions`'s & `SenderOptions`'s setters with null checks / greater-than-zero checks, rather than just building a class like `ImmutableReceiverOptions`, since that class is internal and not public. `DefaultKafkaReceiver` & `DefaultKafkaSender` are classified as internals.
- There is no auto-configuration for the `scheduler`, `assignListeners`, `revokeListeners` and `schedulerSupplier` fields:
  - Since `reactor.kafka.sender.ImmutableSenderOptions` is not public, it is currently hard to pass fields like `Scheduler`, since it is read from the configuration file as a field of type `Class<?>`, which cannot easily be passed to `senderOptions.scheduler()`.
  - `assignListeners` and `revokeListeners` seemed unnecessary to configure from a YAML, and even if we'd wanted to, I think it'd be pretty hard to accomplish.
  - `schedulerSupplier` also seemed unnecessary to me for auto-configuration.
- I haven't added `assignTopicPartitions` due to problems with binding a `String` & `int` KV pair from the YAML file as a `TopicPartition` class.