confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Stream-Stream self-join requires duplicate topic #2030

Open rmoff opened 6 years ago

rmoff commented 6 years ago

Source events are in topic atm_txns_gess:

ksql> DESCRIBE EXTENDED ATM_TXNS_GESS;

Name                 : ATM_TXNS_GESS
Type                 : STREAM
Key field            :
Key format           : STRING
Timestamp field      : TIMESTAMP
Value format         : JSON
Kafka topic          : atm_txns_gess (partitions: 1, replication: 1)

 Field          | Type
-------------------------------------------------
 ROWTIME        | BIGINT           (system)
 ROWKEY         | VARCHAR(STRING)  (system)
 ACCOUNT_ID     | VARCHAR(STRING)
 ATM            | VARCHAR(STRING)
 LOCATION       | STRUCT<LON DOUBLE, LAT DOUBLE>
 AMOUNT         | INTEGER
 TIMESTAMP      | VARCHAR(STRING)
 TRANSACTION_ID | VARCHAR(STRING)
-------------------------------------------------

Local runtime statistics
------------------------
consumer-messages-per-sec:      2.12 consumer-total-message-bytes:   2961608 consumer-total-messages:     13708     last-message: 10/9/18 2:56:30 PM UTC
consumer-failed-messages:         0 failed-messages-per-sec:         0      last-failed:       n/a
(Statistics of the local KSQL server interaction with the Kafka topic atm_txns_gess)

Self-join:

ksql> SELECT A.ATM, B.ATM FROM ATM_TXNS_GESS A INNER JOIN ATM_TXNS_GESS B WITHIN (0 MINUTES, 10 MINUTES) ON A.ACCOUNT_ID=B.ACCOUNT_ID;
Invalid topology: Topic atm_txns_gess has already been registered by another source.
ksql>

This error (Invalid topology: Topic atm_txns_gess has already been registered by another source.) forces a workaround: build a second stream, backed by its own topic, from the first, and then use that stream in the join:

ksql> CREATE STREAM ATM_TXNS_GESS_02 WITH (PARTITIONS=1) AS SELECT * FROM ATM_TXNS_GESS;

ksql> SELECT A.ATM, B.ATM FROM ATM_TXNS_GESS A INNER JOIN ATM_TXNS_GESS_02 B WITHIN (0 MINUTES, 10 MINUTES) ON A.ACCOUNT_ID=B.ACCOUNT_ID;
ATM : 3409740389 | ATM : 3409740389
ATM : 3663514950 | ATM : 3663514950
Barclays Bank PLC | Barclays Bank PLC
Lloyds | Lloyds
Morrisons | Morrisons
Santander | Santander
ATM : 51401467 | ATM : 51401467

Can KSQL permit self-joins on streams sourced from the same topic?

mjsax commented 6 years ago

Seems to be a Kafka Streams limitation? That said, I think KSQL could build a workaround by duplicating the data into a second topic under the hood. However, this would be wasteful. The better fix would be to add a self-join to Kafka Streams, IMHO. Do you mind creating an AK Jira for this?

big-andy-coates commented 6 years ago

+1 for fixing in KStreams and avoiding duplicating the topic data.

rmoff commented 6 years ago

I've raised https://issues.apache.org/jira/browse/KAFKA-7497

Kaiserchen commented 6 years ago

@mjsax KSQL could just add another child to the source node. I highly recommend not fixing this in Streams, but having KSQL reuse the KStream source instead of re-registering it. How would a Streams fix look, anyway? What if two sources register with overlapping wildcards?

mjsax commented 6 years ago

@rmoff Thanks for creating the AK ticket.

@Kaiserchen I left a comment on the AK ticket (partly copied below) that explains why I think KSQL cannot solve this issue itself:

One could express a self-join like this in the current Streams API:

KStream stream = builder.stream(...);
stream.join(stream, ...);

However, executing the join would not be efficient, as two state stores with two changelog topics would be created (both containing exactly the same data). Also, and this seems to be the most severe issue, each record would join with itself, which is actually not desired...

mjsax commented 3 years ago

https://issues.apache.org/jira/browse/KAFKA-6687 might be useful for this case? We should still address https://issues.apache.org/jira/browse/KAFKA-7497 at some point, but maybe KAFKA-6687 is good enough to unlock this feature, while KAFKA-7497 would be an internal optimization to compute the self-join more efficiently?

mjsax commented 3 years ago

Regarding my comment above:

Also, and this seems to be the most severe issue, each record would join with itself, which is actually not desired...

I think this is actually not correct. At least for self-joins in standard SQL, a record does join with itself. We should follow the same semantics; thus, a self-join is already possible today with Kafka Streams (even if not efficient).

Thus, if we don't worry too much about efficiency, ksqlDB could implement it.
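To make the standard-SQL semantics concrete, here is a minimal in-memory sketch in plain Java (no Kafka dependency) of a windowed self-join over a single list of transactions. It assumes that WITHIN (before, after) admits a right-hand record when its timestamp lies at most "before" earlier and at most "after" later than the left-hand record's timestamp, as with the before/after bounds of Kafka Streams' JoinWindows. The class, record and method names (WindowedSelfJoin, Txn, Match, selfJoin) and the sample data are hypothetical. The sketch only shows which pairs a self-join emits; it does not model state stores or changelog topics.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Toy, in-memory model of a windowed stream-stream self-join with standard SQL
// semantics. Hypothetical names; this is not Kafka Streams or ksqlDB code.
final class WindowedSelfJoin {

    // One ATM transaction: id, join key, ATM name, event time in epoch millis.
    record Txn(String transactionId, String accountId, String atm, long ts) {}

    // One join result row: the left-side (A) and right-side (B) transaction.
    record Match(Txn a, Txn b) {}

    // Emits (a, b) for every pair drawn from the SAME input where the keys match
    // and b.ts lies in [a.ts - before, a.ts + after]; this window rule is an
    // assumed mirror of "A JOIN B WITHIN (before, after)".
    static List<Match> selfJoin(List<Txn> txns, Duration before, Duration after) {
        List<Match> out = new ArrayList<>();
        for (Txn a : txns) {
            for (Txn b : txns) { // a == b is NOT skipped: a record can join itself
                boolean sameKey = a.accountId().equals(b.accountId());
                boolean inWindow = b.ts() >= a.ts() - before.toMillis()
                        && b.ts() <= a.ts() + after.toMillis();
                if (sameKey && inWindow) {
                    out.add(new Match(a, b));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        List<Txn> txns = List.of(
                new Txn("t1", "acc1", "Lloyds", 0L),
                new Txn("t2", "acc1", "Santander", 5 * min),
                new Txn("t3", "acc2", "Morrisons", 5 * min));
        // Equivalent window to WITHIN (0 MINUTES, 10 MINUTES)
        for (Match m : selfJoin(txns, Duration.ZERO, Duration.ofMinutes(10))) {
            System.out.println(m.a().atm() + " | " + m.b().atm());
        }
    }
}
```

With these sample rows the program prints four pairs: Lloyds | Lloyds, Lloyds | Santander, Santander | Santander and Morrisons | Morrisons. Three of them pair a transaction with itself, which is the self-match behaviour discussed above.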

Hubbitus commented 2 weeks ago

Hello. https://issues.apache.org/jira/browse/KAFKA-7497 (https://issues.apache.org/jira/browse/KAFKA-14209) has been fixed. Will self-joins then be supported in ksql?