In the snippet you provided, you have this code: JsonDeserializer<OutputObject> jsonDeserializer = new JsonDeserializer<>(OutputObject.class); Here you are using OutputObject with a deserializer, but on the outbound side it should be a serializer. However, I don't think you need any of that if you use the following type of configuration.
Can you try this in your configuration?
spring.cloud:
  stream:
    function.definition: test
    bindings:
      test-in-0:
        destination: input-topic
        consumer:
          useNativeDecoding: true
      test-out-0:
        destination: output-topic
        producer:
          useNativeEncoding: true
    kafka:
      bindings:
        test-in-0:
          consumer:
            configuration:
              value.deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
        test-out-0:
          producer:
            configuration:
              value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
We are explicitly asking the binder to stay away from any data serialization and let Kafka do it natively by enabling native decoding and encoding. When we do that, we need to provide the proper de/serializer configuration.
Note that on the inbound side it is a Deserializer, and on the outbound side it is a Serializer.
Hi @sobychacko!
After applying your configuration and adding my packages to the trusted packages list, it works like a charm!
Thank you very much for your time and support!
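For readers following along, the trusted-packages step mentioned above can also be expressed as a JsonDeserializer property on the consumer binding. This is only a sketch: it assumes the same test-in-0 binding as in the configuration above, and the package name com.example.model is a hypothetical placeholder that does not come from this thread.

```yaml
spring.cloud:
  stream:
    kafka:
      bindings:
        test-in-0:
          consumer:
            configuration:
              value.deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
              # Hypothetical package; replace with the package(s) that hold your payload classes.
              spring.json.trusted.packages: com.example.model
```

Listing specific packages is generally preferable to the wildcard "*", which trusts every package.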
Hi guys!
I hope you can help me in solving a problem I'm finding when working with Spring Cloud Stream and its Apache Kafka Binder.
I have a function like this:
When I run my integration tests and try to consume the generated OutputObject message, I get the following error (it seems that the JsonDeserializer believes it has to parse an InputObject instance when it actually has to parse an OutputObject one):
After struggling with the cause of this exception, I found that the problem seems to be in the type information header, because if I ignore this information in my JsonDeserializer it works like a charm:
Just to give you all the information, here is my application.yml (which I hope is fine):
Is there any configuration missing on my side, or something that I'm not doing right? Or is there an issue somewhere in the binder that is causing this problem?
Thank you very much for your time and support!
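As a side note on the type information header workaround described in the question: ignoring that header can also be done through JsonDeserializer configuration properties instead of in code. The following is a rough sketch, not the configuration used in this thread. The binding name consume-in-0 and the com.example.model package are hypothetical, and it assumes the binding that reads output-topic also has useNativeDecoding enabled.

```yaml
spring.cloud:
  stream:
    kafka:
      bindings:
        # Hypothetical binding in the application that reads output-topic.
        consume-in-0:
          consumer:
            configuration:
              value.deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
              # Ignore the type information header on incoming records.
              spring.json.use.type.headers: false
              # With headers ignored, the deserializer needs an explicit target type (hypothetical class name).
              spring.json.value.default.type: com.example.model.OutputObject
              spring.json.trusted.packages: com.example.model
```

With the type header ignored, the deserializer always targets the configured default type, so this only fits topics that carry a single payload type.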