confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Primary key values are getting duplicated in the table instead of being replaced with updated values #6433

Open · Harshith2396 opened this issue 3 years ago

Harshith2396 commented 3 years ago

The primary key constraint is not working. According to the documentation, 'Later messages with the same key replace earlier.' In my case this doesn't happen: I can see both the old values and the new values, i.e. there are duplicates in the table.

To Reproduce

Steps to reproduce the behavior:

  1. The version of KSQL used is 10.0.
  2. The command used to create the table is: create table client_table(user_ids bigint PRIMARY KEY, username string, password_expires_in_days string) with (kafka_topic='single_column3', value_format='json_sr');
  3. The structure of the table when described is:

Name : CLIENT_TABLE

 Field                    | Type
--------------------------+----------------------
 USER_IDS                 | BIGINT (primary key)
 USERNAME                 | VARCHAR(STRING)
 PASSWORD_EXPIRES_IN_DAYS | VARCHAR(STRING)

  4. When I query the table using the command select * from client_table emit changes;, my output is:

 USER_IDS | USERNAME | PASSWORD_EXPIRES_IN_DAYS
----------+----------+--------------------------
 1        | null     | 45
 1        | null     | 45
 1        | null     | 45
 1        | null     | 45
 1        | null     | 45
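For reference, output like this can be reproduced with plain INSERT statements; a minimal sketch, assuming the table above already exists:

```sql
-- Each INSERT with the same PRIMARY KEY appends a new record to the
-- underlying changelog topic, so a push query (EMIT CHANGES) shows
-- every version of the row rather than only the latest one.
INSERT INTO client_table (user_ids, username, password_expires_in_days)
  VALUES (1, NULL, '45');
INSERT INTO client_table (user_ids, username, password_expires_in_days)
  VALUES (1, NULL, '45');
```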
agavra commented 3 years ago

Hello @Harshith2396 - when you query a table with EMIT CHANGES you are actually querying the changelog of the table. The Kafka topic (which is how the changelog is represented) is eventually compacted to keep only the latest value for each key, but until then you will see the "duplicates". When this table is materialized (such as in a JOIN or AGGREGATE), the semantics will be correct (you will only see one value for that key). You can read more about it here: https://www.confluent.io/blog/kafka-streams-tables-part-1-event-streaming/
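To make "materialized" concrete, a minimal sketch reusing the table from this issue (the aggregate itself is illustrative):

```sql
-- Materialize the table via an aggregation; ksqlDB keeps one row of
-- state per key for the result.
CREATE TABLE client_revisions AS
  SELECT user_ids, COUNT(*) AS revisions
  FROM client_table
  GROUP BY user_ids;

-- A pull query (no EMIT CHANGES) against the materialized state returns
-- at most one row for the key.
SELECT * FROM client_revisions WHERE user_ids = 1;
```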

Harshith2396 commented 3 years ago

@agavra Imagine that I have a topic from which a stream is populated. How do I create a table with a primary key from that stream without using joins or aggregates? That is my current use case: the stream will maintain the history of changes, while the table will hold only the latest change for serving the UI.

agavra commented 3 years ago

We're planning to support an operator that does this natively in the future, but for now there is a workaround using LATEST_BY_OFFSET. See this tutorial, which I think describes what you want to accomplish: https://docs.ksqldb.io/en/latest/how-to-guides/convert-changelog-to-table/
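Applied to the topic and columns from this issue, the pattern from that tutorial would look roughly like this (a sketch, assuming the same topic and schema as above):

```sql
-- Read the topic as a stream, i.e. as the full history of changes.
CREATE STREAM client_stream (user_ids BIGINT KEY, username STRING, password_expires_in_days STRING)
  WITH (kafka_topic='single_column3', value_format='json_sr');

-- Collapse the stream into a table holding only the latest value per key.
CREATE TABLE client_latest AS
  SELECT user_ids,
         LATEST_BY_OFFSET(username) AS username,
         LATEST_BY_OFFSET(password_expires_in_days) AS password_expires_in_days
  FROM client_stream
  GROUP BY user_ids;
```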

Harshith2396 commented 3 years ago

@agavra That's exactly what I was searching for, and it works. However, I am now facing a new issue with the workaround you suggested: non-primitive data types such as ARRAY and STRUCT are not supported by the LATEST_BY_OFFSET function. Can you suggest an alternative that works with arrays, structs, and nested data? That would solve all of the problems I am facing.
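One possible workaround, not confirmed anywhere in this thread: flatten the non-primitive column into its primitive fields before aggregating. The address column below is hypothetical, purely for illustration:

```sql
-- Hypothetical: suppose the stream also had a column
-- address STRUCT<city STRING, zip STRING>. LATEST_BY_OFFSET can be
-- applied to each primitive field via the -> accessor, at the cost of
-- losing the nested structure in the result.
CREATE TABLE client_latest_flat AS
  SELECT user_ids,
         LATEST_BY_OFFSET(address->city) AS city,
         LATEST_BY_OFFSET(address->zip)  AS zip
  FROM client_stream
  GROUP BY user_ids;
```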

karno123 commented 3 years ago

Hi @Harshith2396, has this problem been solved? How did you actually solve it? I have the same problem 🙇 https://github.com/confluentinc/ksql/issues/7486