spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0
974 stars 595 forks source link

TestBinder+Kafka not working as intended (tombstone records) #2971

Closed rnferreira closed 1 week ago

rnferreira commented 2 weeks ago

Greetings, everyone! I encountered an issue with the most recent release of Spring Cloud Stream + Kafka binder (release train 2023.0.2) when using KafkaNull (tombstone records) and TestBinder. With version 2023.0.1, all tests asserting the output of functions that returned KafkaNull passed, whereas, in this new version, they don't. The output is now transformed into byte[], and the payload now has the byte equivalent of "{}".

Here are the relevant snippets (extremely minimalist):

build.gradle:

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.3.1'
    id 'io.spring.dependency-management' version '1.1.5'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(21)
    }
}

repositories {
    mavenCentral()
}

ext {
    set('springCloudVersion', "2023.0.2")
}

dependencies {
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation "org.springframework.cloud:spring-cloud-starter-stream-kafka"
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
//    testImplementation 'org.springframework.cloud:spring-cloud-starter-contract-stub-runner'

}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

tasks.named('test') {
    useJUnitPlatform()
}

Function configuration:

@SpringBootApplication
public class Gh2971Application {

    public static void main(String[] args) {
        SpringApplication.run(Gh2971Application.class, args);
    }

    @Bean
    public Function<Message<?>,Message<?>> myFunction() {
        return v -> MessageBuilder.withPayload(KafkaNull.INSTANCE).build();
    }
}

Test:

// 
@ImportAutoConfiguration(TestChannelBinderConfiguration.class)
@SpringBootTest(properties = {"spring.cloud.function.definition=myFunction"})
class Gh2971ApplicationTests {

    @Value("${spring.cloud.stream.bindings.myFunction-in-0.destination}")
    private String inputTopic;

    @Value("${spring.cloud.stream.bindings.myFunction-out-0.destination}")
    private String outputTopic;

    @Autowired
    private InputDestination input;
    @Autowired
    private OutputDestination output;

    @Test
    void test() {
        var message =
                MessageBuilder.withPayload(KafkaNull.INSTANCE)
                        .build();
        this.input.send(message, this.inputTopic);

        var received = this.output.receive(0L, this.outputTopic);
        assertThat(received)
                .isNotNull()
                .extracting(Message::getPayload)
                .isEqualTo(KafkaNull.INSTANCE);
    }
}

Application properties:

spring.application.name=gh-2971
spring.cloud.function.definition=myFunction
spring.cloud.stream.bindings.myFunction-in-0.destination=in
spring.cloud.stream.bindings.myFunction-out-0.destination=out
spring.cloud.stream.kafka.binder.consumer-properties.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.binder.producer-properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.spring.json.use.type.headers=false

Expected Behavior:

The test passes.

Actual Behavior:

The test fails with the following message:

org.opentest4j.AssertionFailedError: 
expected: org.springframework.kafka.support.KafkaNull@c317472
 but was: [123, 125]
Expected :org.springframework.kafka.support.KafkaNull@c317472
Actual   :[123, 125]

Additional context: Spring Cloud Stream version: 4.1.2 Kafka Binder version: 4.1.2 Test Binder: 4.1.2

olegz commented 2 weeks ago

I can'r see what could have changed, but i just plugged in your sample and i see the same 2 bytes with 4.1.1 as I do with 4.1.2

GenericMessage [payload=byte[2], 
rnferreira commented 2 weeks ago

Hi Oleg! Thank you for getting back to me so quickly. Your comment pointed me in the right direction. If I include the dependency org.springframework.cloud:spring-cloud-starter-contract-stub-runner, the test passes when I use 2023.0.1 and fails on `2023.0.2'.

Was this always the behaviour? Should we now assume that the tombstone is transformed into those two bytes?

Please advise.

PS: I updated my original comment with more context.

olegz commented 2 weeks ago

I have to assume that contract is doing something, but what you can do is compile a simple project with a test and attach it as zip or push it to github so I can take a look

rnferreira commented 2 weeks ago

Here you go @olegz: https://github.com/rnferreira/gh-2971

rnferreira commented 2 weeks ago

Quick update: I dug deeper and found that the method org.springframework.cloud.function.context.config.JsonMessageConverter#convertToInternal is converting the KafkaNull payload into an empty JSON ({}).

Please let me know whether you think this is worth "fixing" or not.