spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

Support allowNonTransactional config for ProducerConfigurationMessageHandler #990

Closed mhyeon-lee closed 3 years ago

mhyeon-lee commented 3 years ago

Producers used for batch processing may not need transactions. Could you add an allowNonTransactional option for ProducerConfigurationMessageHandler and KafkaTemplate?

garyrussell commented 3 years ago

This is a reasonable request.

In the meantime, there are two workarounds:

  1. Use the multi-binder support to configure two binders, one transactional and one not.

  2. Use a customizer with a bit of reflection:

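import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;

@SpringBootApplication
public class Kbgh990Application {

    public static void main(String[] args) {
        SpringApplication.run(Kbgh990Application.class, args);
    }

    @Autowired
    StreamBridge bridge;

    // Send one record to the "notTx" binding outside of any transaction.
    @Bean
    ApplicationRunner runner() {
        return args -> this.bridge.send("notTx", "foo");
    }

    // Relax the transaction requirement only for the handler bound to non.tx.topic.
    @Bean
    ProducerMessageHandlerCustomizer<?> customizer() {
        return (handler, destinationName) -> {
            if (destinationName.equals("non.tx.topic")) {
                // Allow the template to send without an active transaction.
                ((KafkaProducerMessageHandler<?, ?>) handler).getKafkaTemplate().setAllowNonTransactional(true);
                // Set the handler's own field of the same name directly (the "bit of reflection").
                new DirectFieldAccessor(handler).setPropertyValue("allowNonTransactional", true);
            }
        };
    }

}

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.notTx.destination=non.tx.topic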
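
For readers unfamiliar with the "bit of reflection" in workaround 2, the following is a minimal, Spring-free sketch of the idea: writing a private field that has no public setter. The `Handler` class and the `setField` helper are hypothetical stand-ins invented for illustration; they are not part of the binder or of Spring, and Spring's `DirectFieldAccessor` does more than this (for example, type conversion and nested property paths).

```java
import java.lang.reflect.Field;

public class DirectFieldDemo {

    // Hypothetical stand-in for a handler with a private flag and no setter.
    static class Handler {
        private boolean allowNonTransactional;

        boolean isAllowNonTransactional() {
            return this.allowNonTransactional;
        }
    }

    // Illustrative helper: set a field declared on the target's own class by name.
    // Unlike a full field accessor, this does not look at superclasses.
    static void setField(Object target, String name, Object value)
            throws ReflectiveOperationException {
        Field field = target.getClass().getDeclaredField(name);
        field.setAccessible(true); // bypass the private modifier
        field.set(target, value);
    }

    public static void main(String[] args) throws Exception {
        Handler handler = new Handler();
        setField(handler, "allowNonTransactional", true);
        System.out.println(handler.isAllowNonTransactional()); // prints "true"
    }
}
```

Because this approach depends on a private field name, it can break silently if the field is renamed in a later release, which is one reason to treat it as a stopgap until a supported option exists.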

mhyeon-lee commented 3 years ago

@garyrussell Thank you for your consideration. I will apply the workaround you suggested for now and switch once the option is added.