timeplus-io / proton

A stream processing engine and database, and a fast and lightweight alternative to ksqlDB and Apache Flink, 🚀 powered by ClickHouse
https://timeplus.com
Apache License 2.0

Failed to read partial columns from Kafka source with data format CSV #817

Open · yl-lisen opened this issue 3 months ago

yl-lisen commented 3 months ago

Describe what's wrong

Selecting only a subset of columns from a Kafka external stream with data_format = 'CSV' returns no rows and the consumer logs a parse error, while selecting all columns works as expected.

How to reproduce

CREATE EXTERNAL STREAM account
(
  `id` int,
  `name` string
)
SETTINGS type = 'kafka', topic = 'topic_account', brokers = 'stream-store:9092', data_format = 'CSV';

--- Ingest Kafka message: "1, 1"
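
The test message can be produced with any Kafka client. As one possible way to reproduce, here is a minimal sketch using librdkafka's C API from C++, reusing the broker and topic from the DDL above (error handling abbreviated):

#include <librdkafka/rdkafka.h>
#include <cstdio>

int main()
{
    char errstr[512];
    rd_kafka_conf_t * conf = rd_kafka_conf_new();
    rd_kafka_conf_set(conf, "bootstrap.servers", "stream-store:9092", errstr, sizeof(errstr));

    rd_kafka_t * rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk)
    {
        std::fprintf(stderr, "failed to create producer: %s\n", errstr);
        return 1;
    }

    // Produce the CSV payload "1, 1" (4 bytes) to topic_account.
    rd_kafka_resp_err_t err = rd_kafka_producev(
        rk,
        RD_KAFKA_V_TOPIC("topic_account"),
        RD_KAFKA_V_VALUE((void *)"1, 1", 4),
        RD_KAFKA_V_END);
    if (err)
        std::fprintf(stderr, "produce failed: %s\n", rd_kafka_err2str(err));

    rd_kafka_flush(rk, 10 * 1000); // wait up to 10s for delivery
    rd_kafka_destroy(rk);
    return 0;
}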

--- Reading all columns works
timeplusd :) select * from account;

SELECT
  *
FROM
  account

Query id: a15b8db7-2af1-4d4c-9a9c-8b074518935d

┌─id─┬─name─┐
│  1 │ 1    │
└────┴──────┘

--- Reading a partial column produces no output
select id from account;

SELECT
  id
FROM
  account

Query id: 2d032487-becb-4e26-b773-7733dd5521e7

↗ Progress: 0.00 rows, 0.00 B (0.00 rows/s., 0.00 B/s.)
Cancelling query.

There is an error log:

2024.08.12 20:41:51.234635 [ 2779177 ] {af315de4-b892-455b-949f-0c013580efcd} <Error> account.rdkafka#consumer-6: Failed to parse message at 2: Expected end of line: (at row 1)
: 
Row 1:
Column 0,   name: id, type: int32, parsed text: "1"
ERROR: There is no line feed. "1" found instead.
 It's like your file has more columns than expected.
And if your file has the right number of columns, maybe it has an unquoted string value with a comma.


zliang-min commented 3 months ago

This is because the columns from the SELECT are used to create the InputFormat. Formats that carry schema information, like Protobuf and Avro, know how to extract just the wanted columns.

To make formats like CSV work, we will need to do one of the following (a sketch of the first option follows this list):

1. Initialize such formats with the full table schema. The format will then generate the full chunk of columns no matter how many columns the query selects, so it will be less efficient, but it is the simpler change.
2. Refactor such formats so they can skip reading some fields. They will need to know the column mapping to figure out which fields to skip; the format still has to parse the data, but it does not need to generate the unneeded columns.
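
A minimal sketch of the first option, using hypothetical stand-ins (TableSchema, parseFullRow, project) rather than the real proton/ClickHouse InputFormat classes: the row is parsed against the full schema so the field count always matches, and the selected columns are projected out afterwards.

#include <iostream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-ins: the full table schema (column names in storage
// order) and the positions of the columns the SELECT actually asked for.
using TableSchema = std::vector<std::string>;
using SelectedPositions = std::vector<size_t>;

// Parse one CSV row against the FULL schema, so the field count always
// matches and "Expected end of line" can no longer be raised mid-row.
std::vector<std::string> parseFullRow(const std::string & line, const TableSchema & schema)
{
    std::vector<std::string> fields;
    std::stringstream ss(line);
    std::string field;
    while (std::getline(ss, field, ','))
        fields.push_back(field);
    if (fields.size() != schema.size())
        throw std::runtime_error("row field count does not match schema");
    return fields;
}

// Project the fully parsed row down to the selected columns. Every field was
// materialized (the inefficiency noted above), but the format code stays simple.
std::vector<std::string> project(const std::vector<std::string> & row, const SelectedPositions & selected)
{
    std::vector<std::string> out;
    out.reserve(selected.size());
    for (size_t pos : selected)
        out.push_back(row[pos]);
    return out;
}

int main()
{
    TableSchema schema = {"id", "name"}; // full schema of `account`
    SelectedPositions selected = {0};    // SELECT id FROM account

    auto row = project(parseFullRow("1, 1", schema), selected);
    std::cout << row[0] << '\n'; // prints "1"
}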

yl-lisen commented 3 months ago

Agreed. We can make this scenario work first, and then do the partial-parsing optimization; a rough sketch of the field-skipping idea follows.
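
A rough sketch of what the partial-parsing optimization could look like, again with hypothetical names (ColumnMapping, readRowSkipping): the reader still scans past every delimiter, but a column mapping tells it which fields to materialize and which to skip.

#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// For each field position in the raw CSV row: the output slot it maps to,
// or -1 if the field should be skipped.
using ColumnMapping = std::vector<int>;

std::vector<std::string> readRowSkipping(const std::string & line, const ColumnMapping & mapping, size_t num_selected)
{
    std::vector<std::string> out(num_selected);
    std::stringstream ss(line);
    std::string field;
    size_t pos = 0;
    while (std::getline(ss, field, ','))
    {
        // Every field is still scanned past, but only mapped ones are kept,
        // so no columns are generated for the fields the query did not select.
        if (pos < mapping.size() && mapping[pos] >= 0)
            out[mapping[pos]] = field;
        ++pos;
    }
    return out;
}

int main()
{
    // SELECT id FROM account: keep field 0, skip field 1 (`name`).
    ColumnMapping mapping = {0, -1};
    auto row = readRowSkipping("1, 1", mapping, 1);
    std::cout << row[0] << '\n'; // prints "1"
}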