spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

Setting own bootstrap.server/Kafka cluster for the DLT of a binding #1160

Closed: orvyl closed this issue 2 years ago

orvyl commented 2 years ago

We have a use case where we need to send messages that fail to be consumed to a DLT, so enable-dlq would have been perfect; however, the DLT lives in a separate cluster. I tried the following:

spring.cloud.stream.binders.some-kafka-binder.type=kafka
spring.cloud.stream.binders.some-kafka-binder.environment.spring.cloud.stream.kafka.binder.brokers=cluster0:29092

spring.cloud.stream.bindings.processor-in-0.binder=some-kafka-binder
spring.cloud.stream.bindings.processor-in-0.group=${spring.application.name}
spring.cloud.stream.bindings.processor-in-0.destination=outbound-topic
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.configuration.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.configuration.schema.registry.url=${schema.registry.url}
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.configuration.specific.avro.reader=true
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.enable-dlq=true
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.dlq-name=outbound-topic.DLT
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.dlq-producer-properties.configuration.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.dlq-producer-properties.configuration.schema.registry.url=${schema.registry.url}
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.dlq-producer-properties.configuration.bootstrap.servers=cluster2:9092

...but I got this error:

Caused by: java.lang.IllegalStateException: bootstrap.servers cannot be overridden at the binding level; use multiple binders instead
        at org.springframework.util.Assert.state(Assert.java:76)
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.getProducerFactory(KafkaMessageChannelBinder.java:560)
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.getErrorMessageHandler(KafkaMessageChannelBinder.java:1148)
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.getErrorMessageHandler(KafkaMessageChannelBinder.java:158)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:695)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:639)
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:734)
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:158)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:408)
        ... 27 common frames omitted

I was hoping I could set the DLT's bootstrap server directly by setting dlq-producer-properties.configuration.bootstrap.servers.

Here's my original question: https://stackoverflow.com/questions/69611617/how-to-set-a-seperate-bootstrap-server-to-a-dlt-of-a-binding

garyrussell commented 2 years ago

You can disable DLQ processing in the binder and instead use a ListenerContainerCustomizer bean to configure the listener container with a SeekToCurrentErrorHandler and a DeadLetterPublishingRecoverer (configured with a KafkaTemplate that points to the other cluster). This moves retry and recovery into the listener container.

See https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-to-current and https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> listenerContainerCustomizer() {
    return (container, topic, group) -> {
        container.setErrorHandler(...);
    };
}
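
A more complete sketch of the same bean, untested and just to show the wiring; the cluster2:9092 address, the byte[] serializers, and the producer configuration map are placeholders for whatever your DLT cluster actually needs:

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> listenerContainerCustomizer() {
    // Producer factory pointing at the other cluster that hosts the DLT (placeholder address)
    Map<String, Object> dlqProducerConfig = new HashMap<>();
    dlqProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "cluster2:9092");
    dlqProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    dlqProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    KafkaTemplate<byte[], byte[]> dlqTemplate =
            new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(dlqProducerConfig));

    return (container, destinationName, group) -> {
        // When retries are exhausted, publish the failed record to <original topic>.DLT on the other cluster
        SeekToCurrentErrorHandler errorHandler =
                new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(dlqTemplate));
        container.setErrorHandler(errorHandler);
    };
}
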
sobychacko commented 2 years ago

@orvyl See if Gary's suggestion above is sufficient for your use case.

orvyl commented 2 years ago

Will try this now. If I create a ListenerContainerCustomizer bean, I assume it will be applied across all consumer bindings, or can it be scoped to a specific binding (in this case anotherProcessor)?

Here are more details about our case. Currently we have multiple bindings, and one of them uses enable-dlq, which is working well. Here's my current config:

spring.application.name=my-application-with-multiple-kafka-binders

spring.cloud.stream.binders.first-kafka.type=kafka
spring.cloud.stream.binders.first-kafka.environment.spring.cloud.stream.kafka.binder.brokers=localhost:9092

spring.cloud.stream.binders.second-kafka.type=kafka
spring.cloud.stream.binders.second-kafka.environment.spring.cloud.stream.kafka.binder.brokers=localhost:29092

spring.cloud.stream.bindings.processor-in-0.binder=first-kafka
spring.cloud.stream.bindings.processor-in-0.group=${spring.application.name}
spring.cloud.stream.bindings.processor-in-0.destination=processor-inbound
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.configuration.value.deserializer=com.paymaya.ditto.kafka.serialization.TransactionRequestSerDe
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.enable-dlq=true
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.dlq-name=processor-inbound.DLT
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.dlq-producer-properties.configuration.value.serializer=com.company..kafka.serialization.ProcessSerDe

spring.cloud.stream.function.bindings.processor-out-0=processor-response

spring.cloud.stream.bindings.anotherProcessor-in-0.binder=second-kafka
spring.cloud.stream.bindings.anotherProcessor-in-0.group=${spring.application.name}
spring.cloud.stream.bindings.anotherProcessor-in-0.destination=anotherProcessor-inbound
spring.cloud.stream.kafka.bindings.anotherProcessor-in-0.consumer.configuration.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.kafka.bindings.anotherProcessor-in-0.consumer.configuration.schema.registry.url=${schema.registry.url}
spring.cloud.stream.kafka.bindings.anotherProcessor-in-0.consumer.configuration.specific.avro.reader=true
#spring.cloud.stream.kafka.bindings.anotherProcessor-in-0.consumer.enable-dlq=true
#spring.cloud.stream.kafka.bindings.anotherProcessor-in-0.consumer.dlq-name=anotherProcess-inbound.DLT
#spring.cloud.stream.kafka.bindings.anotherProcessor-in-0.consumer.dlq-producer-properties.configuration.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#spring.cloud.stream.kafka.bindings.anotherProcessor-in-0.consumer.dlq-producer-properties.configuration.schema.registry.url=${schema.registry.url}

spring.cloud.stream.function.bindings.anotherProcessor-out-0=processor-response

spring.cloud.stream.bindings.processor-response.producer.use-native-encoding=true
spring.cloud.stream.bindings.processor-response.binder=first-kafka
spring.cloud.stream.bindings.processor-response.destination=processor-outbound
spring.cloud.stream.kafka.bindings.processor-response.producer.useTopicHeader=true
spring.cloud.stream.kafka.bindings.processor-response.producer.configuration.value.serializer=com.company.kafka.serialization.ProcessResponseSerDe

spring.cloud.function.definition=processor;anotherProcessor
spring.cloud.stream.default-binder=first-kafka

The commented-out properties are the DLQ-related ones for which we were hoping to set a separate bootstrap server.

garyrussell commented 2 years ago

If I create a ListenerContainerCustomizer bean, I assume it will be applied across all consumer bindings, or can it be scoped to a specific binding (in this case anotherProcessor)?

The customizer provides the destination (topic) and group (as well as the container) so you can provide a different customization for each binding.
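
For example, a variation of the earlier sketch (untested; the anotherProcessor-inbound topic name comes from your config above, and dlqTemplate stands for a KafkaTemplate bean built against the other cluster):

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> listenerContainerCustomizer(
        KafkaTemplate<byte[], byte[]> dlqTemplate) {
    return (container, destinationName, group) -> {
        // Only the binding consuming from anotherProcessor-inbound gets the cross-cluster DLT handler;
        // every other container is left as-is.
        if ("anotherProcessor-inbound".equals(destinationName)) {
            container.setErrorHandler(new SeekToCurrentErrorHandler(
                    new DeadLetterPublishingRecoverer(dlqTemplate)));
        }
    };
}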

orvyl commented 2 years ago

...you can provide a different customization for each binding.

Can you provide an example? From my perspective, all of my binding setup lives in configuration properties. Our team is still new to Spring Cloud and is aggressively building POCs for a major transition. We moved to it because of its configuration properties and reduced boilerplate code.

Here's my bean so far:

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<String, PaymentResponseMessage>> listenerContainerCustomizer(P4Properties p4Properties) {
        return (container, destinationName, group) -> {
            // KafkaTemplate backed by a producer factory built from the DLT cluster configuration
            KafkaTemplate<String, PaymentResponseMessage> template = new KafkaTemplate<>(
                    new DefaultKafkaProducerFactory<>(p4Properties.getDltConfiguration()));
            // Dead-letter failed records via the recoverer once they are no longer retried
            SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template));
            // Treat RuntimeException (and subclasses) as not retryable, so they are dead-lettered immediately
            errorHandler.addNotRetryableExceptions(RuntimeException.class);
            container.setErrorHandler(errorHandler);
        };
    }

I created a method to configure it, but I can't figure out where I can get the container bean from.

    @Autowired
    public void setupListenerContainerCustomizer(AbstractMessageListenerContainer<String, PaymentResponseMessage> listenerContainer,
            ListenerContainerCustomizer<AbstractMessageListenerContainer<String, PaymentResponseMessage>> listenerContainerCustomizer) {
        listenerContainerCustomizer.configure(listenerContainer, "topic-name", "group-name");
    }

garyrussell commented 2 years ago

You don't need anything more than the bean definition; the binder will call it for each consumer binding it creates.

However, since you are using multiple binders (or even a single named one), just declaring the bean won't work; you have to customize each binder instead by setting its listener container customizer.

See https://docs.spring.io/spring-cloud-stream/docs/3.1.4/reference/html/spring-cloud-stream.html#binder-customizer

sobychacko commented 2 years ago

Closing the issue. Please follow the suggestions from @garyrussell.