We're testing the use of Kafka Streams via Spring Cloud Stream function support with Avro input/output records, but setting nativeEncoding=false and nativeDecoding=false in order to use a custom MessageConverter where we do the Avro conversion.
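For context, a minimal sketch of what such a converter could look like, assuming a plain Spring AbstractMessageConverter wrapping Avro's SpecificDatum reader/writer (the class name and MIME type here are illustrative, not necessarily what we actually use):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.util.MimeType;

import com.xxxx.kstreams.fixtures.avro.DataRecordAvro;

// Illustrative sketch only: (de)serializes DataRecordAvro to/from the raw byte[] payload
// that the binder hands over when nativeEncoding/nativeDecoding are disabled.
public class DataRecordAvroMessageConverter extends AbstractMessageConverter {

    public DataRecordAvroMessageConverter() {
        super(new MimeType("application", "avro"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return DataRecordAvro.class.isAssignableFrom(clazz);
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        try {
            // Payload arrives as raw bytes because value decoding is left to the framework
            byte[] bytes = (byte[]) message.getPayload();
            SpecificDatumReader<DataRecordAvro> reader = new SpecificDatumReader<>(DataRecordAvro.class);
            return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
        }
        catch (IOException e) {
            return null;
        }
    }

    @Override
    protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            SpecificDatumWriter<DataRecordAvro> writer = new SpecificDatumWriter<>(DataRecordAvro.class);
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            writer.write((DataRecordAvro) payload, encoder);
            encoder.flush();
            return out.toByteArray();
        }
        catch (IOException e) {
            return null;
        }
    }
}

Exposing it as a @Bean of type MessageConverter (and setting the binding contentType to the matching MIME type) is how we'd expect Spring Cloud Stream to pick it up.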
The default serdes are StringSerde for keys and ByteArraySerde for values.
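In properties form, the setup is roughly the following (the function and binding names are assumptions based on the wordsCount example below; the actual values may differ):

# Illustrative configuration; binding names follow the <function>-in-0 / <function>-out-0 convention
spring.cloud.function.definition=wordsCount

# Disable native encoding/decoding so conversion goes through the MessageConverter
spring.cloud.stream.bindings.wordsCount-in-0.consumer.useNativeDecoding=false
spring.cloud.stream.bindings.wordsCount-out-0.producer.useNativeEncoding=false

# Default serdes: StringSerde for keys, ByteArraySerde for values
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde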
Everything is ok when we only use a KStream to KStream function, for example:
@Bean
public Function<KStream<String, DataRecordAvro>, KStream<String, DataRecordAvro>> wordsCount() {
    return input -> input
        .flatMapValues(value -> Arrays.asList(value.getName().toString().toLowerCase().split("\\W+")))
        .map((key, value) -> new KeyValue<>(value, value))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(5)).grace(Duration.ofMillis(0)))
        .count()
        .toStream()
        .map((key, value) -> new KeyValue<>(key.key(), new DataRecordAvro(key.key(), value)));
}
But when we try a slightly more complex example involving an input KTable like this:
(The DataRecordAvro class only has two members: CharSequence name; Long count;)
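Purely as a hypothetical illustration of the kind of signature we mean (the bean name and join logic here are invented, not our actual code), an input KTable would be bound to the second argument of something like:

// Hypothetical sketch only: the bean name and join logic are made up for illustration.
@Bean
public BiFunction<KStream<String, DataRecordAvro>, KTable<String, DataRecordAvro>, KStream<String, DataRecordAvro>> enrich() {
    return (stream, table) -> stream
        // Join each incoming record against the table entry with the same key
        .join(table, (left, right) -> new DataRecordAvro(left.getName(), left.getCount() + right.getCount()));
}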
When the first record is received, this exception is thrown:
ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.String, and value: com.xxxx.kstreams.fixtures.avro.DataRecordAvro.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
The processor where the exception is thrown seems to be:
Here is another example that works ok (without input Avro records, leaving the consumer useNativeDecoding as the default, true):

Would it be possible to add support for disabling nativeDecoding for inbound KTable and GlobalKTable, the same way it already works for KStream?

(Copy of this SO question)