spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

Inbound KTable (and possibly GlobalKTable) ignoring nativeDecoding=false #1191

Closed JoseLora closed 2 years ago

JoseLora commented 2 years ago

We're testing Kafka Streams through Spring Cloud Stream's functional support with Avro input/output records, with nativeEncoding=false and nativeDecoding=false set so that a custom MessageConverter performs the Avro conversion.

The default serdes are StringSerde for keys and ByteArraySerde for values.

Everything works when we use only a KStream-to-KStream function, for example:

    @Bean
    public Function<KStream<String, DataRecordAvro>, KStream<String, DataRecordAvro>> wordsCount() {
      return input -> input
          .flatMapValues(value -> Arrays.asList(value.getName().toString().toLowerCase().split("\\W+")))
          .map((key, value) -> new KeyValue<>(value, value))
          .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
          .windowedBy(TimeWindows.of(Duration.ofSeconds(5)).grace(Duration.ofMillis(0)))
          .count()
          .toStream()
          .map((key, value) -> new KeyValue<>(key.key(), new DataRecordAvro(key.key(), value)));
    }

A slightly more complex example involving an input KTable, however, fails:

    @Bean
    public BiFunction<KStream<String, DataRecordAvro>, KTable<String, DataRecordAvro>, KStream<String, DataRecordAvro>> userClicksRegionKTableAvro() {
      return (userClicksStream, usersRegionKTable) -> userClicksStream
          .leftJoin(usersRegionKTable,
              (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region.getName().toString(), clicks.getCount()))
          .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
          .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
          .reduce(Long::sum)
          .mapValues((key, value) -> new DataRecordAvro(key, value))
          .toStream();
    }

(The DataRecordAvro class has only two members: CharSequence name; Long count;)

When the first record is received, this exception is thrown:

ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.String, and value: com.xxxx.kstreams.fixtures.avro.DataRecordAvro.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.

The processor where the exception is thrown seems to be:

KSTREAM-LEFTJOIN-0000000011:
    states:     [user-regions-avro-STATE-STORE-0000000008]
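
One plausible reading of this failure, not confirmed in this thread, is that the table-side values reach the join still as raw byte[] (the default ByteArraySerde) because the custom conversion is not applied to the KTable binding. The plain-Java sketch below has no Kafka or Spring dependencies; DataRecord, join, and describe are hypothetical stand-ins, not binder or Kafka Streams APIs. It only illustrates how, with generics erased at runtime, a joiner typed for a record class throws ClassCastException once it is handed a byte[].

```java
import java.nio.charset.StandardCharsets;
import java.util.function.BiFunction;

public class NativeDecodingCastDemo {

    // Hypothetical stand-in for the Avro-generated DataRecordAvro class.
    public static final class DataRecord {
        final CharSequence name;
        final Long count;

        public DataRecord(CharSequence name, Long count) {
            this.name = name;
            this.count = count;
        }
    }

    // Stand-in for a join processor. Generics are erased at runtime, so the
    // unchecked casts here do nothing; the real type check happens only when
    // the joiner itself is invoked.
    @SuppressWarnings("unchecked")
    static <V1, V2, R> R join(Object streamValue, Object tableValue,
                              BiFunction<V1, V2, R> joiner) {
        return joiner.apply((V1) streamValue, (V2) tableValue);
    }

    // Runs the join and reports either the joined value or the failure type.
    public static String describe(Object streamValue, Object tableValue) {
        BiFunction<DataRecord, DataRecord, String> joiner = (clicks, region) ->
            (region == null ? "UNKNOWN" : region.name.toString()) + ":" + clicks.count;
        try {
            return NativeDecodingCastDemo.<DataRecord, DataRecord, String>join(
                streamValue, tableValue, joiner);
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        DataRecord clicks = new DataRecord("alice", 3L);

        // Table value already converted to the record type: the join succeeds.
        System.out.println(describe(clicks, new DataRecord("europe", 0L)));

        // Table value left as raw bytes (assumed here to be what an
        // unconverted KTable binding delivers): the joiner's cast fails.
        System.out.println(describe(clicks, "europe".getBytes(StandardCharsets.UTF_8)));
    }
}
```

If this reading is right, the stream side is fine because its values are converted before the join, and only the table-side lookup surfaces the mismatch.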

Here is another example that works (no Avro input records, and the consumer's useNativeDecoding left at its default of true):

    @Bean
    public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, DataRecordAvro>> userClicksRegionKTable() {
      return (userClicksStream, usersRegionKTable) -> userClicksStream
          .leftJoin(usersRegionKTable,
              (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks))
          .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
          .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
          .reduce(Long::sum)
          .mapValues((key, value) -> new DataRecordAvro(key, value))
          .toStream();
    }

Would it be possible to add support for disabling nativeDecoding for inbound KTable and GlobalKTable, the same way it already works for KStream?

(Copy of this SO question)

sobychacko commented 2 years ago

Issue moved to spring-cloud/spring-cloud-stream #2320 via ZenHub