confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io
Other
124 stars 1.04k forks source link

Unable to Process From Delimited Source With no Enclosure #5348

Open archy-bold opened 4 years ago

archy-bold commented 4 years ago

Is your feature request related to a problem? Please describe.

Passing a delimited stream that doesn't use an enclosures/encapsulator to wrap field causes an error when quotation marks are present in the row:

Error deserializing delimited row (line 1) invalid char between encapsulated token and delimiter

Example stream:

CREATE STREAM EXAMPLE_STREAM
(user_id VARCHAR, first_field VARCHAR, second_field INT)
WITH (KAFKA_TOPIC='example-topic', VALUE_FORMAT='DELIMITED', VALUE_DELIMITER='TAB');

Example data:

1797901177. Some text "containing quotes"   23

Describe the solution you'd like

The ideal solution would be to be able to specify the enclosure/encapsulator when creating the stream as you can set the delimiter.

Describe alternatives you've considered

As in https://github.com/confluentinc/ksql/issues/1438, I tried to create from the source as text, so I could then use SPLIT(row, '\t') to separate the fields, however the only valid VALUE_FORMAT options are AVRO, JSON and DELIMITED.

I think having a String or bytes type makes sense as ksql is obviously able to process string data without it being ordered first.

Any workarounds would be appreciated.

archy-bold commented 4 years ago

If anyone happens to stumble across this thread, I've managed to find a solution to this by using the VALUE_FORMAT KAFKA, which is intended to be used for keys but seems to work here too. It's a two-step process to get the data as follows:

CREATE STREAM TAB_DELIMITED_SRC
(row STRING)
WITH (KAFKA_TOPIC='tab-delimited', VALUE_FORMAT='KAFKA');

CREATE STREAM TAB_DELIMITED_SPLIT
WITH (VALUE_FORMAT='AVRO') AS
SELECT SPLIT(row, '\    ')[1] AS firstcol,
SPLIT(row, '\   ')[2] AS secondcol,
CAST(SPLIT(row, '\  ')[3] AS INT) AS thirdcol_int
FROM TAB_DELIMITED_SRC;

A couple of things to note, indexes begin at 1, not 0. And the tab character needs to be an actual printed tab char escaped with a forward slash. Using '\t' won't work here.

archy-bold commented 4 years ago

Instead of the above, if you're using at least v0.10.0 of ksqlDB, you can use the CHR() function to reliably split on the tab control character as follows: SPLIT(row, CHR(9))