spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0
331 stars 301 forks source link

Dynamic routing by KafkaBinder #1164

Open uladzislau-belykh opened 2 years ago

uladzislau-belykh commented 2 years ago

Hi,

I am trying to create a simple application that will consume records from one topic and route them to multiple topics. I have an expression that resolves the destination route. Also I use Ack and KafkaIntegrationHeaders.FLUSH for providing guarantees.

I've had the next tries:

  1. I've tried to use BinderAwareChannelResolver and to resolve each destination separately. The problem is that we can't make "flush" for all of them at the same time, because each of them has separate KafkaTemplate

  2. For the next try, I've found topicExpression in KafkaProducerMessageHandler. But for using it Kafka binder creates a useless topic(by name of destination) and the "autoCreateTopics" property doesn't work for it. Is it expected behavior?

Can you advise me on how to do my task in the best way?

sobychacko commented 2 years ago

BinderAwareChannelResolver is deprecated in favor of the following options.

  1. Use the spring.cloud.stream.sendto.destination property. More details here.
  2. For more granular control, a new StreamBridge API is provided. See more details here.

Try one of these options and see if that fits your use case.

uladzislau-belykh commented 2 years ago

First of all, sorry, I mixed parts from different modules.

As I know all of them use the same workflow inside (create a new MessageHandler, ProducerFactory, and KafkaTemplate for each new destination).

In my case, I need to have the opportunity to call "flush" for all my destinations at the same time. Probably, it can be achieved by using the same ProducerFactory for all my destinations(as it works when transactionIdPrefix is set). Can it be done without using transactions?