confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Materialized table from a topic in ksqlDB #9101

Open sujatapatnaik52 opened 2 years ago

sujatapatnaik52 commented 2 years ago

I am trying to create a materialized table from a topic. I create the topic and produce data into it as follows:


kafka-topics.sh --create --bootstrap-server <ip:port> --partitions 2 --topic my-users-topic

kafka-console-producer.sh --bootstrap-server <ip:port> --topic my-users-topic
>{"id":1,"usertimestamp":1651818648,"gender":"nobody","region_id":"nowhere"}

kafka-console-consumer.sh --bootstrap-server <ip:port> --topic my-users-topic --from-beginning --property print.headers=true --property print.partition=true
Partition:1 NO_HEADERS  {"id":1,"usertimestamp":1651818648,"gender":"nobody","region_id":"nowhere"}

After this, I created the materialized table as follows:

ksql> CREATE SOURCE TABLE testusers (
     id BIGINT PRIMARY KEY,
     usertimestamp BIGINT,
     gender VARCHAR,
     region_id VARCHAR
   ) WITH (
     KAFKA_TOPIC = 'my-users-topic', 
     VALUE_FORMAT = 'JSON'
   );

Message                                
----------------------------------------
 Created query with ID CST_TESTUSERS_57 
----------------------------------------

When I query the table above, I see no data:

ksql> select * from testusers;
+------------------------------------------------+------------------------------------------------+------------------------------------------------+------------------------------------------------+
|ID                                              |USERTIMESTAMP                                   |GENDER                                          |REGION_ID                                       |
+------------------------------------------------+------------------------------------------------+------------------------------------------------+------------------------------------------------+
Query terminated

The persistent query is in the running state, but the table never gets populated. Is there something obvious that I am missing here? When I create the materialized table from a stream with the latest offset, it works as explained in the docs.

Thank you.

Gerrrr commented 2 years ago

Similar to other databases, ksqlDB's primary key cannot be null. The record in your example has a JSON value but no key, which is why it does not show up in the source table. You can see this if you declare a stream over the same topic; the ID column comes back as null:

ksql> CREATE STREAM testusers (
>     id BIGINT KEY,
>     usertimestamp BIGINT,
>     gender VARCHAR,
>     region_id VARCHAR
>   ) WITH (
>     KAFKA_TOPIC = 'my-users-topic',
>     VALUE_FORMAT = 'JSON'
>   );

 Message
----------------
 Stream created
----------------
ksql> select * from testusers;
+--------------------------------------+--------------------------------------+--------------------------------------+--------------------------------------+
|ID                                    |USERTIMESTAMP                         |GENDER                                |REGION_ID                             |
+--------------------------------------+--------------------------------------+--------------------------------------+--------------------------------------+
|null                                  |1651818648                            |nobody                                |nowhere                               |
Query Completed
Query terminated

You should add the key to the records and make sure that the key type matches the type in the source table.
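
As a minimal sketch of one way to do that with the console producer: the helper `add_key` below (a hypothetical name) prefixes each JSON line with its `id` and a `|` separator, and the producer is then run with `parse.key=true` and a matching `key.separator`. It assumes `id` is the first field of every record, as in your sample, and `records.json` is only a placeholder file name. Keep in mind that the console producer writes the key as text, whereas a BIGINT key in the default KAFKA key format is read as an 8-byte binary value, so with this approach the key column would most likely have to be declared as VARCHAR.

```shell
#!/usr/bin/env bash
# Prefix each JSON line with its "id" value and a "|" separator,
# e.g. {"id":1,...} becomes 1|{"id":1,...}
# Assumption: "id" is the first field and holds an integer.
add_key() {
  sed -E 's/^[{]"id":([0-9]+),/\1|&/'
}

# Usage sketch (not executed here; <ip:port> as in the commands above,
# records.json is a placeholder):
#   add_key < records.json | kafka-console-producer.sh --bootstrap-server <ip:port> \
#     --topic my-users-topic --property parse.key=true --property 'key.separator=|'
#
# To check that the key is now set, print it on the consumer side:
#   kafka-console-consumer.sh --bootstrap-server <ip:port> --topic my-users-topic \
#     --from-beginning --property print.key=true
```

Lines that do not start with `{"id":<number>,` pass through unchanged, and with `parse.key=true` the producer would then fail on them because they contain no separator, so it is worth checking the output of `add_key` before piping it in.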