confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

KSQL Multi Stream Join duplicates the data #7253

Closed Psykepro closed 3 years ago

Psykepro commented 3 years ago

Describe the bug
The KSQL multi-join stream generates double the number of records expected.

To Reproduce
Steps to reproduce the behavior:

  1. The version of KSQL: v6.0.2
  2. Sample source data: the source data is just messages created by Debezium using LogMiner, for tables from Oracle.
  3. I created source streams from the topics with the Debezium messages for 3 tables, plus streams repartitioned by ID:

    CREATE STREAM IF NOT EXISTS TABLE_A_SRC WITH (KAFKA_TOPIC = 'TABLE_A', VALUE_FORMAT='AVRO');
    CREATE STREAM IF NOT EXISTS TABLE_A_BY_PK AS SELECT * FROM TABLE_A_SRC PARTITION BY ID;

    CREATE STREAM IF NOT EXISTS TABLE_B_SRC WITH (KAFKA_TOPIC = 'TABLE_B', VALUE_FORMAT='AVRO');
    CREATE STREAM IF NOT EXISTS TABLE_B_BY_PK AS SELECT * FROM TABLE_B_SRC PARTITION BY ID;

    CREATE STREAM IF NOT EXISTS TABLE_C_SRC WITH (KAFKA_TOPIC = 'TABLE_C', VALUE_FORMAT='AVRO');
    CREATE STREAM IF NOT EXISTS TABLE_C_BY_PK AS SELECT * FROM TABLE_C_SRC PARTITION BY ID;

  4. Created a stream with multiple joins as follows:

    CREATE STREAM TABLE_A_ENRICHED AS
        SELECT ta.ID, ta.NAME, ta.__OP,
               tb.ID, tb.START_DATE, tb.END_DATE,
               tc.ID, tc.DESCRIPTION
        FROM TABLE_A_BY_PK AS ta
        LEFT JOIN TABLE_B_BY_PK AS tb WITHIN 365 DAYS ON tb.ID = ta.B_ID
        LEFT JOIN TABLE_C_BY_PK AS tc WITHIN 365 DAYS ON tc.ID = ta.C_ID
        PARTITION BY tc.ID
        EMIT CHANGES;



**Expected behavior**
One message in stream TABLE_A_ENRICHED per record from TABLE_A_BY_PK.

**Actual behavior**
It creates double the number of records in TABLE_A_ENRICHED compared to TABLE_A_BY_PK.
Psykepro commented 3 years ago

Now I checked by creating a join stream of just TABLE_A with TABLE_B, and separately TABLE_A with TABLE_C. The first ended up with the proper count of records, but the second has them duplicated again. I can't use the real column names because of privacy reasons. But when I do the same join in Oracle with the 3 tables there is no problem; only the KSQL stream join creates duplicate rows. Can you please point me at what to look at? What can cause all rows to be duplicated?

maksymilian-gasztych commented 3 years ago

Not sure I understand correctly, but you created a stream TABLE_A_ENRICHED, so how can you expect to have only one record there? A stream will have as many records as there are updates to the source tables (it also depends on commit.interval.ms and other settings, though). If you created a TABLE, not a stream, you wouldn't have duplicated records. I don't think non-windowed joins on streams are supported, so I'm surprised you could even create that stream without errors. https://docs.ksqldb.io/en/latest/developer-guide/joins/join-streams-and-tables/#join-capabilities
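For illustration, declaring the changelog topic as a TABLE rather than a STREAM might look roughly like this (a sketch only; the column names and types are assumptions, not taken from the real schema):

```sql
-- Hypothetical sketch: a TABLE keyed by ID treats each Debezium message
-- as an upsert, so a later update for the same key replaces the earlier
-- row instead of producing an additional join result.
CREATE TABLE TABLE_A_TBL (
    ID BIGINT PRIMARY KEY,
    NAME STRING,
    B_ID BIGINT,
    C_ID BIGINT
) WITH (
    KAFKA_TOPIC = 'TABLE_A',
    VALUE_FORMAT = 'AVRO'
);
```

A table-table join on such tables then emits one updated row per key change rather than one row per matching pair of events.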

I would also suggest giving your streams a STREAM prefix, not a TABLE prefix, because otherwise it's hard to maintain in the long run.

Psykepro commented 3 years ago

No, not to have only one record, but to have one record per record in TABLE_A. And yes, I do have a STREAM prefix in the real names; I just wanted to give an example with more abstract names. And those joins are windowed using the WITHIN clause.

But the problem is that I get duplicate records for each record in TABLE_A if I join it with TABLE_C, but I don't get them if I join only with TABLE_B, which doesn't make sense.

I tried to create a table, but it doesn't allow me, because I need to partition by a specific existing key/column from the messages, while for KSQL tables you need to specify a new key as the primary key, which doesn't work for this case.

Psykepro commented 3 years ago

Also, when I try to create a table, for example like this:

CREATE TABLE TABLE_TRANSACTIONS_ENRICHED AS
    SELECT
        a.AGR_SERIAL as AGR_SERIAL, a.AGR_START_DATE, a.AGR_END_DATE,
        t.TRN_SERIAL as TRN_SERIAL, t.TRN_DEBIT_VALUE, t.TRN_CREDIT_VALUE, t.TRN_TRANSACTION_COMMENT, t.TRN_PAYING_IN_REFERENCE, t.TRN_DATE_TIME, t.TRN_EFFECTIVE_DATE, t.__OP, t.TRN_AGREEMENT,
        tt.TRT_SERIAL, tt.TRT_DESCRIPTION, COUNT(t.TRN_SERIAL) as TOTAL
    FROM STREAM_TRANSACTIONS_BY_PK as t
    INNER JOIN STREAM_TRANSACTION_TYPES_BY_PK as tt WITHIN 365 DAYS ON t.TRN_TYPE = tt.TRT_SERIAL
    INNER JOIN STREAM_AGREEMENTS_BY_PK as a WITHIN 365 DAYS ON t.TRN_AGREEMENT = a.AGR_SERIAL
    GROUP BY TRN_SERIAL, TRN_AGREEMENT, AGR_SERIAL, AGR_START_DATE,
        AGR_END_DATE, TRN_DEBIT_VALUE, TRN_CREDIT_VALUE, TRN_TRANSACTION_COMMENT,
        TRN_PAYING_IN_REFERENCE, TRN_DATE_TIME, TRN_EFFECTIVE_DATE, t.__OP,
        TRT_SERIAL, TRT_DESCRIPTION
    EMIT CHANGES;

I got this error for several columns in the KSQL log:

[2021-03-19 14:15:48,578] ERROR {"type":1,"deserializationError":null,"recordProcessingError":{"errorMessage":"Group-by column with index 5 resolved to null. The source row will be excluded from the table.","record":null,"cause":[]},"productionError":null} (processing.CTAS_TABLE_TRANSACTIONS_ENRICHED_201.Aggregate.GroupBy)

Then I tried it with IFNULL:

CREATE TABLE TABLE_TRANSACTIONS_ENRICHED AS
    SELECT
        a.AGR_SERIAL as AGR_SERIAL, IFNULL(a.AGR_START_DATE, ''), IFNULL(a.AGR_END_DATE, ''),
        t.TRN_SERIAL as TRN_SERIAL, IFNULL(t.TRN_DEBIT_VALUE, 0), IFNULL(t.TRN_CREDIT_VALUE, 0), t.TRN_TRANSACTION_COMMENT, t.TRN_PAYING_IN_REFERENCE, t.TRN_DATE_TIME, t.TRN_EFFECTIVE_DATE, t.__OP, t.TRN_AGREEMENT,
        tt.TRT_SERIAL, tt.TRT_DESCRIPTION, COUNT(t.TRN_SERIAL) as TOTAL
    FROM STREAM_TRANSACTIONS_BY_PK as t
    INNER JOIN STREAM_TRANSACTION_TYPES_BY_PK as tt WITHIN 365 DAYS ON t.TRN_TYPE = tt.TRT_SERIAL
    INNER JOIN STREAM_AGREEMENTS_BY_PK as a WITHIN 365 DAYS ON t.TRN_AGREEMENT = a.AGR_SERIAL
    GROUP BY TRN_SERIAL, TRN_AGREEMENT, AGR_SERIAL, AGR_START_DATE,
        AGR_END_DATE, TRN_DEBIT_VALUE, TRN_CREDIT_VALUE, TRN_TRANSACTION_COMMENT,
        TRN_PAYING_IN_REFERENCE, TRN_DATE_TIME, TRN_EFFECTIVE_DATE, t.__OP,
        TRT_SERIAL, TRT_DESCRIPTION
    EMIT CHANGES;

And then I got this error:

Function 'IFNULL' does not accept parameters (BIGINT, STRING).
Valid alternatives are:
IFNULL(T expression, T altValue)
For detailed information on a function run: DESCRIBE FUNCTION <Function-Name>;

Btw, I'm not sure what to use as a default value for timestamps, since START_DATE and END_DATE are timestamps in the Kafka messages.
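If the dates arrive as epoch-millis BIGINTs (an assumption; Debezium often encodes Oracle dates that way), the fix for the IFNULL error is simply to make the altValue the same type as the column, for example:

```sql
-- Fragment only: IFNULL requires both arguments to have the same type,
-- so a STRING default like '' can't back a BIGINT timestamp column.
-- Assuming epoch-millis BIGINTs, 0 (the epoch) works as a default:
IFNULL(a.AGR_START_DATE, CAST(0 AS BIGINT)) AS AGR_START_DATE,
IFNULL(a.AGR_END_DATE, CAST(0 AS BIGINT)) AS AGR_END_DATE,
-- and a numeric default for the numeric columns:
IFNULL(t.TRN_DEBIT_VALUE, 0.0) AS TRN_DEBIT_VALUE
```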

Psykepro commented 3 years ago

I have managed to create tables from the Debezium messages by switching the format to JSON, but now when I try to join the 3 tables with this query:

CREATE TABLE TABLE_TRANSACTIONS_ENRICHED AS
    SELECT
        a.AGR_SERIAL, a.AGR_START_DATE, a.AGR_END_DATE,
        t.TRN_SERIAL, t.TRN_DEBIT_VALUE, t.TRN_CREDIT_VALUE, t.TRN_TRANSACTION_COMMENT, t.TRN_PAYING_IN_REFERENCE, t.TRN_DATE_TIME, t.TRN_EFFECTIVE_DATE, t.__OP,
        tt.TRT_DESCRIPTION
    FROM TABLE_TRANSACTIONS as t
    LEFT JOIN TABLE_TRANSACTION_TYPES as tt ON t.TRN_TYPE = tt.TRT_SERIAL
    LEFT JOIN TABLE_AGREEMENTS as a ON t.TRN_AGREEMENT = a.AGR_SERIAL

I got this error:

Cannot repartition a TABLE source. If this is a join, make sure that the criteria uses the TABLE's key column TRN_SERIAL instead of TRN_TYPE

mjsax commented 3 years ago

Did you inspect the result data? Could the issue be related to LEFT JOIN? A stream-stream left join might produce spurious left-join results, even if the record produces an inner-join result later. Cf. https://issues.apache.org/jira/browse/KAFKA-10847 (there is already WIP for this ticket, so it should be fixed soon).

In general, I agree with @maksymilian-gasztych -- if you read a table changelog, you might want to consume it as a TABLE, not as a STREAM, to get the desired semantics. Not sure why you need to repartition the data, though? (I am not familiar with the details of Debezium... Is the PK of your table not in the Kafka message key, as it should be?)

One workaround could be to read the original topic as a STREAM, repartition it on the PK, and write it back into a topic. (Repartitioning a TABLE does not make sense semantically, because a table must be partitioned by its PK, and if you create a TABLE the assumption is always that the data is already partitioned by its PK.) In a second step you read the correctly partitioned topic as a TABLE and do the join.
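That two-step workaround might be sketched like this (a sketch only; the stream, topic, and column names are hypothetical, loosely following the naming used in this thread):

```sql
-- Step 1: read the original topic as a STREAM, repartition by the PK,
-- and write the rekeyed records back out to a new topic.
CREATE STREAM TRANSACTIONS_RAW WITH (KAFKA_TOPIC = 'TABLE_TRANSACTIONS', VALUE_FORMAT = 'AVRO');
CREATE STREAM TRANSACTIONS_REKEYED
    WITH (KAFKA_TOPIC = 'TRANSACTIONS_BY_PK_TOPIC') AS
    SELECT * FROM TRANSACTIONS_RAW PARTITION BY TRN_SERIAL;

-- Step 2: read the now correctly partitioned topic as a TABLE
-- (assumed columns; the PRIMARY KEY must match the partitioning key).
CREATE TABLE TRANSACTIONS_TBL (
    TRN_SERIAL BIGINT PRIMARY KEY,
    TRN_TYPE BIGINT,
    TRN_AGREEMENT BIGINT
) WITH (KAFKA_TOPIC = 'TRANSACTIONS_BY_PK_TOPIC', VALUE_FORMAT = 'AVRO');
```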

Psykepro commented 3 years ago

@mjsax for the streams, yep, I guess this is the problem explained in the ticket: https://issues.apache.org/jira/browse/KAFKA-10847

And for the repartitioning, the problem is that I want to join on keys which are not the PRIMARY KEY in TABLE_TRANSACTIONS. I want to have a table partitioned by TRN_SERIAL while joining TABLE_TRANSACTION_TYPES on TRN_TYPE and TABLE_AGREEMENTS on TRN_AGREEMENT. But it seems this is not possible currently, as I found this issue: https://github.com/confluentinc/ksql/issues/4424, where the last comment says they are currently working on a solution.

mjsax commented 3 years ago

#4424 is for table-table joins, though. And yes, it's WIP to add FK-join support for table-table joins.

Doing a stream-stream join to "mimic" the FK table-table join behavior could work, but it's not guaranteed. The semantics of a stream-stream join are quite different...

Another workaround could be to read the topic as a stream, group-by-aggregate on the join attribute, and collect a list of "rows" in the aggregation function. After the table-table join, you could use EXPLODE to flatten the result.
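That workaround might look roughly like this (a sketch with hypothetical table names; COLLECT_LIST and EXPLODE are the ksqlDB built-ins for building and flattening arrays):

```sql
-- Step 1: group the stream by the join attribute and collect the rows
-- (here just the serials, for brevity) into an array, yielding a table
-- keyed on TRN_AGREEMENT.
CREATE TABLE TRANSACTIONS_BY_AGREEMENT AS
    SELECT TRN_AGREEMENT,
           COLLECT_LIST(TRN_SERIAL) AS TRN_SERIALS
    FROM STREAM_TRANSACTIONS_BY_PK
    GROUP BY TRN_AGREEMENT;

-- Step 2: an ordinary primary-key table-table join against the
-- agreements table, now that both sides are keyed on the same column.
CREATE TABLE TRANSACTIONS_JOINED AS
    SELECT a.AGR_SERIAL, a.AGR_START_DATE, x.TRN_SERIALS
    FROM TRANSACTIONS_BY_AGREEMENT x
    JOIN TABLE_AGREEMENTS a ON x.TRN_AGREEMENT = a.AGR_SERIAL;

-- Step 3 (to flatten): consume the join result as a stream and apply
-- EXPLODE(TRN_SERIALS) to turn each array element back into its own row.
```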

Overall, it seems that this ticket does not report any new issues, and there is already WIP to address the known ones. Can we close this ticket?

Psykepro commented 3 years ago

@mjsax Yes, this was useful info, thank you! I'm sorry I couldn't find it before creating the ticket. I'm closing it. :)

mjsax commented 3 years ago

No problem. Glad we could "resolve" this for now.