confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

ROWKEY is never present in ksqlDB #10352

Closed: pavel-kalmykov closed this issue 4 months ago

pavel-kalmykov commented 4 months ago

Describe the bug

For some reason, none of the streams or tables I create have the ROWKEY column. My understanding, based on the CREATE STREAM statement documentation, is that ksqlDB always adds this column. Oddly enough, the ROWTIME column is present.

To Reproduce

To reproduce this, I am using the cp-all-in-one-kraft docker-compose template, v7.6.1. I also use Nix's confluent-platform package to run the topic creation and production commands. I create a topic, produce some data into it, and create a stream in ksql. The setup flow is as follows:

$ docker compose up -d
[+] Running 9/9
 ✔ Network ksql-all_default               Created
 ✔ Container broker                       Started
 ✔ Container postgres                     Started
 ✔ Container schema-registry              Started
 ✔ Container connect                      Started
 ✔ Container rest-proxy                   Started
 ✔ Container ksqldb-server                Started
 ✔ Container ksqldb-cli                   Started
 ✔ Container ksql-datagen                 Started
 ✔ Container control-center               Started
$ nix-shell -p confluent-platform

[nix-shell:~]$ kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic messages
Created topic messages.

[nix-shell:~]$ kafka-console-producer --broker-list localhost:9092 --topic messages << EOF
hello
world
EOF
[nix-shell:~]$ ksql

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =        The Database purpose-built       =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2022 Confluent Inc.

CLI v7.6.0, Server v7.6.1 located at http://localhost:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> show topics;

 Kafka Topic                 | Partitions | Partition Replicas 
---------------------------------------------------------------
 default_ksql_processing_log | 1          | 1                  
 docker-connect-configs      | 1          | 1                  
 docker-connect-offsets      | 25         | 1                  
 docker-connect-status       | 5          | 1                  
 messages                    | 1          | 1                  
---------------------------------------------------------------
ksql> print 'messages' from beginning limit 2;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: KAFKA_STRING
rowtime: 2024/05/27 23:39:54.275 Z, key: <null>, value: hello, partition: 0
rowtime: 2024/05/27 23:39:54.281 Z, key: <null>, value: world, partition: 0
Topic printing ceased
ksql> -- when printing, I am already missing the rowkey value, although the rowtime is present there;
ksql> CREATE STREAM messages_stream (message VARCHAR) WITH (KAFKA_TOPIC='messages', VALUE_FORMAT='DELIMITED');

 Message        
----------------
 Stream created 
----------------
ksql> describe messages_stream extended;

Name                 : MESSAGES_STREAM
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : DELIMITED
Kafka topic          : messages (partitions: 1, replication: 1)
Statement            : CREATE STREAM MESSAGES_STREAM (MESSAGE STRING) WITH (CLEANUP_POLICY='delete', KAFKA_TOPIC='messages', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');

 Field   | Type            
---------------------------
 MESSAGE | VARCHAR(STRING) 
---------------------------

Local runtime statistics
------------------------

(Statistics of the local KSQL server interaction with the Kafka topic messages)
ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.

Expected behavior

After the above setup, I would expect to be able to query the system columns (ROWKEY and ROWTIME) of my stream.

Actual behaviour

Continuing from the console output above, I issue two queries. The second one, which selects ROWKEY, fails with an error:

ksql> select rowtime, * from messages_stream emit changes limit 2;
+--------------------------------------+--------------------------------------+
|ROWTIME                               |MESSAGE                               |
+--------------------------------------+--------------------------------------+
|1716853194275                         |hello                                 |
|1716853194281                         |world                                 |
Limit Reached
Query terminated
ksql> select rowkey, rowtime, * from messages_stream emit changes limit 2;
Line: 1, Col: 8: SELECT column 'ROWKEY' cannot be resolved.

Additional context

This error also applies to tables, rendering them completely unusable.
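For comparison, here is a minimal sketch of a table over the same `messages` topic, assuming ksqlDB 0.10 or later (which the 7.6.x images used here include). In that model a table has no implicit ROWKEY column and must declare its key explicitly as a `PRIMARY KEY` column, which is then selected by its own name. The table name `messages_table` and the column name `id` are hypothetical. Because the sample records were produced without keys (`key: <null>` in the PRINT output), a table is likely to skip them, so the push query below would only return rows once keyed records are produced.

```sql
-- Hypothetical names: messages_table, id.
-- Assumes ksqlDB 0.10+: a table's key is declared explicitly as a PRIMARY KEY column.
CREATE TABLE messages_table (id VARCHAR PRIMARY KEY, message VARCHAR)
  WITH (KAFKA_TOPIC='messages', VALUE_FORMAT='DELIMITED');

-- The declared key column is selected by name; there is no implicit ROWKEY.
SELECT id, rowtime, message FROM messages_table EMIT CHANGES LIMIT 2;
```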

In any case, I think I am missing something, but I do not know what it could be.

pavel-kalmykov commented 4 months ago

All right, after reading https://www.confluent.io/blog/ksqldb-0-10-updates-key-columns/, I realized that this behavior changed back in ksqlDB 0.10: ROWKEY is no longer added implicitly, and key columns must be declared explicitly. My bad.
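As a minimal sketch of the post-0.10 key-column model described in the linked post, the stream from the reproduction steps could declare its key with the `KEY` keyword and then select it by name instead of ROWKEY. The stream name `messages_keyed` and the column name `id` are hypothetical. With the two records produced above, `id` would be null, since they were written without keys.

```sql
-- Hypothetical names: messages_keyed, id.
-- Assumes ksqlDB 0.10+: the KEY keyword maps the Kafka record key to the id column.
CREATE STREAM messages_keyed (id VARCHAR KEY, message VARCHAR)
  WITH (KAFKA_TOPIC='messages', VALUE_FORMAT='DELIMITED');

-- Read the topic from the start, as in the original session.
SET 'auto.offset.reset'='earliest';

-- Select the declared key column by its own name; there is no implicit ROWKEY.
SELECT id, rowtime, message FROM messages_keyed EMIT CHANGES LIMIT 2;
```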

Feel free to delete this issue if appropriate. I apologise for the inconvenience.