confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Partition by output of EXPLODE function #7758

Open cjmatta opened 3 years ago

cjmatta commented 3 years ago

Describe the bug
When trying to partition by the output of the EXPLODE function, I get the following error:

Could not determine output schema for query due to error: Logical and Physical schemas do not match! Logical : `ID` BIGINT, `CITY` STRING Physical: `KSQL_COL_0` STRING KEY, `ID` BIGINT, `CITY` STRING Statement: CREATE STREAM CMATTAREKEYDESTINATION WITH (KAFKA_TOPIC='cmatta-rekey-destination', PARTITIONS=6, REPLICAS=3, VALUE_FORMAT='json') AS SELECT CMATTAREKEYSOURCE.ID ID, EXPLODE(CMATTAREKEYSOURCE.CITIES) CITY FROM CMATTAREKEYSOURCE CMATTAREKEYSOURCE PARTITION BY EXPLODE(CMATTAREKEYSOURCE.CITIES) EMIT CHANGES;

To Reproduce

Version 0.18 (Confluent Cloud)

Register the source stream

CREATE STREAM cmattarekeysource (id bigint, cities ARRAY<STRING>) 
WITH (kafka_topic='cmatta-rekey-source', value_format='json') ;

Insert records:

insert into cmattarekeysource (id, cities) values (1, ARRAY['New York', 'Philadelphia', 'Boston']);
insert into cmattarekeysource (id, cities) values (2, ARRAY['Washington DC', 'Charlotte', 'Atlanta']);
insert into cmattarekeysource (id, cities) values (3, ARRAY['Los Angeles', 'San Francisco', 'San Diego']);
insert into cmattarekeysource (id, cities) values (4, ARRAY['Dallas', 'Austin', 'Houston']);

Attempt to create the re-key stream:

CREATE STREAM cmattarekeydestination with (kafka_topic='cmatta-rekey-destination', value_format='json') as 
SELECT 
    id,
    EXPLODE(cities) as city
from cmattarekeysource 
partition by EXPLODE(cities)
emit changes;

Expected behavior
I would expect a stream of events for each exploded record, partitioned by the exploded item.
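
For illustration, the first inserted record (id = 1) would be expected to produce three output events, one per city, each keyed by that city (the layout below is illustrative, not actual ksqlDB output):

key='New York'       value={"ID": 1, "CITY": "New York"}
key='Philadelphia'   value={"ID": 1, "CITY": "Philadelphia"}
key='Boston'         value={"ID": 1, "CITY": "Boston"}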

Actual behaviour
Received this error message:

Could not determine output schema for query due to error: Logical and Physical schemas do not match! Logical : `ID` BIGINT, `CITY` STRING Physical: `KSQL_COL_0` STRING KEY, `ID` BIGINT, `CITY` STRING Statement: CREATE STREAM CMATTAREKEYDESTINATION WITH (KAFKA_TOPIC='cmatta-rekey-destination', PARTITIONS=6, REPLICAS=3, VALUE_FORMAT='json') AS SELECT CMATTAREKEYSOURCE.ID ID, EXPLODE(CMATTAREKEYSOURCE.CITIES) CITY FROM CMATTAREKEYSOURCE CMATTAREKEYSOURCE PARTITION BY EXPLODE(CMATTAREKEYSOURCE.CITIES) EMIT CHANGES;
cjmatta commented 3 years ago

Similar query with a different error:

CREATE STREAM cmattarekeydestination with (kafka_topic='cmatta-rekey-destination', value_format='json') as 
SELECT 
    id,
    EXPLODE(cities) as city
from cmattarekeysource 
partition by city
emit changes;

Error:

Could not determine output schema for query due to error: Line: 6, Col: 14: PARTITION BY column 'CITY' cannot be resolved. Statement: CREATE STREAM CMATTAREKEYDESTINATION WITH (KAFKA_TOPIC='cmatta-rekey-destination', PARTITIONS=6, REPLICAS=3, VALUE_FORMAT='json') AS SELECT CMATTAREKEYSOURCE.ID ID, EXPLODE(CMATTAREKEYSOURCE.CITIES) CITY FROM CMATTAREKEYSOURCE CMATTAREKEYSOURCE PARTITION BY CITY EMIT CHANGES;
maksymilian-gasztych commented 3 years ago

I would like this to be fixed too; currently we need to create an additional stream on top of the exploded one in order to repartition by the "explosion" result (see the sketch below).
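
A minimal sketch of that two-step workaround, assuming the cmattarekeysource stream from the report above (the intermediate stream name is made up):

-- Step 1: explode the array into an intermediate stream
CREATE STREAM cmatta_exploded AS
  SELECT id, EXPLODE(cities) AS city
  FROM cmattarekeysource
  EMIT CHANGES;

-- Step 2: repartition the already-exploded stream by the scalar CITY column
CREATE STREAM cmattarekeydestination
  WITH (kafka_topic='cmatta-rekey-destination', value_format='json') AS
  SELECT id, city
  FROM cmatta_exploded
  PARTITION BY city
  EMIT CHANGES;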

agavra commented 3 years ago

@maksymilian-gasztych that would be the suggested workaround right now - I believe the problem here is that PARTITION BY cannot accept a UDTF (it doesn't make sense to partition by something that could potentially have multiple outputs). I understand what you want, and I believe, if anything, the second example (PARTITION BY city) is what would make sense, but the problem is that PARTITION BY works on the input schema, not the output schema, which makes it a little clunky (see the sketch below).

The error message should definitely be improved.
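
To illustrate the input-schema vs. output-schema distinction (a sketch only; the stream names are made up, the PARTITION BY id variant is assumed to work, and the failing variant is the one reported above):

-- Assumed to work: ID is a column on the input (source) schema,
-- which is what PARTITION BY is resolved against
CREATE STREAM rekey_by_id AS
  SELECT id, EXPLODE(cities) AS city
  FROM cmattarekeysource
  PARTITION BY id
  EMIT CHANGES;

-- Fails: CITY is only an alias in the output schema, which PARTITION BY
-- cannot see ("PARTITION BY column 'CITY' cannot be resolved", as reported above)
CREATE STREAM rekey_by_city AS
  SELECT id, EXPLODE(cities) AS city
  FROM cmattarekeysource
  PARTITION BY city
  EMIT CHANGES;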

PeterLindner commented 3 years ago

@agavra with the addition of subqueries (#745) this would come for free, wouldn't it? Something like

CREATE STREAM TARGET AS
SELECT
 ID,
 CITY
FROM (
 SELECT
  ID,
  EXPLODE(CITIES) AS CITY
 FROM SOURCE
 EMIT CHANGES 
)
PARTITION BY CITY
EMIT CHANGES;

should produce the desired result and ideally wouldn't create an intermediate stream.

agavra commented 3 years ago

@PeterLindner depending on the implementation of subqueries that could definitely be a good way to go about it - nice observation

wlaforest commented 3 years ago

This is highly desirable for me as well; otherwise I have to re-partition data with a separate query for a customer.

Somewhat bizarrely, I get a different error message when I try to use a UDTF. When I run

CREATE STREAM FENCE WITH (KAFKA_TOPIC='fence', PARTITIONS=10, REPLICAS=1) AS SELECT * FROM FENCE_RAW PARTITION BY GEO_COVERING_GEOHASHES(_raw_data, 5);

where GEO_COVERING_GEOHASHES is a UDTF, I get a message saying

Invalid SelectKey: Can't find any functions with the name 'GEO_COVERING_GEOHASHES'. expression:GEO_COVERING_GEOHASHES(_RAW_DATA, 5), schema:`ROWKEY` STRING KEY, `TYPE` STRING, `properties` MAP<STRING, STRING>, `GEOMETRY` MAP<STRING, STRING>, `_RAW_DATA` STRING, `ROWTIME` BIGINT, `ROWKEY` STRING Caused by: Can't find any functions with the name 'GEO_COVERING_GEOHASHES'

but GEO_COVERING_GEOHASHES is installed and working properly. This CSAS works fine:

CREATE STREAM FENCE WITH (KAFKA_TOPIC='fence', PARTITIONS=10, REPLICAS=1) AS SELECT GEO_COVERING_GEOHASHES(_raw_data, 5) GEOHASH, * FROM FENCE_RAW;