confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Allow to preserve partitions for non-keyed STREAMS #7984

Open mjsax opened 3 years ago

mjsax commented 3 years ago

Is your feature request related to a problem? Please describe.
ksqlDB allows users to create STREAMS without a key. In this case, if the stream is queried and a result STREAM is produced (e.g., by a simple filter query), the result records are written to a "random" partition chosen by the default partitioner. It would be a nice feature to let users preserve the original partitioning. I saw a few questions about this in different community channels, so I wanted to create a ticket to track it.

Describe the solution you'd like
I'm not 100% sure what the best solution would be. We expose partition information via KLIP-50; however, PARTITION BY partition() would copy the partition id into the key of the message... Maybe we would need a dedicated WITH(preserve_partitions='true') clause instead? If the output stream is keyed, preserve_partitions would be ignored.
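To make the proposed semantics concrete, here is a minimal Java sketch of the per-record decision such an option could apply. `preserve_partitions` is only a proposal in this ticket, so `PreservePartitionsRule` and `targetPartition` are hypothetical names, not a ksqlDB API. The check that the sink topic actually has a partition with the source partition's id is an added assumption; the ticket does not say what should happen if the sink has fewer partitions than the source.

```java
import java.util.OptionalInt;

/**
 * Hypothetical model of the proposed WITH(preserve_partitions='true') option.
 * Not a ksqlDB API: names and the partition-count check are assumptions.
 */
public final class PreservePartitionsRule {

    private PreservePartitionsRule() {}

    /**
     * Returns the partition the sink record should be forced into, or empty
     * to fall back to the default partitioning.
     */
    public static OptionalInt targetPartition(boolean outputIsKeyed,
                                              boolean preservePartitions,
                                              int sourcePartition,
                                              int sinkPartitionCount) {
        // Keyed output (or option not set): key-based / default placement applies.
        if (outputIsKeyed || !preservePartitions) {
            return OptionalInt.empty();
        }
        // Assumption: preserving only makes sense if the sink topic has a
        // partition with the same id as the source partition.
        if (sourcePartition < 0 || sourcePartition >= sinkPartitionCount) {
            throw new IllegalArgumentException(
                "source partition " + sourcePartition
                    + " does not exist in a sink topic with "
                    + sinkPartitionCount + " partitions");
        }
        return OptionalInt.of(sourcePartition);
    }

    public static void main(String[] args) {
        // Non-keyed stream, option enabled: record stays in source partition 3.
        System.out.println(targetPartition(false, true, 3, 6));   // OptionalInt[3]
        // Keyed output: option is ignored.
        System.out.println(targetPartition(true, true, 3, 6));    // OptionalInt.empty
        // Option not set: default partitioning.
        System.out.println(targetPartition(false, false, 3, 6));  // OptionalInt.empty
    }
}
```

Returning an empty `OptionalInt` rather than a computed partition keeps the "ignored when keyed" rule explicit: the sketch never overrides placement for keyed output.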

vcrfxia commented 3 years ago

We could do this in the ksql layer by using a custom partitioner. Doing this in Kafka Streams itself is not an option.
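One way such a custom partitioner could work, sketched in Java under stated assumptions. The `SinkPartitioner` interface below is hypothetical but shaped like Kafka Streams' `StreamPartitioner#partition(topic, key, value, numPartitions)`, which receives the record's key and value but not its source partition. The sketch therefore assumes an upstream step (not shown) copies the source partition into the value, represented by the hypothetical `TaggedValue` wrapper, and the partitioner reads it back.

```java
/**
 * Hypothetical sketch: a sink partitioner that sends non-keyed records back
 * to the partition they were read from. Not actual ksqlDB or Kafka code.
 */
public final class SourcePartitionPartitionerSketch {

    private SourcePartitionPartitionerSketch() {}

    /** Hypothetical interface shaped like StreamPartitioner#partition. */
    interface SinkPartitioner<K, V> {
        Integer partition(String topic, K key, V value, int numPartitions);
    }

    /** Hypothetical value wrapper; an upstream step stores the source partition here. */
    static final class TaggedValue<V> {
        final V payload;
        final int sourcePartition;

        TaggedValue(V payload, int sourcePartition) {
            this.payload = payload;
            this.sourcePartition = sourcePartition;
        }
    }

    /** Preserves the source partition for non-keyed records only. */
    static final class PreserveSourcePartition<V>
            implements SinkPartitioner<Object, TaggedValue<V>> {
        @Override
        public Integer partition(String topic, Object key,
                                 TaggedValue<V> value, int numPartitions) {
            if (key != null) {
                // In this sketch, null means "no override, use default partitioning".
                return null;
            }
            if (value.sourcePartition < 0 || value.sourcePartition >= numPartitions) {
                throw new IllegalStateException("sink topic " + topic
                    + " has no partition " + value.sourcePartition);
            }
            return value.sourcePartition;
        }
    }

    public static void main(String[] args) {
        SinkPartitioner<Object, TaggedValue<String>> p = new PreserveSourcePartition<>();
        // Non-keyed record read from partition 2 is written back to partition 2.
        System.out.println(p.partition("sink", null, new TaggedValue<>("row-a", 2), 4)); // 2
        // Keyed record: no override.
        System.out.println(p.partition("sink", "k1", new TaggedValue<>("row-b", 2), 4)); // null
    }
}
```

Carrying the partition inside the value is one assumed workaround for the partitioner not seeing the record's source partition; it relies on the ksql layer controlling the value layout, which plain Kafka Streams applications do not get for free.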

mjsax commented 3 years ago

I just discovered (by chance) https://issues.apache.org/jira/browse/KAFKA-10448 and the corresponding KIP https://cwiki.apache.org/confluence/display/KAFKA/KIP-669%3A+Preserve+Source+Partition+in+Kafka+Streams+from+context

It seems there might also be demand for this in the Kafka Streams (KS) layer. Thus, we might have two options for adding this feature to ksqlDB. I did not re-read the KIP, but it may not be easily possible (at least at the time the KIP was created; I'm not sure if anything has changed since) to use a custom partitioner for this use case.