spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

Messages aren't sent to DLQ using Spring Cloud Stream in batch mode #1114

Closed: serjteplov closed this issue 3 years ago

serjteplov commented 3 years ago

I am trying to configure Spring to send bad messages to a dead-letter queue (DLQ) while using batch mode, but nothing ever arrives in the DLQ topic.

I use Spring Boot 2.5.3 and Spring Cloud 2020.0.3, which automatically resolves spring-cloud-stream-binder-kafka-parent to version 3.1.3.

Here is application.properties:

spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.bindings.input-in-0.content-type=text/plain
spring.cloud.stream.bindings.input-in-0.destination=topic4
spring.cloud.stream.bindings.input-in-0.group=batch4
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=5
spring.kafka.bootstrap-servers=broker:9092
server.port=8009

Here are the application and the batch listener, written in the functional programming model:

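import java.util.List;
import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.BatchListenerFailedException;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.RecoveringBatchErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@SpringBootApplication
public class DemoKafkaBatchErrorsApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoKafkaBatchErrorsApplication.class, args);
    }

    @Bean
    public Consumer<List<byte[]>> input() {
        return messages -> {

            for (int i = 0; i < messages.size(); i++) {
                // Demo: always fail on the first record (index 0) of every batch.
                throw new BatchListenerFailedException("Demo: failed to process = ", i);
            }
        };
    }

    @Bean
    public RecoveringBatchErrorHandler batchErrorHandler(KafkaTemplate<String, byte[]> template) {
        // Publishes a record to a dead-letter topic once retries are exhausted.
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        // Back off 2 ms between attempts, up to 10 attempts.
        return new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(2L, 10));
    }
}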

Sending to topic:

./kafka-console-producer.sh --broker-list broker:9092 --topic topic4 < input.json

Reading from DLQ:

./kafka-console-consumer.sh --bootstrap-server broker:9092 --topic error.topic4.batch4 --from-beginning --max-messages 100
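One thing worth checking in the setup above is the topic name being read. As far as I understand the documentation, the Kafka binder's own DLQ support defaults to a topic named error.<destination>.<group>, while a DeadLetterPublishingRecoverer created with only a KafkaTemplate defaults to <original topic>.DLT. The plain-Java sketch below only illustrates those two naming conventions; the class DlqTopicNames and its methods are hypothetical helpers, not part of Spring, and the conventions themselves are an assumption to verify against the documentation for the versions in use.

```java
// Hypothetical helper illustrating two default dead-letter topic naming conventions.
// Both conventions are assumptions taken from the Spring documentation; verify them
// against the exact versions in use.
final class DlqTopicNames {

    private DlqTopicNames() {
    }

    // Kafka binder default when its own DLQ support is used: error.<destination>.<group>
    static String binderDefault(String destination, String group) {
        return "error." + destination + "." + group;
    }

    // DeadLetterPublishingRecoverer default destination: <original topic>.DLT
    static String recovererDefault(String originalTopic) {
        return originalTopic + ".DLT";
    }

    public static void main(String[] args) {
        System.out.println(binderDefault("topic4", "batch4")); // prints error.topic4.batch4
        System.out.println(recovererDefault("topic4"));        // prints topic4.DLT
    }
}
```

If that reading is right, a recoverer built as new DeadLetterPublishingRecoverer(template) would be expected to publish to topic4.DLT, so consuming from error.topic4.batch4 could come up empty even when recovery works.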

After running this app, nothing arrives in the DLQ topic, but the console shows many messages like:

Caused by: org.springframework.kafka.listener.BatchListenerFailedException: Demo: failed to process =  @-0
    at com.example.demokafkabatcherrors.DemoKafkaBatchErrorsApplication.lambda$input$0(DemoKafkaBatchErrorsApplication.java:29) ~[classes/:na]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:854) ~[spring-cloud-function-context-3.1.3.jar:3.1.3]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:643) ~[spring-cloud-function-context-3.1.3.jar:3.1.3]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:489) ~[spring-cloud-function-context-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:77) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:727) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:560) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56) ~[spring-integration-core-5.5.2.jar:5.5.2]
    ... 27 common frames omitted

I cannot figure out where the problem is. Any help would be appreciated.

garyrussell commented 3 years ago

Don't use GitHub issues to ask questions; they are for reporting bugs or asking for new features.

I have answered your question on Stack Overflow: https://stackoverflow.com/questions/68634379/failed-sending-to-dlq-using-spring-cloud-stream-in-batch-mode/68714782#68714782