thepaep opened 2 years ago
+1
@snicoll @simonbasle @garyrussell any help with this?
Can someone describe which features are currently missing here and how they should be implemented (what functionality should be added, and where), so that it's easier for contributors to help?
+1
@lynch19 @roger751 please use the reaction on the original description rather than +1 comments like this.
@lynch19 reactor-kafka has two fundamental options objects, SenderOptions and ReceiverOptions, each with a number of properties.
In addition, creating them takes a Map<String, Object> of regular Kafka properties (similar to the ...properties extension for the spring-kafka auto-configuration).
There are several levels of Boot auto configuration that would be useful, with the MVP being the auto-configuration of these two beans using application properties.
I'm using Reactor Kafka in my applications, and it provides everything needed to work. Or does this issue mean support in the form of annotations? For example:
@ReactiveKafkaListener(topic = "smth", autoCommit = true)
public Mono<Void> consumer(DTO dto) {
    log.info("received dto: " + dto);
    return service.call(dto).then();
}
No; not at all; this is just about configuring the sender and receiver options via Boot properties.
Such a mechanism would belong in Spring for Apache Kafka (spring-kafka), but there are currently no plans to do so.
I would suggest something like package org.springframework.boot.autoconfigure.reactor.kafka in the spring-boot-autoconfigure project.
We can leverage the existing KafkaProperties for the consumer and producer properties passed into the receiver and sender options.
This has been requested previously (https://github.com/spring-projects/spring-boot/issues/18751) but was declined as the Reactor team advised us against adding support. IIRC, this was due to the status of Reactor's Kafka support at the time and things may well have moved on since then.
@simonbasle, what's your take on this now, please? Has this moved on sufficiently in the last couple of years that it's worth considering again?
@wilkinsona With the broad changes in 1.3.x in late 2020, reactor-kafka has become more stable and maintainable, and thanks to @garyrussell and @artembilan, the project is more actively maintained. So I would defer to Gary and Artem on that decision, but it definitely looks better than it did back in 2019.
I concur; it is in much better shape now, thanks to some significant work by Sergie back then. Also, due to community requests, the Spring Cloud Stream team are likely to incorporate it in the next major release and basic auto configuration of the sender and receiver properties will make things easier there too.
@garyrussell what exactly will it require? Only ReactiveKafkaAutoConfiguration, ReactiveKafkaAnnotationDrivenConfiguration (which will use KafkaProperties / ReactiveKafkaProperties), ReceiverOptionsCustomizer and SenderOptionsCustomizer classes? Or is there anything else that needs to be noted?
We can leverage the existing KafkaProperties for the consumer and producer properties passed into the receiver and sender options.
Do you mean that we should use the existing KafkaProperties class?
@almogtavor There would be no ReactiveKafkaAnnotationDrivenConfiguration; there are no annotations in that project.
As I said above, I'd say the MVP would be a ReactiveKafkaAutoConfiguration creating two beans: SenderOptions and ReceiverOptions.
For example:
@Bean
ReceiverOptions receiverOptions(KafkaProperties kp, ReactiveKafkaProperties rkp) {
    ReceiverOptions opts = ReceiverOptions.create(kp.buildConsumerProperties());
    // apply rkp.getReceiver() properties here; the options are immutable,
    // so each change must be reassigned: opts = opts.commitInterval(...);
    return opts;
}
Calling options customizers (if configured) before returning might be a nice addition, but it is not really a requirement, because the user can further customize the properties wherever they are used to create a receiver or sender. Since the options are immutable, the base options can be altered in different ways for each usage; a new object is created each time a property is set.
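To make the immutability point concrete, here is a minimal stand-in written in plain Java. MiniReceiverOptions is hypothetical and only mimics the style of reactor-kafka's ReceiverOptions (every "setter" returns a new instance); it is not the library's implementation, and its defaults are illustrative.

```java
import java.time.Duration;
import java.util.List;

// Hypothetical stand-in for an immutable options type such as ReceiverOptions.
// Every "setter" returns a new instance; the instance it is called on never changes.
record MiniReceiverOptions(List<String> topics, Duration commitInterval) {

    MiniReceiverOptions {
        topics = List.copyOf(topics); // unmodifiable copy so callers cannot mutate shared state
    }

    static MiniReceiverOptions create() {
        // Illustrative defaults only.
        return new MiniReceiverOptions(List.of(), Duration.ofSeconds(5));
    }

    MiniReceiverOptions subscription(List<String> newTopics) {
        return new MiniReceiverOptions(newTopics, commitInterval);
    }

    MiniReceiverOptions commitInterval(Duration interval) {
        return new MiniReceiverOptions(topics, interval);
    }

    public static void main(String[] args) {
        MiniReceiverOptions base = MiniReceiverOptions.create();
        MiniReceiverOptions one = base.subscription(List.of("topic1"));
        MiniReceiverOptions two = base.subscription(List.of("topic2")).commitInterval(Duration.ofSeconds(1));
        System.out.println(base.topics()); // []
        System.out.println(one.topics());  // [topic1]
        System.out.println(two.topics());  // [topic2]
    }
}
```

Because base is never mutated, a single auto-configured bean can be shared and specialised per receiver without one usage affecting another.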
@garyrussell Seems great. Is there any need to use KafkaProperties? It seems that we can easily auto-configure all of the ReceiverOptions and SenderOptions parameters.
KafkaProperties.Consumer and ...Producer encompass common Kafka consumer and producer properties, as well as a general .properties node (passed into the create() methods, as in my example above). We don't want to duplicate all of those properties here.
Notice that there are not a lot of common parameters between the two (KafkaProperties.Consumer and KafkaProperties.Receiver have only 3 parameters in common). There are also some caveats I'd like you to take a look at:
KafkaProperties.
With some of KafkaProperties and some of ReactiveKafkaProperties, users will have to configure some parameters with spring.kafka.whatever and some with spring.reactor.kafka.whatever. I don't see any other option here.
I am not talking about those properties; I am specifically talking about the ConsumerConfig and ProducerConfig kafka-clients properties, which are handled within KafkaProperties (some being first-class properties that Boot knows about, others generic properties; see https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.additional-properties).
Ideally, that part of KafkaProperties will be pulled out into a common superclass.
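A rough sketch of what such a common superclass might look like, assuming hypothetical names (CommonClientProperties, ConsumerClientProperties) and only a couple of settings; Spring Boot's real KafkaProperties is organised differently. The generic properties map is applied last so that it overrides the first-class settings, and the localhost:9092 default is illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical common superclass for the kafka-clients settings shared by the
// imperative (spring-kafka) and reactive (reactor-kafka) property classes.
abstract class CommonClientProperties {

    private List<String> bootstrapServers = new ArrayList<>(List.of("localhost:9092"));
    private String clientId;
    private final Map<String, String> properties = new HashMap<>(); // generic ".properties" node

    List<String> getBootstrapServers() { return bootstrapServers; }
    void setBootstrapServers(List<String> servers) { this.bootstrapServers = servers; }
    String getClientId() { return clientId; }
    void setClientId(String clientId) { this.clientId = clientId; }
    Map<String, String> getProperties() { return properties; }

    // Builds the map that would be handed to the Kafka client (or to an options create() method).
    Map<String, Object> buildProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", String.join(",", bootstrapServers));
        if (clientId != null) {
            props.put("client.id", clientId);
        }
        addSpecificProperties(props); // first-class, subclass-specific settings
        props.putAll(properties);     // generic properties win over first-class ones
        return props;
    }

    protected abstract void addSpecificProperties(Map<String, Object> props);
}

// Hypothetical consumer-side subclass that both auto-configurations could reuse.
class ConsumerClientProperties extends CommonClientProperties {

    private String autoOffsetReset;

    void setAutoOffsetReset(String value) { this.autoOffsetReset = value; }

    @Override
    protected void addSpecificProperties(Map<String, Object> props) {
        if (autoOffsetReset != null) {
            props.put("auto.offset.reset", autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        ConsumerClientProperties consumer = new ConsumerClientProperties();
        consumer.setClientId("fooDemoApp");
        consumer.setAutoOffsetReset("earliest");
        System.out.println(consumer.buildProperties());
    }
}
```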
Agreed about the common superclass. Is this something you'd want later on, or in the very first PR?
@garyrussell I've raised a PR.
As a continuation of the discussion in the PR...
For the others, I think it's too soon to say. I think we need to know what we want the auto-configuration to offer before we spend any more time thinking about the precise details. What's offered should be defined as the beans that will be auto-configured and the properties that will be available to control their configuration.
I believe we want to auto-configure only the ReceiverOptions and SenderOptions in the initial offering.
@garyrussell @artembilan @almogtavor is this your understanding as well?
and the properties that will be available to control their configuration
The properties would be the first-class properties offered by the Receiver/Sender options, as well as the ability to specify the normal consumer/producer properties currently available in KafkaProperties.Consumer/Producer. It does sound like we may only want to offer a subset of those. There is also the question of "overlapping" properties to figure out.
As a continuation from this comment on the PR...
The number of Kafka properties is overwhelming; when we first added auto-configuration, we picked a subset of properties to be first-class, as discussed here: https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.additional-properties
Specifically,
Spring Boot auto-configuration supports all HIGH importance properties, some selected MEDIUM and LOW properties, and any properties that do not have a default value.
We have added others over time at user request (such as isolation level on that linked PR).
@garyrussell are you suggesting we should drop some of the properties in the current PR? I think the number of primitive properties on the ReceiverOptions (the ones chosen in the associated PR) is not too overwhelming. The more complicated properties can be adjusted as needed in the options customizer that will be added shortly after.
Some kind of code generation of the properties from the kafka-clients *Config classes would be ideal.
Yes, this would be nice.
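As a hedged illustration of a first step such generation might take, the sketch below reflects over String constants whose names end in _CONFIG. ConsumerConfigStandIn is a hypothetical stand-in that only copies the naming convention of kafka-clients' ConsumerConfig (whose AUTO_OFFSET_RESET_CONFIG and MAX_POLL_RECORDS_CONFIG do hold these values), so the example runs without Kafka on the classpath; a real generator would also need types, defaults, and documentation.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

final class ConfigKeyScanner {

    // Hypothetical stand-in mimicking the *_CONFIG / *_DOC naming convention of kafka-clients.
    static final class ConsumerConfigStandIn {
        public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
        public static final String AUTO_OFFSET_RESET_DOC = "documentation text";
        public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
    }

    // Collects the values of public static final String fields named *_CONFIG, sorted.
    static List<String> configKeys(Class<?> configClass) {
        List<String> keys = new ArrayList<>();
        for (Field field : configClass.getFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) && Modifier.isFinal(mods)
                    && field.getType() == String.class && field.getName().endsWith("_CONFIG")) {
                try {
                    keys.add((String) field.get(null));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        keys.sort(null); // getFields() order is unspecified, so sort for stable output
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(configKeys(ConsumerConfigStandIn.class)); // [auto.offset.reset, max.poll.records]
    }
}
```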
I believe we want to auto-configure only the ReceiverOptions and SenderOptions in the initial offering.
That was my proposal for the MVP above, yes:
https://github.com/spring-projects/spring-boot/issues/29080#issuecomment-1005165885
For the "overlapping" properties (for spring-kafka), the hierarchy is as follows (e.g. for the consumer properties):
spring.kafka.properties
first-class properties such as spring.kafka.consumer.auto-offset-reset
...consumer.properties
i.e. with:
spring.kafka.properties.auto.offset.reset=...
spring.kafka.consumer.auto-offset-reset=...
spring.kafka.consumer.properties.auto.offset.reset=...
the last one would win.
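The precedence above can be modelled as successive putAll calls with the most specific source applied last. This is a simplified, stdlib-only sketch; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the consumer-property precedence:
// spring.kafka.properties < first-class spring.kafka.consumer.* < spring.kafka.consumer.properties.
// Each later putAll overwrites earlier entries, so the most specific source wins.
final class ConsumerPropertyPrecedence {

    static Map<String, Object> merge(Map<String, ?> commonProperties,
                                     Map<String, ?> firstClassConsumer,
                                     Map<String, ?> consumerProperties) {
        Map<String, Object> result = new HashMap<>(commonProperties);
        result.putAll(firstClassConsumer);
        result.putAll(consumerProperties);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> merged = merge(
                Map.of("auto.offset.reset", "none"),     // spring.kafka.properties.auto.offset.reset
                Map.of("auto.offset.reset", "earliest"), // spring.kafka.consumer.auto-offset-reset
                Map.of("auto.offset.reset", "latest"));  // spring.kafka.consumer.properties.auto.offset.reset
        System.out.println(merged.get("auto.offset.reset")); // latest
    }
}
```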
drop some of the properties in the current PR?
I am not sure what you mean; I am suggesting using the Producer and Consumer (from KafkaProperties) to build the map for ReceiverOptions.create(), and exposing the other methods, which take simple values, as Boot properties.
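A minimal sketch of exposing the simple-valued options as Boot properties, assuming the proposed (not yet existing) spring.kafka.reactor.receiver.* keys bind to nullable fields: a value is applied only when it was configured, so the options keep their own defaults otherwise. Options is a hypothetical stand-in for ReceiverOptions, and the defaults used below are illustrative.

```java
import java.time.Duration;

final class ReceiverSettingsSketch {

    // Hypothetical immutable stand-in for ReceiverOptions with two simple-valued settings.
    record Options(Duration commitInterval, int commitBatchSize) {
        Options commitInterval(Duration d) { return new Options(d, commitBatchSize); }
        Options commitBatchSize(int n) { return new Options(commitInterval, n); }
    }

    // null means "not configured" (e.g. no spring.kafka.reactor.receiver.commit-interval set).
    static Options apply(Options base, Duration commitInterval, Integer commitBatchSize) {
        Options result = base;
        if (commitInterval != null) {
            result = result.commitInterval(commitInterval);
        }
        if (commitBatchSize != null) {
            result = result.commitBatchSize(commitBatchSize);
        }
        return result;
    }

    public static void main(String[] args) {
        Options defaults = new Options(Duration.ofSeconds(5), 0);
        System.out.println(apply(defaults, null, 100));
    }
}
```

Complex values such as listeners stay out of this mapping and are left to the places where the options are used.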
For complex types, for example, users can grab the auto-configured ReceiverOptions and add listeners to it.
Given that the default implementation of ReceiverOptions is immutable, each use can "modify" the autowired bean without affecting other uses.
@Autowired
ReceiverOptions ro;
...
ReceiverOptions one = ro.addAssignListener(...).subscription(List.of("topic1"));
ReceiverOptions two = ro.addAssignListener(...).subscription(List.of("topic2"));
one and two will be different objects.
For the "overlapping" properties (for spring-kafka), the hierarchy is as follows (e.g. for the consumer properties)
@garyrussell I agree with the property hierarchy and precedence as you outlined above. The "overlapping" ones I was referring to are the ones that are named similarly but may or may not be the same thing (e.g. consumer.auto-commit-interval and the receiver option's commit-interval, which is not passed into the consumer).
I am not sure what you mean; I am suggesting using the Producer and Consumer (from KafkaProperties) to build the map for ReceiverOptions.create() and expose the other methods that take simple values as Boot properties.
I am suggesting the same thing. With my "drop some of the properties" comment, I was referring to the first-class Boot properties in the current proposal. I was not sure whether you were suggesting that there are currently too many of them and that we should pick the high-priority ones. It sounds like that is not what you are saying, though. Sorry for the confusion.
For complex types, for example, users can grab the auto configured ReceiverOptions and add listeners to it.
Given that the default implementation of ReceiverOptions is immutable, each use can "modify" the auto wired bean without affecting other uses.
Are you suggesting we do not add the Sender/ReceiverOptionCustomizers or that we can use this technique until we do add the customizers?
On second thought, I suppose customizers would still make sense - e.g. for an app that creates multiple receivers but wants to add the same assignment listener to them all.
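If customizers are added, immutable options change the usual customizer contract: a customizer cannot mutate its argument, so it has to return the options to use next. The sketch below assumes a hypothetical OptionsCustomizer interface and a stand-in Options record, and applies the customizers in order, e.g. to add the same assignment listener to every receiver an app creates.

```java
import java.util.ArrayList;
import java.util.List;

final class CustomizerChainSketch {

    // Hypothetical customizer contract for immutable options: return the options to use next.
    @FunctionalInterface
    interface OptionsCustomizer<T> {
        T customize(T options);
    }

    // Stand-in for ReceiverOptions; listeners are represented by names for illustration only.
    record Options(List<String> assignListeners) {
        Options {
            assignListeners = List.copyOf(assignListeners);
        }

        Options addAssignListener(String listener) {
            List<String> copy = new ArrayList<>(assignListeners);
            copy.add(listener);
            return new Options(copy);
        }
    }

    // Applies each customizer in order, feeding it the previous result.
    static <T> T applyAll(T options, List<? extends OptionsCustomizer<T>> customizers) {
        T result = options;
        for (OptionsCustomizer<T> customizer : customizers) {
            result = customizer.customize(result);
        }
        return result;
    }

    public static void main(String[] args) {
        OptionsCustomizer<Options> logAssignments = opts -> opts.addAssignListener("logAssignments");
        Options base = applyAll(new Options(List.of()), List.of(logAssignments));
        System.out.println(base.assignListeners()); // [logAssignments]
    }
}
```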
So I dug into each Reactor Kafka ReceiverOptions property. Here is what I found:
The only consumer properties that need to be considered as duplicate/overlapping at this point are:
- ReceiverOptions.commitInterval
- KafkaProperties.autoCommitInterval
- KafkaProperties.enableAutoCommit
They are not the same exact property but could be used to control the same underlying concept. More details below.
- assignTopicPartitions
- schedulerSupplier
- assignListeners
- revokeListeners
- keyDeserializer
- valueDeserializer
These are passed to the KafkaConsumer but are expected to already be configured, and configure() will not be called on them. So they are additive to the KP ones and not a problem here.
We are not surfacing them in the RKP anyway, but I wanted to detail what I found.
- subscribeTopics
- subscribePattern
- pollTimeout: the timeout for each KafkaConsumer#poll(long) operation. The closest setting is spring.kafka.listener.pollTimeout
- closeTimeout: timeout for graceful shutdown of the KafkaConsumer. No equivalent in KP
So these were the ones that seemed likely to overlap:
- commitInterval: commit interval for automatic commits
- commitBatchSize: commit batch size for automatic commits
ℹ️ If both the commit interval and the commit batch size are configured, a commit operation is scheduled when either the interval or the batch size is reached.
ℹ️ The KP has the following similar properties but always sets enableAutoCommit to false and controls this in the Reactive API:
- enableAutoCommit
- autoCommitInterval: frequency to commit if the above is set to true
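The "either threshold" rule can be sketched as follows. CommitTriggerSketch is hypothetical and deliberately simplified: it only checks the interval when a record is acknowledged, whereas a real implementation would also fire interval commits from a timer, and treating non-positive values as "disabled" is an assumption of this sketch.

```java
import java.time.Duration;

final class CommitTriggerSketch {

    private final long intervalNanos;  // <= 0 disables the interval trigger (assumption of this sketch)
    private final int commitBatchSize; // <= 0 disables the batch trigger (assumption of this sketch)
    private int uncommitted;
    private long lastCommitNanos;

    CommitTriggerSketch(Duration commitInterval, int commitBatchSize, long startNanos) {
        this.intervalNanos = commitInterval.toNanos();
        this.commitBatchSize = commitBatchSize;
        this.lastCommitNanos = startNanos;
    }

    // Records one acknowledged record at time nowNanos; returns true if a commit is due now.
    boolean onAcknowledged(long nowNanos) {
        uncommitted++;
        boolean batchReached = commitBatchSize > 0 && uncommitted >= commitBatchSize;
        boolean intervalReached = intervalNanos > 0 && nowNanos - lastCommitNanos >= intervalNanos;
        if (batchReached || intervalReached) {
            uncommitted = 0; // everything acknowledged so far goes into this commit
            lastCommitNanos = nowNanos;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CommitTriggerSketch trigger = new CommitTriggerSketch(Duration.ofSeconds(5), 3, 0L);
        System.out.println(trigger.onAcknowledged(1_000_000L));     // false
        System.out.println(trigger.onAcknowledged(2_000_000L));     // false
        System.out.println(trigger.onAcknowledged(3_000_000L));     // true: batch size of 3 reached
        System.out.println(trigger.onAcknowledged(6_000_000_000L)); // true: 5 s interval elapsed
    }
}
```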
While these are commit related, I believe these have no equivalent in KP:
- maxCommitAttempts: max number of consecutive non-fatal commit failures tolerated
- commitRetryInterval: how long to wait before a retry
- maxDeferredCommits: max out-of-order commits. I think this is RKP only, but I need to dig a bit more to see what KP options are available for out-of-order commits
- atmostOnceCommitAheadSize: commit-ahead size per partition for at-most-once delivery
The only consumer properties that need to be considered as duplicate/overlapping at this point are:
ReceiverOptions.commitInterval, KafkaProperties.autoCommitInterval and possibly KafkaProperties.enableAutoCommit
This "overlap" is by name only; unfortunate, but true.
The latter two control whether kafka-clients automatically commits the offsets on a schedule.
The first one is the interval used by reactor-kafka to automatically commit offsets.
Users should not enable both mechanisms; in spring-kafka, we disable enable.auto.commit by default, because spring-kafka takes care of the commits in a more deterministic fashion.
reactor-kafka also disables it; see the ImmutableReceiverOptions constructor:
this.properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
We should consider removing those properties from KafkaProperties; spring-kafka prefers them not to be set and reactor-kafka ignores them.
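A simplified model of that constructor behaviour, for illustration only: the options take a private copy of the user's consumer properties and force enable.auto.commit to "false", so a user-supplied true has no effect, and auto.commit.interval.ms becomes irrelevant because kafka-clients only uses it when auto-commit is enabled. The class name is hypothetical; "enable.auto.commit" is the value of ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class AutoCommitOverrideSketch {

    // Same value as ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG in kafka-clients.
    static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";

    private final Map<String, Object> properties;

    AutoCommitOverrideSketch(Map<String, Object> userProperties) {
        // Private copy: later changes to the caller's map do not leak into the options.
        this.properties = new HashMap<>(userProperties);
        // Commits are driven by the reactive API, so client-side auto-commit is always off.
        this.properties.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
    }

    Map<String, Object> consumerProperties() {
        return Collections.unmodifiableMap(properties);
    }

    public static void main(String[] args) {
        Map<String, Object> user = new HashMap<>();
        user.put(ENABLE_AUTO_COMMIT_CONFIG, "true");  // e.g. from spring.kafka.consumer.enable-auto-commit
        user.put("auto.commit.interval.ms", "1000");  // unused by kafka-clients once auto-commit is off
        AutoCommitOverrideSketch options = new AutoCommitOverrideSketch(user);
        System.out.println(options.consumerProperties().get(ENABLE_AUTO_COMMIT_CONFIG)); // false
    }
}
```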
Agreed @garyrussell - that is what I was alluding to here as well with these comments:
They are not the same exact property but could be used to control the same underlying concept. More details below.
ℹ️ The KP has the following similar properties but always sets enableAutoCommit to false and controls this in the Reactive API
If they are not desired in spring-kafka either, it seems like a great thing to remove (as you suggested).
So I dug into each Reactor Kafka SenderOptions property. Here is what I found:
These are much more straightforward than the consumer/receiver properties.
- closeTimeout: timeout for graceful shutdown of the sender
- maxInFlight: max number of in-flight records fetched from the outbound record publisher while acks are pending
- stopOnError: indicates whether a send operation should be terminated when an error is encountered
- scheduler: the scheduler used for publishing send results
- keySerializer
- valueSerializer
These are passed to the KafkaProducer directly and are expected to already be configured, and configure() will not be called on them. So they are additive to the KP ones and not a problem here.
We are not surfacing them in the RKP anyway, but I wanted to detail what I found.
Based on the above analysis of the KafkaProperties and the Sender/ReceiverOptions of Reactor Kafka, here is a suggested list of properties as well as a suggested "layout" for them. I think we can use this as a starting point for how to map them out.
spring:
  kafka:
    reactor:
      # common properties (just like KafkaProperties)
      bootstrap-servers: localhost:9093
      ssl: ...
      security: ...
      properties: ...
      client.id: fooDemoApp
      # Producer class - pass to Kafka producer props (aka SenderOptions.properties)
      producer:
        key-serializer: org.apache.kafka.common.serialization.LongSerializer
        value-serializer: com.example.FooSerializer
        buffer-memory: 32MB # used in tandem w/ max-in-flight
        ...
      # SenderOptions props
      sender:
        close-timeout: 5m
        max-in-flight: 256
        stop-on-error: false
      # Consumer class - pass to Kafka consumer props (aka ReceiverOptions.properties)
      consumer:
        auto-offset-reset: earliest
        key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
        value-deserializer: com.example.FooDeserializer
        ...
        # DELETE enable-auto-commit:
        # DELETE auto-commit-interval:
      # ReceiverOptions props
      receiver:
        subscribe-topics:
        subscribe-pattern:
        poll-timeout:
        close-timeout:
        commit-interval:
        commit-batch-size:
        max-commit-attempts:
        commit-retry-interval:
        max-deferred-commits:
        atmost-once-commit-ahead-size:
This assumes that no properties will be used for Reactor Kafka directly under spring.kafka, but rather that the properties for RK start under spring.kafka.reactor. Do we want to allow regular spring-kafka and Reactor Kafka to share properties under spring.kafka? I could see possibly wanting to do that for the current list of common properties.
A benefit of not sharing the KP producer/consumer props (the ones under spring.kafka.producer|consumer) and instead mapping them separately under spring.kafka.reactor.producer|consumer for RK is that both the regular and reactor variants can coexist and be configured independently, without worrying about one change affecting the other.
I would vote for keeping them separate.
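One way to reconcile the two options, sketched under assumptions: keep producer/consumer settings strictly separate under spring.kafka.reactor.*, but let a small set of common settings fall back to spring.kafka.* when no reactor-specific value is given. The resolution rule and the class name are hypothetical, and a flat map stands in for Spring's Environment.

```java
import java.util.Map;
import java.util.Optional;

final class SharedCommonPropertiesSketch {

    // Common settings: a reactor-specific value wins, otherwise fall back to the shared spring.kafka.* value.
    static Optional<String> resolveCommon(Map<String, String> env, String name) {
        String reactorValue = env.get("spring.kafka.reactor." + name);
        if (reactorValue != null) {
            return Optional.of(reactorValue);
        }
        return Optional.ofNullable(env.get("spring.kafka." + name));
    }

    // Consumer settings: no fallback, so spring.kafka.consumer.* never affects the reactive client.
    static Optional<String> resolveConsumer(Map<String, String> env, String name) {
        return Optional.ofNullable(env.get("spring.kafka.reactor.consumer." + name));
    }

    public static void main(String[] args) {
        Map<String, String> env = Map.of(
                "spring.kafka.bootstrap-servers", "broker:9092",
                "spring.kafka.consumer.auto-offset-reset", "latest");
        System.out.println(resolveCommon(env, "bootstrap-servers"));   // Optional[broker:9092]
        System.out.println(resolveConsumer(env, "auto-offset-reset")); // Optional.empty
    }
}
```

Whether common properties should be shared at all is exactly the open question above; this only shows that the two choices can be combined.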
@onobc @garyrussell Has any concrete decision been made?
@onobc do you suggest implementing one CommonKafkaProperties class and just auto-configuring it with both aliases? Or would you prefer to actually copy and paste it?
Hi @almogtavor ,
Due to competing priorities we have not had a chance to come to an agreement on the approach. More than likely we will close this PR and will revisit in a subsequent proposal. I will be sure we tag you when that happens to keep you in the loop. Thank you for this initial contribution.
@onobc Don't you think we can meanwhile lean on the current PR since it delivers the issue, and just migrate paths to the ideal solution when the time comes? The current PR doesn't include changes to Spring-Kafka's autoconfigurations, so that'll only be an additional feature. It just feels like it's going to take lots of time for some pretty semantic changes. After all, merging an initial and basic version will still help to prettify the usage of Reactor Kafka with Spring Boot. Wdyt?
@almogtavor Unfortunately, I don't think we should do that. Until we know exactly what direction we want to take, merging something may set us off in the wrong direction. If we don't have a chance to course correct before 3.0 is released later this year, backwards compatibility will then limit our options. We want to avoid that by giving ourselves enough time to maximise our chances of getting it right first time round.
@wilkinsona makes sense.
Will methods annotated with @ReactiveKafkaListener support @RetryableTopic?
There is no such thing as @ReactiveKafkaListener.
Also, @RetryableTopic is a feature of spring-kafka, not reactor-kafka, so no.
Hi @garyrussell, is there any plan for when we'll get @Listener support for Reactor Kafka, along with other annotations similar to Spring Kafka's? If it's not ready yet, is there a future plan for it?
Thanks
There are no such plans; an evaluation resulted in there not being much value add that spring-kafka can provide over reactor-kafka.
So if I want to implement this with reactor-kafka, the reference doc below is all I have for now?
Yes.
There are no such plans; an evaluation resulted in there not being much value add that spring-kafka can provide over reactor-kafka.
Why do you think there is no value in that? Isn't the difference similar to the one between WebMVC and WebFlux (i.e. blocking vs. non-blocking)?
That is not a useful comparison. If you feel spring-kafka can provide some useful value add over reactor-kafka, please start a discussion with your thoughts over there. https://github.com/spring-projects/spring-kafka/discussions
Let's not continue here.
I am open to ideas, but nobody has provided a compelling reason to do anything so far.
Replied here https://github.com/spring-projects/spring-kafka/discussions/2351#discussioncomment-3788458 Any help is appreciated.
@wilkinsona @garyrussell is there any reason this has remained unresolved for so long? There are already 3 working PRs on the subject.
@almogtavor Thanks for your patience. That there are 3 working PRs is largely why nothing's happened. We know we want to do something here but as of yet we haven't managed to identify exactly what we want to do. We have a number of competing priorities at the moment and adding support for Reactor Kafka hasn't yet made it to the top of the list. Unfortunately, that's unlikely to change until we're able to carve out a block of time to work on this and figure out exactly what needs to be done.
As said in https://github.com/reactor/reactor-kafka/issues/100#issuecomment-995178795, native support for Reactor Kafka would be valuable to lots of users. Spring Boot currently supports WebFlux, but auto-configuration and other capabilities aren't supported yet for Reactor Kafka. As others have said in different issues, I also think the integration between Reactor Kafka and Spring Boot should be better documented and supported, since that is the most basic way of using Reactor Kafka. There is currently no official way of using Reactor Kafka with Spring Boot, which is pretty odd.