Closed aliaksandr-pleski closed 2 years ago
@aliaksandr-pleski Thanks for this report. After looking at this issue a bit further, I think we can resolve it in a couple of ways.
The reason you are getting this error is that the __typeid__ header is not set properly by your function(). This header is supposed to be set by the JsonSerializer in Spring Kafka. Your streamFunction is a Kafka Streams function, so by default it uses the JsonSerializer and adds the __typeid__ header as ClassB on the outbound. This is then passed down to function's input, but function is based on the regular Kafka binder, which uses message conversion behind the scenes (on both input and output) and does not use the JsonSerializer (or native Kafka serialization). When function publishes the data (as type ClassC), it does not update the type id header, since no JsonSerializer is involved. It simply passes down the same __typeid__ value it received on the input (which is ClassB), and when that record reaches the next function (consumer1), you run into the ClassCastException you are seeing.
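To make the pass-through concrete, here is a toy, Spring-free simulation of the pipeline described above (all names are illustrative, not the actual Spring Cloud Stream internals): the middle function swaps the payload type but copies the inbound headers verbatim, so the type id header still names ClassB.

```java
import java.util.HashMap;
import java.util.Map;

// Toy simulation of the header pass-through: no Spring involved.
class TypeIdPassThrough {
    static final String TYPE_ID = "__typeid__";

    // streamFunction's output: the JsonSerializer stamped the header with ClassB
    static Map<String, String> streamFunctionOutput() {
        Map<String, String> headers = new HashMap<>();
        headers.put(TYPE_ID, "ClassB");
        return headers;
    }

    // function's output: the payload is now ClassC, but no JsonSerializer runs,
    // so the inbound headers are simply proxied onto the outbound record
    static Map<String, String> functionOutput(Map<String, String> inbound) {
        return new HashMap<>(inbound); // header still claims ClassB
    }

    public static void main(String[] args) {
        Map<String, String> headers = functionOutput(streamFunctionOutput());
        // consumer1 deserializes based on this header and hits the ClassCastException
        System.out.println("type id seen by consumer1: " + headers.get(TYPE_ID));
    }
}
```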
You can work around this problem in two ways. The first is pretty trivial, while the second needs some configuration changes.
First, you can add the __typeid__ header programmatically when sending messages from the function method. Below is an example.
@Bean
public Function<ClassB, Message<ClassC>> function() {
    return classB -> {
        final Map<String, Object> headers = new HashMap<>();
        headers.put(KafkaHeaders.MESSAGE_KEY, classB.b);
        // Explicitly set the type id header to the outbound payload type
        headers.put(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME,
                ClassC.class.getCanonicalName());
        return MessageBuilder.createMessage(
                new ClassC(Integer.parseInt(classB.b)),
                new MessageHeaders(headers));
    };
}
Note the explicit addition of the DEFAULT_CLASSID_FIELD_NAME header.
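Stripped of the Spring types, the essence of this fix is to overwrite the type id header with the outbound payload class rather than proxying the inbound value. A minimal sketch in plain Java, with a hypothetical helper name:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of option 1's core idea, without Spring dependencies. The header
// name used here matches what DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME
// resolves to in Spring Kafka ("__TypeId__"), to the best of my knowledge.
class OverwriteTypeId {
    static final String TYPE_ID = "__TypeId__";

    // Build outbound headers: copy the inbound ones, then overwrite the type id
    // with the class actually being published
    static Map<String, Object> outboundHeaders(Map<String, Object> inbound, Class<?> outboundType) {
        Map<String, Object> headers = new HashMap<>(inbound);
        headers.put(TYPE_ID, outboundType.getCanonicalName());
        return headers;
    }
}
```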
The other option is to force the function method to use native encoding on the outbound, so that it goes through Kafka serialization with the JsonSerializer from Spring Kafka. This way, the proper type id header is added when the record is published. Here are the relevant configuration changes.
spring:
  cloud:
    function:
      definition: supplier;streamFunction;function;consumer1
    stream:
      kafka:
        bindings:
          function-out-0:
            producer:
              configuration:
                value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
        ....
      bindings:
        ...
        function-in-0:
          binder: kafka
          destination: scs-cc-streamFunction
        function-out-0:
          binder: kafka
          destination: scs-cc-consumer
          producer:
            useNativeEncoding: true
        ....
....
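For intuition on why native encoding helps: a serializer in the style of Spring Kafka's JsonSerializer derives the type header from the actual payload class at publish time, instead of proxying whatever arrived on the input. A toy, Spring-free sketch:

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration of option 2's effect: the type header is computed from the
// real payload at publish time, so a stale inbound header cannot leak through.
class NativeEncodingSketch {
    static Map<String, String> serializeHeaders(Object payload) {
        Map<String, String> headers = new HashMap<>();
        headers.put("__TypeId__", payload.getClass().getCanonicalName());
        return headers;
    }

    public static void main(String[] args) {
        // even if the inbound header said ClassB, the outbound one reflects
        // the actual payload type
        System.out.println(serializeHeaders(42).get("__TypeId__")); // java.lang.Integer
    }
}
```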
I verified that both solutions work. Please let us know if they work on your end as well.
@sobychacko thanks for the quick response!
Both options are working for me. I personally find option 2 more preferable.
Is it expected behavior that the regular Kafka function doesn't update the __typeid__ header and just proxies the existing one?
Since the regular binder uses Spring's message conversion by default, the record doesn't go through the native Kafka serializer, and the code that adds that header is part of the JsonSerializer class. So yes, this is the expected behavior.
Closing the issue now, feel free to re-open if you find something else related to this.
Got it, thanks!
Hi everybody,
I have a spring-cloud-stream application. Spring Boot is 2.6.2 and Spring Cloud is 2021.0.0, working with Kafka and Kafka Streams. The app has the following data pipeline:
There are 3 consumers that expect ClassC objects. Consumers 2 and 3 work fine, while Consumer 1 gets the following ClassCastException:
This behavior doesn't seem correct. It is probably related to the fact that there is one regular Kafka function between two Kafka Streams functions.
You can find sample application here: https://github.com/aliaksandr-pleski/spring-cloud-stream-kafka-class-cast-exception