spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

Wrong type header information when using Function #1187

Closed - ghost closed this issue 2 years ago

ghost commented 2 years ago

Hi guys!

I hope you can help me solve a problem I'm running into when working with Spring Cloud Stream and its Apache Kafka binder.

I have a function like this:

import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class StreamConfiguration {

    @Bean
    Function<InputObject, OutputObject> test() {
        return inputObject -> {
            // Map the incoming InputObject to an OutputObject here
            OutputObject outputObject = new OutputObject();
            return outputObject;
        };
    }

}

When I run my integration tests and try to consume the OutputObject message that was produced, I get the following error (it seems that the JsonDeserializer thinks it has to deserialize an InputObject when it should actually deserialize an OutputObject):

java.lang.ClassCastException: class InputObject cannot be cast to class OutputObject (InputObject and OutputObject are in unnamed module of loader 'app')

After struggling with the cause of this exception, I found that the problem seems to be in the type information header, because if I tell my JsonDeserializer to ignore that header, it works like a charm.
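Here is a rough sketch of that workaround in my integration test's consumer setup (simplified; the broker address, consumer group id, and the no-arg OutputObject constructor are just placeholders for what my test actually uses):

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

class OutputTopicTestConsumer {

    DefaultKafkaConsumerFactory<String, OutputObject> consumerFactory() {
        // Deserializer for OutputObject; ignoreTypeHeaders() makes it skip the
        // type information header written on the producer side (the workaround).
        JsonDeserializer<OutputObject> jsonDeserializer =
                new JsonDeserializer<>(OutputObject.class).ignoreTypeHeaders();

        // Broker address and group id are placeholders for my test environment.
        Map<String, Object> props = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ConsumerConfig.GROUP_ID_CONFIG, "test-group");

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), jsonDeserializer);
    }
}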

Just to give you all the information, here is my application.yml (which I hope is fine):

spring:
  cloud:
    stream:
      function:
        definition: test
      bindings:
        test-in-0:
          destination: input-topic
        test-out-0:
          destination: output-topic
      kafka:
        binder:
          brokers: localhost:9092

Is there any configuration missing on my side, or something that I'm not doing right? Or is there an issue somewhere in the binder that is causing this problem?

Thank you very much for your time and support!

sobychacko commented 2 years ago

In the snippet you provided - JsonDeserializer<OutputObject> jsonDeserializer = new JsonDeserializer<>(OutputObject.class); - you are using OutputObject with a deserializer; it should be a serializer. However, I don't think you need any of that if you use the following type of configuration.

Can you try this in your configuration?

spring.cloud:
  stream:
    function.definition: test
    bindings:
      test-in-0:
        destination: input-topic
        consumer:
          useNativeDecoding: true
      test-out-0:
        destination: output-topic
        producer:
          useNativeEncoding: true
    kafka:
      bindings:
        test-in-0:
          consumer: 
            configuration:
              value.deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
        test-out-0:
          producer:
            configuration:
              value.serializer: org.springframework.kafka.support.serializer.JsonSerializer

We are explicitly asking the binder to stay away from any data serialization and let Kafka do it natively by enabling native decoding and encoding. When we do that, we need to provide the proper de/serializer configuration.

Note that on the inbound side, it is Deserializer and on the outbound, it is Serializer.
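To make that pairing concrete, here is a minimal standalone sketch (outside the binder) of the two classes working together; the topic name and the no-arg OutputObject constructor are just placeholders:

import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

class SerdePairingSketch {

    static void roundTrip() {
        // Outbound (producer) side: JsonSerializer turns the object into JSON bytes.
        JsonSerializer<OutputObject> serializer = new JsonSerializer<>();
        byte[] payload = serializer.serialize("output-topic", new OutputObject());

        // Inbound (consumer) side: JsonDeserializer turns the bytes back into OutputObject.
        JsonDeserializer<OutputObject> deserializer = new JsonDeserializer<>(OutputObject.class);
        OutputObject received = deserializer.deserialize("output-topic", payload);
    }
}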

ghost commented 2 years ago

Hi @sobychacko!

After applying your configuration and adding my packages to the trusted packages list, it works like a charm!
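For anyone else who runs into this: the trusted packages part went into the consumer configuration, roughly like this (the package name is a placeholder for my own model package):

spring:
  cloud:
    stream:
      kafka:
        bindings:
          test-in-0:
            consumer:
              configuration:
                spring.json.trusted.packages: com.example.model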

Thank you very much for your time and support!