reactor / reactor-kafka

Reactive Kafka Driver with Reactor
http://projectreactor.io

Concurrency set to > 1 still has only one reactor thread consuming from multiple partitions #332

Open patpatpat123 opened 1 year ago

patpatpat123 commented 1 year ago

Hello Reactor Kafka team,

Apologies for troubling you with this issue.

I have a topic with multiple partitions (four) and a single consumer running on a multi-core machine.

Using Spring Kafka (I understand this is another project; I am only using it as a comparison) and setting factory.setConcurrency(4), I see one-thread-per-partition behavior.

Using Spring Cloud Stream (again, another project) and setting spring.cloud.stream.bindings.input.consumer.concurrency=4, I also see one-thread-per-partition behavior.

I tried the equivalent settings in Reactor Kafka, and unfortunately I still see a single thread consuming all partitions, even on a multi-core machine.

I would have expected one reactor thread per partition (one per core), hence this issue.

Thank you in advance for your help.

garyrussell commented 1 year ago

You either have to create multiple KafkaReceivers or add a publishOn(someScheduler) element to the pipeline. If you do the latter, you may need to enable out-of-order offset commits, depending on how you are committing offsets.
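For the publishOn route, the reactor-kafka reference documentation shows a variant that groups the flux by partition so each partition is processed on its own worker while preserving per-partition ordering. A minimal sketch of that pattern (the broker address, topic, and group id are placeholders, assuming reactor-kafka 1.3.x):

```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

public class PerPartitionProcessing {

    public static void main(String[] args) {
        ReceiverOptions<String, String> options =
                ReceiverOptions.<String, String>create(Map.of(
                        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                        ConsumerConfig.GROUP_ID_CONFIG, "demo-group",
                        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class))
                .subscription(Collections.singleton("demo-topic"));

        // One worker per partition: records are still polled on the single
        // consumer thread, but handed off per partition for parallel processing.
        Scheduler scheduler = Schedulers.newParallel("processor", 4);

        KafkaReceiver.create(options)
                .receive()
                .groupBy(record -> record.receiverOffset().topicPartition())
                .flatMap(partitionFlux -> partitionFlux
                        .publishOn(scheduler)
                        .doOnNext(record -> {
                            // application logic here, then acknowledge
                            record.receiverOffset().acknowledge();
                        }))
                .subscribe();
    }
}
```

Since acknowledgements from different partitions can now arrive interleaved, this is where the out-of-order commit support Gary mentions (ReceiverOptions.maxDeferredCommits) may come into play.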

garyrussell commented 1 year ago

In future, please ask questions on Stack Overflow - GitHub issues are for reporting bugs or asking for new features.

patpatpat123 commented 1 year ago

Thank you @garyrussell.

Your explanations are very clear. May I turn this into an enhancement/feature request: when one uses Reactor Kafka to consume from multiple partitions, could there be some out-of-the-box mechanism that handles them with multiple reactor threads (one per core)?

And this without having to manually create multiple KafkaReceiver instances (pushing the scaling onto the user), or to juggle publishOn with a dedicated Scheduler?

I know it is not ideal to keep citing other projects, but Spring Kafka and Spring Cloud Stream both offer easy, friendly configuration to make this happen (without any code change).

A bit further afield, Reactor Netty offers out of the box a way to scale to one reactor thread per core across incoming requests.

Mirroring that behavior across the Reactor project stack, it would be cool if Reactor Kafka took care of messages coming from multiple partitions in the same way.

If you believe this enhancement request does not make sense, please feel free to close it. I am just opening the idea for discussion.

Thank you again.

garyrussell commented 1 year ago

Also, note that there is a reactive kafka binder for spring-cloud-stream (based on this project); it honors the concurrency property by creating multiple receivers and merging their fluxes.

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_concurrency

patpatpat123 commented 1 year ago

Agreed. I was using the Spring Cloud Stream Reactive Kafka binder mentioned in my first post.

That requires pulling in the whole Spring Cloud Stream ecosystem just to benefit from this feature.

For a regular reactive Kafka application, this project on its own is simple, small, fast, and efficient enough. Would it be possible to have this feature as part of Reactor Kafka, without having to adopt the entire (to be honest, much heavier) Spring Cloud Stream, please?

I am totally fine with the answer "just use Spring Cloud Stream" and closing, but I thought having this feature directly in the lighter Reactor Kafka would be cool.

garyrussell commented 1 year ago

It would be pretty easy to write a simple wrapper around multiple receivers.
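For what it's worth, a rough sketch of what such a wrapper might look like (a hypothetical helper, not part of the library; the same create-N-receivers-and-merge idea the reactive binder uses):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

public final class ConcurrentKafkaReceiver {

    private ConcurrentKafkaReceiver() {
    }

    /**
     * Creates {@code concurrency} receivers from the same options and merges
     * their fluxes; the broker's group rebalancing spreads the partitions
     * across the underlying consumers, each with its own polling thread.
     */
    public static <K, V> Flux<ReceiverRecord<K, V>> receive(
            ReceiverOptions<K, V> options, int concurrency) {
        List<Flux<ReceiverRecord<K, V>>> fluxes = IntStream.range(0, concurrency)
                .mapToObj(i -> KafkaReceiver.create(options).receive())
                .collect(Collectors.toList());
        return Flux.merge(fluxes);
    }
}
```

This is only a sketch; a real implementation would also need to think about lifecycle (disposing all receivers together) and error handling across the merged streams.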

Contributions are always welcome.