confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

ROWKEY in windowed KTable stores utf-8 null characters #1568

Open codestoned1 opened 6 years ago

codestoned1 commented 6 years ago

After creating a KSQL table to compute windowed aggregations/counts, I am encountering an error when trying to export the data from the underlying topic using Kafka Connect.

The KSQL query I am running:

    CREATE TABLE json_test_table_windowed2
      WITH (kafka_topic='json_test_table_windowed2', value_format='json') AS
      SELECT thing, request_method, COUNT(*) AS count
      FROM json_test_stream2
      WINDOW TUMBLING (SIZE 10 SECONDS)
      GROUP BY thing, request_method;

The output when selecting from the KSQL CLI:

[screenshot: ksql_cli_windowed]

The output when consuming using a basic consumer in the python client:

[screenshot: consumer_windowed]

The reason this is a problem:

I have a JDBC sink connector (for PostgreSQL) configured to automatically create PostgreSQL tables and use the ROWKEY from a specific topic as the primary key for the PostgreSQL table. It is currently throwing an error due to the null characters present in the ROWKEY.

The error Kafka Connect throws:

[screenshot: connect_error]

Is this a bug or intentional behavior? And if so, is there any way I can work around this?

The Kafka Connector configuration

{
    "timestamp.column.name": "modified",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "2",
    "topics": "avro_test_table_windowed2",
    "value.converter.schema.registry.url": "http://kafka-schema-registry-service:8081",
    "name": "standard-sink-avro_test_table_windowed2",
    "topic": "avro_test_table_windowed2",
    "auto.create": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "connection.url": <sensitive>,
    "insert.mode": "upsert",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "pk.mode": "record_key",
    "pk.fields": "ROWKEY"
}
blueedgenick commented 6 years ago

Hi @zlex7 - I think the problem is that you are trying to use the entire value of ROWKEY, including the window start time, and that's where those unprintable characters come from.

The thing to realize is that the KSQL CLI 'pretty-prints' the ROWKEY for you by breaking it into its constituent parts: the actual grouping values and the window-start timestamp. The ROWKEY has to contain the window time in order to distinguish values for the same grouping keys in different time windows.
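As a rough illustration of why the raw key looks unprintable: Kafka Streams' `TimeWindowedSerializer` (which, to my understanding, KSQL uses for windowed table keys) appends an 8-byte big-endian window-start timestamp to the inner key's bytes, and those binary timestamp bytes are what a UTF-8 consumer renders as null/garbage characters. The sketch below splits such a key back apart; the layout is an assumption about the serializer, and the sample key is made up.

```python
import struct

def decode_windowed_key(raw: bytes):
    """Split a windowed key into (grouping_key, window_start_ms).

    Assumes the TimeWindowedSerializer layout: the inner key's UTF-8
    bytes followed by an 8-byte big-endian long holding the window
    start in epoch milliseconds.
    """
    inner, ts_bytes = raw[:-8], raw[-8:]
    (window_start_ms,) = struct.unpack(">q", ts_bytes)
    return inner.decode("utf-8"), window_start_ms

# Hypothetical key "a|+|GET" in a window starting at 1530000000000 ms
raw = b"a|+|GET" + struct.pack(">q", 1530000000000)
print(decode_windowed_key(raw))  # ('a|+|GET', 1530000000000)
```

This also shows why `StringConverter` on the Connect side cannot produce a clean primary key from such a topic: the trailing 8 bytes are not text at all.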

In your case, it sounds like you need to use a different field from the message as your JDBC table key - perhaps a concatenation of the thing and request_method fields. I'm not sure how you intend to store the different windows in your destination table?

codestoned1 commented 6 years ago

Thanks for the response! Yep, it seems the bytes used to store the Window object just don't deserialize to UTF-8. I ended up converting the ROWTIME (which should contain the window start time) to a string and using that as the key. Do you know of an easier way to force clean serialization of a windowed ROWKEY - for example, somehow converting the window to a string?
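One way to get a printable key without relying on ROWTIME is to rebuild a composite string key from the grouping values plus the window start before handing rows to the sink. A minimal sketch, assuming the grouping key and window-start milliseconds have already been extracted from the message (the `@`-separated format and the function name are illustrative, not anything KSQL or Connect provides):

```python
from datetime import datetime, timezone

def printable_rowkey(grouping_key: str, window_start_ms: int) -> str:
    """Combine the grouping key and window start into one printable
    string, usable as a PostgreSQL primary key. The format here is
    an arbitrary choice, not KSQL's own representation."""
    start = datetime.fromtimestamp(window_start_ms / 1000, tz=timezone.utc)
    return f"{grouping_key}@{start.isoformat()}"

print(printable_rowkey("a|+|GET", 1530000000000))
```

Because the key now carries both the grouping values and the window start, upserts for the same group in different windows land in distinct rows, which matches the uniqueness the binary ROWKEY was providing.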

yaobukeji123 commented 5 years ago

I have the same question. I cannot import the data from my KSQL table into MySQL. How did you solve this problem in the end? Please help, thank you.

uqix commented 3 years ago

It just seems a little weird that the key of a windowed aggregation is a string but is not a valid one.