confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

No way to have the output of a function as both a new field and in Partition By with CSAS #7998

Open wlaforest opened 3 years ago

wlaforest commented 3 years ago

I would like to be able to use the output of a function both as an output field and in the PARTITION BY. Initially I used the alias of the output field in the PARTITION BY, which seemed very natural since the PARTITION BY applies to the new stream being created.

CREATE STREAM BUS WITH (KAFKA_TOPIC='bus_prepped', PARTITIONS=10, REPLICAS=1) AS
  SELECT geo_geohash(Lat, Lon, 5) AS GEOHASH, *
  FROM BUS_RAW b
  PARTITION BY GEOHASH
  EMIT CHANGES;

There is some discussion of this issue here (https://github.com/confluentinc/ksql/issues/2701).

Then, when I tried to use the function for both the new field and the PARTITION BY, I got an error. So for

CREATE STREAM BUS WITH (KAFKA_TOPIC='bus_prepped', PARTITIONS=10, REPLICAS=1) AS
  SELECT geo_geohash(Lat, Lon, 5) AS GEOHASH, *
  FROM BUS_RAW b
  PARTITION BY geo_geohash(Lat, Lon, 5)
  EMIT CHANGES;

I get the error message:

The projection contains the key column more than once: GEOHASH and KSQL_COL_0. Each key column must only be in the projection once. If you intended to copy the key into the value, then consider using the AS_VALUE function to indicate which key reference should be copied.

But using AS_VALUE doesn't work, because the key does not yet exist — or perhaps I haven't figured out how to properly invoke it in this circumstance.
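For context, the documented AS_VALUE pattern applies when the PARTITION BY target is a column that already exists on the source stream — a sketch, with a hypothetical USERS stream and columns:

```sql
-- Typical AS_VALUE usage: PARTITION BY USER_ID makes USER_ID the key of
-- the new stream, and AS_VALUE(USER_ID) keeps a copy of it in the value.
-- (USERS, USER_ID, and NAME are hypothetical names for illustration.)
CREATE STREAM USERS_BY_ID AS
  SELECT USER_ID, AS_VALUE(USER_ID) AS USER_ID_COPY, NAME
  FROM USERS
  PARTITION BY USER_ID
  EMIT CHANGES;
```

The problem in this issue is that the would-be key is the *result of a function*, so there is no pre-existing column for AS_VALUE to reference.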

  1. I don't believe PARTITION BY has to match the behavior of GROUP BY as discussed in issue #2701, because I would argue PARTITION BY relates specifically to the new stream. If we take that view, we should be able to simply use the new field from the output stream.
  2. If we really insist on the same behavior, then we should at least be able to use the function both for a value field and in the PARTITION BY (and if it's the same expression, obviously not compute it twice).
mjsax commented 3 years ago

Related to https://github.com/confluentinc/ksql/issues/1034

In general, SQL only allows referring to column names from input schemas (i.e., input streams and tables in our case). If you apply a function in GROUP BY, for example, you need to call the same function in the SELECT clause to include the grouping column in the result schema. -- It's up to the system to detect that both expressions are the same and not compute the result twice.
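A minimal sketch of that GROUP BY analogue, reusing the BUS_RAW schema from the issue:

```sql
-- Standard-SQL style: the grouping expression is repeated verbatim in
-- the projection; the engine is expected to recognize that both
-- expressions are identical and evaluate geo_geohash only once.
SELECT geo_geohash(Lat, Lon, 5) AS GEOHASH, COUNT(*) AS CNT
FROM BUS_RAW
GROUP BY geo_geohash(Lat, Lon, 5)
EMIT CHANGES;
```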

Some systems deviate from the SQL standard and allow AS <name> in different places, and this can improve usability for the user. However, it requires more complex parsing and logical planning, as it might require traversing the plan multiple times to stitch all the names together.

For ksqlDB, there is one more non-standard aspect: ksqlDB might create new output columns that are not part of the input schema at all. Thus, to allow people to rename/use those columns, we first need to create the ksqlDB-generated output schema before we can apply AS <name> to someFunction(generatedColumnName), which requires a multi-pass parsing/query-planning approach.

Bottom line: I think it's worth doing, but it's complicated.

PeterLindner commented 3 years ago

A subquery may be an alternative (once subqueries are implemented, #745):

CREATE STREAM BUS
WITH (...) AS
SELECT
  GEOHASH,
  OTHERS
FROM (
  SELECT
    geo_geohash(LAT, LON, 5) AS GEOHASH,
    OTHERS
  FROM BUS_RAW
)
PARTITION BY GEOHASH
EMIT CHANGES;

I at least would expect subqueries to work that way.

Another benefit would be that it also feels natural for quite complex use cases involving collection types or table functions, like I mentioned in https://github.com/confluentinc/ksql/issues/7758#issuecomment-877401153

wlaforest commented 3 years ago

@mjsax Thanks for the insight on the complexity. I hadn't considered the need for multi-pass parsing/query planning, but now that you explain it, it makes sense. The thing causing me pain here is that I have to materialize the data twice just to partition by something that is already available in the first query.
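For the record, the double materialization looks something like this — a sketch, assuming an intermediate stream (here named BUS_WITH_HASH, a hypothetical name) and its backing topic are acceptable:

```sql
-- Step 1: materialize the geohash as an ordinary value column.
CREATE STREAM BUS_WITH_HASH AS
  SELECT geo_geohash(Lat, Lon, 5) AS GEOHASH, *
  FROM BUS_RAW
  EMIT CHANGES;

-- Step 2: repartition on the already-computed column, which writes
-- the same data out a second time.
CREATE STREAM BUS WITH (KAFKA_TOPIC='bus_prepped', PARTITIONS=10, REPLICAS=1) AS
  SELECT *
  FROM BUS_WITH_HASH
  PARTITION BY GEOHASH
  EMIT CHANGES;
```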

@PeterLindner a sub-query seems like it should work, but to me it feels less natural for this use case. It also seems like it might be complicated to get the query planning right so that it wouldn't materialize the data twice, which is really important. I could be wrong about that, though.