confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io
Other
128 stars 1.04k forks

Decode Keys on tumbling windows #8956

Open fenngineering opened 2 years ago

fenngineering commented 2 years ago

Please could you provide information on how to decode a key from a tumbling window?

I am using the Confluent Kafka modules in Go to create a consumer on the underlying topic of the tumbling window, and I am unable to decode the key so that it can be read by the application.

The key is a string, but it is being decoded as the key plus some extra bytes: "thekey"{randombytes}

After doing some research, this seems to be base_key + window start + window end. Is there any way to extract the base key so that it's readable when using a Kafka consumer, or at least decode it into a JSON struct?

Any input will be much appreciated. Thanks

vcrfxia commented 2 years ago

Hi @fenngineering , the way to deserialize the key is to deserialize using a time windowed deserializer. ksqlDB uses the same (default) time windowed serdes as Kafka Streams. Here are the relevant Java docs: https://kafka.apache.org/31/javadoc/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.html https://kafka.apache.org/31/javadoc/org/apache/kafka/streams/kstream/WindowedSerdes.TimeWindowedSerde.html

What golang client are you using? Hopefully there is an analog, as these are supported Kafka serdes.

fenngineering commented 2 years ago

Hi @vcrfxia, thanks for the information. I am using the confluent-kafka-go library and the kafka.Consumer.ReadMessage() method. Do you know how I would configure this to use the serdes you described in your reply?

Kind regards Andy

mhowlett commented 2 years ago

The go client unfortunately doesn't have an implementation for this (or even a standardized serde API yet, though that doesn't matter much for a one off). You'd need to translate the Java functionality, which is here: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java

fenngineering commented 1 year ago

Hi, is this being actively looked at? We are seeing lots of marshalling errors in Go when trying to create a JSON document from this key. Thanks