spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0

Inconvertible Messages are silently discarded when using the Rabbit Binder with Consumer Side Batching #2986

Closed hgarus closed 1 month ago

hgarus commented 2 months ago

Describe the issue

Using the RabbitMQ binder with a consumer configured for consumer-side batching, as described here, a received message that cannot be converted to the target type is silently dropped.

To Reproduce

  1. Set up a local RabbitMQ instance
  2. Set up a simple Spring Boot app using the Rabbit binder:
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.3.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <java.version>17</java.version>
        <spring-cloud.version>2023.0.3</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    </project>
  3. Configure Rabbit Consumer for Batching
    
    # not necessary for reproduction
    spring.cloud.stream.rabbit.default.consumer.auto-bind-dlq=true
    spring.cloud.stream.bindings.listener-in-0.destination=some-destination
    spring.cloud.stream.bindings.listener-in-0.group=l
    spring.cloud.stream.bindings.listener-in-0.consumer.batch-mode=true
    spring.cloud.stream.rabbit.bindings.listener-in-0.consumer.batch-size=10
    spring.cloud.stream.rabbit.bindings.listener-in-0.consumer.enable-batching=true
    spring.cloud.stream.rabbit.bindings.listener-in-0.consumer.receive-timeout=100

  4. Enable debug logging to see what is actually happening (makes this less silent)

    logging.level.org.springframework.cloud.function.context.config.JsonMessageConverter=debug


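  5. Set up a consumer consuming some DTO, which is sent as JSON:
```java
import java.util.List;
import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class Demo1Application {

    public static void main(String[] args) {
        SpringApplication.run(Demo1Application.class, args);
    }

    public record MyDto(int i) {}

    // Batch consumer: receives the converted messages of one batch as a list
    @Bean
    public Consumer<List<MyDto>> listener() {
        return System.out::println;
    }
}
```
  6. Send a message which cannot be converted to MyDto: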

    
    @Bean
    public ApplicationRunner applicationRunner(AmqpTemplate amqpTemplate) {
        return args -> {
            // Send two test messages to the consumer: the first deserializes fine, the second does not.
            // MessageBuilder is org.springframework.amqp.core.MessageBuilder.
            amqpTemplate.send("some-destination", "", MessageBuilder
                    .withBody("{\"i\": 42}".getBytes(StandardCharsets.UTF_8))
                    .setContentType("application/json")
                    .build());
            amqpTemplate.send("some-destination", "", MessageBuilder
                    .withBody("{\"i\": \"A STRING\"}".getBytes(StandardCharsets.UTF_8))
                    .setContentType("application/json")
                    .build());
        };
    }
  7. Observe that only one message is printed to the console and the DLQ is empty:
    [MyDto[i=42]]

Version of the framework

Spring Boot 3.3.2 and Spring Cloud 2023.0.3

Expected behavior

I would expect both messages to be DLQed instead. This would match the behaviour without batching, where an inconvertible message is DLQed.
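To make the difference between the observed and the expected behaviour concrete, here is a minimal plain-Java sketch. It is not the binder's or spring-cloud-function's actual code: `BatchConversionSketch`, `convertLenient` and `convertStrict` are hypothetical names, and `Integer.parseInt` merely stands in for the JSON conversion. The lenient variant mirrors what is observed (the bad item vanishes), while the strict variant surfaces the failure, which is what would give error handling such as DLQ routing a chance to act.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration only; none of these names exist in spring-cloud-stream.
final class BatchConversionSketch {

    /** Lenient variant: items that fail conversion are dropped without any signal. */
    static <T> List<T> convertLenient(List<String> payloads, Function<String, T> converter) {
        List<T> converted = new ArrayList<>();
        for (String payload : payloads) {
            try {
                converted.add(converter.apply(payload));
            } catch (RuntimeException ex) {
                // Swallowed: the caller never learns that a message was lost.
            }
        }
        return converted;
    }

    /** Strict variant: the first conversion failure fails the whole batch. */
    static <T> List<T> convertStrict(List<String> payloads, Function<String, T> converter) {
        List<T> converted = new ArrayList<>();
        for (int i = 0; i < payloads.size(); i++) {
            try {
                converted.add(converter.apply(payloads.get(i)));
            } catch (RuntimeException ex) {
                throw new IllegalStateException(
                        "Cannot convert batch item " + i + ": " + payloads.get(i), ex);
            }
        }
        return converted;
    }

    public static void main(String[] args) {
        // Stand-in for the JSON-to-DTO conversion (assumption for this sketch).
        Function<String, Integer> parse = Integer::parseInt;
        List<String> batch = List.of("42", "A STRING");

        System.out.println(convertLenient(batch, parse));
        try {
            convertStrict(batch, parse);
        } catch (IllegalStateException ex) {
            System.out.println("batch rejected: " + ex.getMessage());
        }
    }
}
```

With the strict variant the consumer function is never invoked with a partial batch, so nothing is lost without the failure being visible.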

Additional context

The example is a bit artificial; a more realistic one would be an unexpected enum value.
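A rough sketch of that enum scenario, in plain Java: a producer starts sending a status the consumer's enum does not know yet. `OrderStatus`, `OrderDto` and `fromRawStatus` are hypothetical names that are not part of the reproduction above, and `Enum.valueOf` is only a stand-in for the JSON converter, which would be expected to fail in a comparable way on an unknown constant unless it is configured otherwise.

```java
// Hypothetical DTO for illustration; not part of the reproduction project.
final class EnumPayloadSketch {

    enum OrderStatus { CREATED, SHIPPED }

    record OrderDto(OrderStatus status) {}

    /** Maps a raw status string onto the enum; unknown constants raise IllegalArgumentException. */
    static OrderDto fromRawStatus(String rawStatus) {
        return new OrderDto(OrderStatus.valueOf(rawStatus));
    }

    public static void main(String[] args) {
        System.out.println(fromRawStatus("SHIPPED"));
        try {
            // The producer knows a newer status than the consumer does.
            fromRawStatus("CANCELLED");
        } catch (IllegalArgumentException ex) {
            System.out.println("inconvertible: " + ex.getMessage());
        }
    }
}
```

In a batch consumer, a payload like this is exactly the kind of message that would disappear without a trace under the behaviour described in this issue.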

This might also apply to the Kafka binder; I haven't tested that.

olegz commented 2 months ago

It appears to be a duplicate of https://github.com/spring-cloud/spring-cloud-function/issues/1174

olegz commented 1 month ago

This was resolved via a series of commits in spring-cloud-stream

5d881b2ad 29a355832 058fc660a fac9eb1eb 14c10462f

As well as the commit in spring-cloud-function

https://github.com/spring-cloud/spring-cloud-function/commit/6dee668e3ec20710460b0384a8b6afe53d078633

It was also back-ported to 4.1.x and will be available with the next release.