confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Pre-join repartitions should only include columns necessary for the join #7489

Open vcrfxia opened 3 years ago

vcrfxia commented 3 years ago

Is your feature request related to a problem? Please describe.

When ksqlDB repartitions sources in preparation for a join (either to rekey a stream, or to ensure copartitioning when key formats mismatch or when a Schema-Registry-enabled key format is used), it writes all columns to the repartition topic, including columns that are accessed by neither the join nor the projection that follows it. This is wasteful because the extra columns are written not only to the repartition topic but also to any state stores needed as part of the join. We should optimize ksqlDB to include only the necessary columns in the pre-join repartition, similar to how ksqlDB optimizes the set of columns written to repartition and changelog topics for aggregations.

Describe the solution you'd like

ksqlDB's query planner should determine the set of columns necessary for a join upfront and only include these columns in repartition topics prior to the join.
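
To make the idea concrete, here is a minimal sketch of the column-pruning step, written in Python for illustration only. It is not ksqlDB's planner code: the function `required_columns`, the `(source, column)` tuple representation, and the `orders`/`users` schemas are all hypothetical. The assumption is that a column must survive the pre-join repartition only if it is a join key or is referenced by the projection after the join.

```python
def required_columns(source_schemas, join_keys, projection):
    """Return, per source, the columns to keep in the pre-join repartition.

    source_schemas: dict mapping a source name to its ordered list of columns.
    join_keys / projection: iterables of (source, column) references.
    All names here are hypothetical; this is not a ksqlDB API.
    """
    # A column is needed if the join condition or the projection refers to it.
    referenced = set(join_keys) | set(projection)
    # Preserve each source's original column order while dropping unused columns.
    return {
        source: [col for col in columns if (source, col) in referenced]
        for source, columns in source_schemas.items()
    }


# Hypothetical example: orders JOIN users ON orders.user_id = users.id,
# projecting only orders.order_id and users.name.
schemas = {
    "orders": ["order_id", "user_id", "amount", "notes"],
    "users": ["id", "name", "email", "address"],
}
needed = required_columns(
    schemas,
    join_keys=[("orders", "user_id"), ("users", "id")],
    projection=[("orders", "order_id"), ("users", "name")],
)
```

In this example, `amount`, `notes`, `email`, and `address` would be dropped before the repartition, so they would reach neither the repartition topics nor the join's state stores. A real implementation would also need to account for columns referenced by any other clause evaluated after the join.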

Describe alternatives you've considered

N/A

Additional context

Note that this optimization is different from the one proposed in https://github.com/confluentinc/ksql/issues/4748. This issue is about including only the necessary columns, while https://github.com/confluentinc/ksql/issues/4748 is about including only the relevant rows.

mjsax commented 3 years ago

Not sure if we should just "hack this in" or actually clean it up via https://github.com/confluentinc/ksql/issues/7548 properly.