confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

KSQL does not populate ROWKEY #2950

Open alvaradojl opened 5 years ago

alvaradojl commented 5 years ago

This topic may be indirectly related to: #1405

I have a Kafka topic with valid messages, but the messages have no keys.

I created a first stream from this topic, rekeying on a non-null field (remoteAddress):

create stream stream_relay_v0_topic (requestId varchar, requestTime varchar, remoteAddress varchar, requestUri varchar, hostname varchar) \
with (value_format='JSON', kafka_topic='relay_v0_topic', key='remoteAddress');

the description of the stream is:


ksql> describe extended stream_relay_v0_topic;

Name                 : STREAM_RELAY_V0_TOPIC
Type                 : STREAM
Key field            : REMOTEADDRESS
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : relay_v0_topic (partitions: 10, replication: 2)

 Field         | Type                      
-------------------------------------------
 ROWTIME       | BIGINT           (system) 
 ROWKEY        | VARCHAR(STRING)  (system) 
 REQUESTID     | VARCHAR(STRING)           
 REQUESTTIME   | VARCHAR(STRING)           
 REMOTEADDRESS | VARCHAR(STRING)           
 REQUESTURI    | VARCHAR(STRING)           
 HOSTNAME      | VARCHAR(STRING)           

When I query the stream for remoteAddress and ROWKEY, I expect the two columns to be the same, but the result is:

ksql> select rowkey, remoteAddress from stream_relay_v0_topic limit 5;
null | 172.19.0.8
null | 172.19.0.5
null | 172.19.0.8
null | 172.19.0.5
null | 172.19.0.8
Limit Reached
Query terminated
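
As far as the KSQL documentation for these versions describes it, the `key` property in the WITH clause is only a hint that the named value column duplicates the Kafka message key; it does not copy the column into the key. ROWKEY shows the record's actual Kafka key, which is null for unkeyed messages. One workaround, assuming you control the producer, is to set the key when the record is produced. A minimal sketch in Python: `to_keyed_record` is a hypothetical helper, the `requestId` value is made-up sample data, and the actual send call depends on your client library, so it is not shown.

```python
import json
from typing import Tuple


def to_keyed_record(value: dict, key_field: str) -> Tuple[bytes, bytes]:
    """Build (key, value) bytes for a Kafka record keyed by one value field.

    Hypothetical helper for illustration only; hand the result to whatever
    producer client you use.
    """
    key = value[key_field]
    if key is None:
        raise ValueError(f"{key_field} is null and cannot be used as the key")
    # The stream's key format is STRING, so encode the key as plain UTF-8 text.
    return str(key).encode("utf-8"), json.dumps(value).encode("utf-8")


# Sample record shaped like the messages in relay_v0_topic (values are made up).
record = {"requestId": "r-1", "remoteAddress": "172.19.0.8"}
key, payload = to_keyed_record(record, "remoteAddress")
print(key)
```

With records produced this way, ROWKEY and remoteAddress should match, and `key='remoteAddress'` describes the data accurately.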

What I tried:

I created a second stream to check if I could rekey from another stream instead of the topic and I still have the same problem:


create stream stream_all_requests \
with (partitions=10, value_format='json') AS \
select requestId, requestTime, remoteAddress, requestUri, hostname \
from stream_relay_v0_topic \
partition by remoteAddress;

The description of the resulting stream is:

ksql> describe extended STREAM_ALL_REQUESTS;

Name                 : STREAM_ALL_REQUESTS
Type                 : STREAM
Key field            : REMOTEADDRESS
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : STREAM_ALL_REQUESTS (partitions: 10, replication: 1)

 Field         | Type                      
-------------------------------------------
 ROWTIME       | BIGINT           (system) 
 ROWKEY        | VARCHAR(STRING)  (system) 
 REQUESTID     | VARCHAR(STRING)           
 REQUESTTIME   | VARCHAR(STRING)           
 REMOTEADDRESS | VARCHAR(STRING)           
 REQUESTURI    | VARCHAR(STRING)           
 HOSTNAME      | VARCHAR(STRING)           

Queries that write into this STREAM
-----------------------------------
CSAS_STREAM_ALL_REQUESTS_3 : create stream stream_all_requests with (partitions=10, value_format='json') AS select requestId, requestTime, remoteAddress, requestUri, hostname from stream_relay_v0_topic partition by remoteAddress;

ROWKEY is still null in both streams:


ksql> select rowkey, remoteAddress from stream_all_requests limit 5;
null | 172.19.0.5
null | 172.19.0.5
null | 172.19.0.8
null | 172.19.0.8
null | 172.19.0.5

I tried using the Docker images https://hub.docker.com/r/confluentinc/cp-ksql-cli/ and https://hub.docker.com/r/confluentinc/cp-ksql-server/

tags: latest, 5.1.3, 5.2.1

allenansari174 commented 5 years ago

I have the same issue: I created a stream but rowkey format is like this: {"schema":{"type":"struct","fields":[{"type":"int64","optional":true,"field":"Id"}],"optional":false,"name":"com.github.jcustenborder.kafka.connect.twitter.StatusKey","doc":"Key for a twitter status."},"payload":{"Id":1140239850848882688}}

The rest of the fields are null.

Any idea how to solve this issue?

ksql> SELECT * FROM twitter_raw LIMIT 1;
1560689344973 | {"schema":{"type":"struct","fields":[{"type":"int64","optional":true,"field":"Id"}],"optional":false,"name":"com.github.jcustenborder.kafka.connect.twitter.StatusKey","doc":"Key for a twitter status."},"payload":{"Id":1140239850848882688}} | null | null | null
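
The ROWKEY in this output has the shape that Kafka Connect's JsonConverter writes when schemas are enabled: a `schema` object plus a `payload` object. KSQL reads the key as a plain string, so the whole envelope shows up in ROWKEY. A minimal Python sketch, only to show where the actual id sits inside that envelope (the variable names are illustrative):

```python
import json

# ROWKEY value copied from the SELECT output in this comment.
rowkey = (
    '{"schema":{"type":"struct","fields":[{"type":"int64","optional":true,'
    '"field":"Id"}],"optional":false,'
    '"name":"com.github.jcustenborder.kafka.connect.twitter.StatusKey",'
    '"doc":"Key for a twitter status."},'
    '"payload":{"Id":1140239850848882688}}'
)

envelope = json.loads(rowkey)
# The real key data lives under "payload"; "schema" only describes it.
status_id = envelope["payload"]["Id"]
print(status_id)
```

If the message value is wrapped the same way, the declared columns would sit under `payload` rather than at the top level, which might explain the nulls. That is an assumption, since the stream definition is not shown here. Setting `key.converter.schemas.enable=false` and `value.converter.schemas.enable=false` on the connector is the usual way to drop the envelope.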