risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.
https://go.risingwave.com/slack
Apache License 2.0
6.79k stars 560 forks source link

bug(source): `schema.registry.name.strategy` is not followed for `encode protobuf` #18319

Open xiangjinwu opened 2 weeks ago

xiangjinwu commented 2 weeks ago

11384

https://github.com/risingwavelabs/risingwave/blob/edb149317bad534730c09d04e639a622ba19e5fa/src/frontend/src/handler/create_source.rs#L195-L208

Just acknowledging this fact. It will be fixed after format_encode_options parsing logic unification.

xiangjinwu commented 2 weeks ago

Not verified end-to-end:

diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index 3f83c25fb3..bfede244da 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -193,20 +193,11 @@ async fn extract_debezium_avro_table_pk_columns(

 /// Map a protobuf schema to a relational schema.
 async fn extract_protobuf_table_schema(
-    schema: &ProtobufSchema,
+    info: &StreamSourceInfo,
     with_properties: &WithOptionsSecResolved,
     format_encode_options: &mut BTreeMap<String, String>,
 ) -> Result<Vec<ColumnCatalog>> {
-    let info = StreamSourceInfo {
-        proto_message_name: schema.message_name.0.clone(),
-        row_schema_location: schema.row_schema_location.0.clone(),
-        use_schema_registry: schema.use_schema_registry,
-        format: FormatType::Plain.into(),
-        row_encode: EncodeType::Protobuf.into(),
-        format_encode_options: format_encode_options.clone(),
-        ..Default::default()
-    };
-    let parser_config = SpecificParserConfig::new(&info, with_properties)?;
+    let parser_config = SpecificParserConfig::new(info, with_properties)?;
     try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME);
     try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD);
     consume_aws_config_from_options(format_encode_options);
@@ -392,7 +383,7 @@ pub(crate) async fn bind_columns_from_source(

             Some(
                 extract_protobuf_table_schema(
-                    &protobuf_schema,
+                    &stream_source_info,
                     &options_with_secret,
                     &mut format_encode_options_to_consume,
                 )