confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

KSQL stream PARTITION BY is affected if KEY declared on source stream #1345

Open rmoff opened 6 years ago

rmoff commented 6 years ago

This behaves as expected. The derived stream is keyed on the PARTITION BY column:

ksql> CREATE STREAM CUSTOMER_SRC2 with (kafka_topic='asgard.inventory.customers', VALUE_FORMAT='AVRO');

 Message
----------------
 Stream created
----------------
ksql> SELECT ROWKEY, ID FROM CUSTOMER_SRC2 LIMIT 5;
 | 1
 | 2
 | 3
 | 4
 | 5
LIMIT reached for the partition.
Query terminated
ksql>
ksql> CREATE STREAM CUSTOMER_REKEY5 AS SELECT * FROM CUSTOMER_SRC2 PARTITION BY ID;

 Message
----------------------------
 Stream created and running
----------------------------
ksql> SELECT ROWKEY, ID FROM CUSTOMER_REKEY5 LIMIT 5;
1 | 1
3 | 3
7 | 7
8 | 8
9 | 9
LIMIT reached for the partition.
Query terminated

This does not behave as expected - note the (mistaken) use of KEY in the WITH clause:

ksql> CREATE STREAM CUSTOMER_SRC with (kafka_topic='asgard.inventory.customers', VALUE_FORMAT='AVRO', key='Id');

 Message
----------------
 Stream created
----------------

ksql> SELECT ROWKEY, ID FROM CUSTOMER_SRC LIMIT 5;
 | 1
 | 2
 | 3
 | 4
 | 5
LIMIT reached for the partition.
Query terminated
ksql>

ksql> CREATE STREAM REKEY1 AS SELECT * FROM CUSTOMER_SRC PARTITION BY ID;

 Message
----------------------------
 Stream created and running
----------------------------
ksql> SELECT ROWKEY, ID FROM REKEY1 LIMIT 5;
 | 7
( | 20
* | 21
. | 23
2 | 25
LIMIT reached for the partition.
Query terminated
ksql>

Note that the messages have not been rekeyed.

If a column other than the one named in the KEY property of the CREATE STREAM statement is used for rekeying, it works fine:

ksql> CREATE STREAM REKEY2 AS SELECT * FROM CUSTOMER_SRC PARTITION BY CITY;

 Message
----------------------------
 Stream created and running
----------------------------
ksql> SELECT ROWKEY, CITY FROM REKEY2 LIMIT 5;
Parthenay | Parthenay
Soisy-sous-Montmorency | Soisy-sous-Montmorency
Saint-Gaudens | Saint-Gaudens
Graz | Graz
Niederwaldkirchen | Niederwaldkirchen
LIMIT reached for the partition.
Query terminated
ksql>

big-andy-coates commented 5 years ago

Is this not a case of user error? When running a CT/CS (CREATE TABLE / CREATE STREAM) with KEY='ID' you're telling KSQL that the key of the records is the same as the ID field. So when you do a PARTITION BY ID, KSQL is correctly saying 'I don't need to repartition as it's already partitioned by ID'...
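
The decision described above can be sketched as a toy model. This is not KSQL's actual planner code: the `Stream` class, the `partition_by` function, the case-insensitive column comparison, and the sample record keys are all assumptions made for illustration only.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Stream:
    name: str
    declared_key: Optional[str]  # column named in WITH (KEY='...'), if any
    records: list                # list of (rowkey, value_dict) pairs


def partition_by(source: Stream, column: str, new_name: str) -> Stream:
    """Toy model of CREATE STREAM ... AS SELECT * FROM source PARTITION BY column."""
    # Assumption: names are compared case-insensitively (KEY='Id' vs PARTITION BY ID).
    declared = source.declared_key
    if declared is not None and declared.upper() == column.upper():
        # The declaration is trusted, so existing keys pass through unchanged.
        records = list(source.records)
    else:
        # Otherwise every record is rekeyed on the value of the chosen column.
        records = [(str(value[column]), value) for _, value in source.records]
    return Stream(new_name, column, records)


# Hypothetical records whose real keys do NOT match the ID column.
rows = [
    ("opaque-1", {"ID": 1, "CITY": "Graz"}),
    ("opaque-2", {"ID": 2, "CITY": "Parthenay"}),
]
no_key = Stream("CUSTOMER_SRC2", None, rows)   # no KEY property declared
with_key = Stream("CUSTOMER_SRC", "Id", rows)  # KEY='Id' declared (mistakenly)

rekey5 = partition_by(no_key, "ID", "CUSTOMER_REKEY5")  # rekeyed on ID
rekey1 = partition_by(with_key, "ID", "REKEY1")         # repartition skipped
rekey2 = partition_by(with_key, "CITY", "REKEY2")       # rekeyed on CITY
```

In this model, `rekey1` keeps the original keys because the declared key matches the PARTITION BY column, while `rekey5` and `rekey2` are rekeyed. That mirrors the three cases reported in the issue.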

mjsax commented 3 years ago

I think we can close this ticket, because the WITH (key=...) clause was removed during the "structured key" work AFAIK?