risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.

Obscure error message on wrong source definition #3224

Open TennyZhuang opened 2 years ago

TennyZhuang commented 2 years ago

Describe the bug

When I create a source with a wrong argument, an obscure error message is reported:

dev=> CREATE  SOURCE IF NOT EXISTS test (
id int,
host int,
device_id varchar,
concentration int,
org_timestamp timestamp,
recv_timestamp timestamp,
state int,
offline_count int,
fault_count int,
alert_count int,
normal_count int,
slight_count int,
serious_count int,
deadly_count int,
) with (
connector='kafka',
kafka.topic='new_test',
kafka.scan.startup.mode='latest',
kafka.brokers='[127.0.0.1:9092]'
) row format json;
ERROR:  internal error: internal error: Meta data fetch error: BrokerTransportFailure (Local: Broker transport failure)

In fact, the value kafka.brokers='[127.0.0.1:9092]' is wrong: the square brackets should be removed. But I can't work that out from the error message.

Expected behavior

ERROR: Invalid argument "kafka.brokers"

TennyZhuang commented 2 years ago

See also #3015

BugenZhao commented 2 years ago

"Introduce SourceError" 😄

Graphcalibur commented 2 years ago

It seems the error occurs here:

https://github.com/singularity-data/risingwave/blob/76549f92cd911e2383b48cbbd0ea36b78afd130f/src/connector/src/kafka/enumerator/client.rs#L206-L210

Unfortunately, the error only occurs once the client tries to fetch metadata, so I think the only way to provide more specific error messages would be to validate the brokers somehow in KafkaSplitEnumerator::new() (since that's where the other arguments of the Kafka source are validated).
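
A minimal sketch of what such up-front validation could look like, using only the standard library. The helper names validate_brokers and validate_entry are hypothetical and not part of RisingWave, and the rule that every entry must be an explicit host:port pair is an assumption that may be stricter than what the Kafka client itself accepts. The idea is only to reject obviously malformed values, such as the bracketed list from the report, before any metadata fetch is attempted.

```rust
use std::net::Ipv6Addr;

/// Hypothetical helper (not RisingWave's API): checks that `brokers` is a
/// comma-separated list of `host:port` entries before a client is created.
pub fn validate_brokers(brokers: &str) -> Result<(), String> {
    if brokers.trim().is_empty() {
        return Err("Invalid argument \"kafka.brokers\": value is empty".to_string());
    }
    for entry in brokers.split(',') {
        let entry = entry.trim();
        validate_entry(entry).map_err(|reason| {
            format!("Invalid argument \"kafka.brokers\": '{}': {}", entry, reason)
        })?;
    }
    Ok(())
}

/// Validates a single entry. Assumption: an explicit port is required.
fn validate_entry(entry: &str) -> Result<(), String> {
    // Split on the last ':' so that bracketed IPv6 hosts keep their colons.
    let (host, port) = entry
        .rsplit_once(':')
        .ok_or_else(|| "expected host:port".to_string())?;
    port.parse::<u16>()
        .map_err(|_| format!("'{}' is not a valid port", port))?;

    // A bracketed host is only accepted if it is an IPv6 literal, e.g. [::1].
    if let Some(inner) = host.strip_prefix('[').and_then(|h| h.strip_suffix(']')) {
        inner
            .parse::<Ipv6Addr>()
            .map_err(|_| "bracketed host is not an IPv6 address".to_string())?;
        return Ok(());
    }

    // Otherwise accept a conservative set of host name characters.
    let ok = !host.is_empty()
        && host
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || matches!(c, '.' | '-' | '_'));
    if ok {
        Ok(())
    } else {
        Err(format!("'{}' is not a valid host name", host))
    }
}
```

With a check like this, the value from the report, '[127.0.0.1:9092]', would be rejected with a message that names kafka.brokers, while '127.0.0.1:9092' would pass. A syntactic check cannot tell whether a broker is actually reachable, so the BrokerTransportFailure path would still need to exist for well-formed but wrong addresses.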