ArroyoSystems / arroyo

Distributed stream processing engine in Rust
https://arroyo.dev
Apache License 2.0
3.61k stars 201 forks source link

Enable Automatic Schema Retrieval in DDL for Kafka Sources Using Confluent Schema Registry #692

Open hazelnut-99 opened 1 month ago

hazelnut-99 commented 1 month ago

When creating a new source connection through the web UI and selecting Avro as the data format with Confluent Schema Registry as the schema type, users can omit specifying the schema, as it is automatically loaded from the Confluent Schema Registry.

However, when defining a source using DDL within a pipeline, it currently requires explicit schema definition. For instance, the following DDL statement:

CREATE TABLE my_kafka_source WITH (
    'connector' = 'kafka',
    'avro.confluent_schema_registry' = 'true',
    'bootstrap_servers' = 'my_server',
    'schema_registry.endpoint' = 'my_endpoint',
    'type' = 'source',
    'topic' = 'my_topic',
    'bad_data': 'drop',
    'source.offset': 'latest',
    'source.read_mode': 'read_committed',
    'sink.commit_mode' = 'at_least_once',
    'format' = 'avro'
);

leads to an error when subsequently trying to query the table:

SELECT my_field FROM my_kafka_source;

Error: Schema error: No field named my_field.

It would be nice if ad-hoc DDLs inside pipeline definition could support automatic schema retrieval from the Confluent Schema Registry, similar to the functionality available in the web UI.

mwylde commented 1 month ago

Agreed, this would be a great feature. It's a bit tricky because the schema is needed for planning, so this would add a dependency on schema registry as part of SQL planning. The schema might also change, which means that the same query might plan today but fail tomorrow. There also wouldn't be feedback for the user as to what the schema is. I think these issues are surmountable, but will require some design work.