Closed dnitzan closed 10 months ago
We already added this for @RabbitListener
last year.
However, managing the offsets for Kafka could make this tricky to implement - we wouldn't want to commit the offset until the async operation is complete.
If the next async operation completes first, and its offset committed, and then the first one fails, it's too late because its offset has already been implicitly committed.
Since Kafka message are not discretely acknowledged, going async with Kafka record consumption is really not recommended because of the risk of message loss and/or the complexity of managing offsets.
For this reason we didn't proceed here, but we can put it on the backlog and give it some more thought.
True, we faced the same issues building a custom request/reply mechanism on top of Spring Kafka. We eventually elected to just commit periodically (with ack-mode=time
) regardless of downstream asynchronous processing and it is perfectly fine for us. If such a relaxed guarantee is sufficient, then it greatly simplifies the implementation.
OK; with that caveat; we'll try to get it into a future release.
I think it is reasonable difficulty. But still may could be done even without a full prefect asynchronized flow.
for example.
when ackMode=BATCH -> That container could async all the listeners within same BATCH but refer the ack until all messages in a poll() responsed
when ackMode=TIME -> same as BATCH, but synchronize all once by time, not poll()
when ackMode=RECORD -> No real concurrently processing. Just one message a time, but allow developers to use async API.
The real difficulty is timeout/retry management which make the state complex.
For another example, if the first message jammed, the 2nd failed, but 3rd successes. should it ack or retry from where ? but this is a problem business logic to worry about. not the middleware That spring-kafka should provide some configuration.
I have added support for out-of-order manual commits, which will now make implementing this possible.
Also support Kotlin Coroutines, discussed here https://github.com/spring-projects/spring-kafka/discussions/2653
Hi, @sobychacko @artembilan may i pick it up.
@Wzy19930507 Certainly!
I'm sorry. What is the plan for this issue? Looks like Gary has added already enough functionality. What else you see has to be fixed?
Thanks
What else you see has to be fixed?
Add support for Mono
and Future
listener method return types in the MMLA, similar to what we have in RabbitMQ.
This is now possible when out of order commits are enabled.
Cool! Then it sounds like a copy/paste from the: https://github.com/spring-projects/spring-amqp/blob/main/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/AbstractAdaptableMessageListener.java#L375
@KafkaListener
@KafkaHandler
@SendTo
support async return, duplicate with #2325
Mono
and CompletableFuture
copy/paste from https://github.com/spring-projects/spring-amqp/issues/2374,KafkaListenerErrorHandler
, copy/paste from https://github.com/spring-projects/spring-amqp/issues/2461@KafkaHandler
@SendTo
work with KafkaListenerErrorHandler
The above functions may be submitted multiple times, a cool asynchronous feature is implemented in amqp, i will try to reproduce them in spring kafka.
Request/reply semantics requires that the server side listener method (annotated with
@SendTo
) return a response synchronously. There are use cases where server side request processing is asynchronous such that the listener method can't return a result immediately and must defer the response, releasing the consumer thread to handle other incoming requests. Once the result is available, an application thread would need to send it back to the client. This would be a feature similar to Servlet 3.0 Asynchronous Processing.