risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.
https://go.risingwave.com/slack
Apache License 2.0

Support Avro ref type in source #17020

Open xxchan opened 3 months ago

xxchan commented 3 months ago

We added ref support for Debezium Avro, but not for other Avro sources, which makes no sense.

A little background: in Debezium, `before` and `after` share the same schema, so one of them refers to the other's record type by name (a ref).
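A minimal Rust sketch of this layout, under stated assumptions: `Schema` is a hypothetical, simplified stand-in for an Avro schema (not the `apache-avro` crate's type), `collect_names` is a hypothetical helper, and the envelope follows the usual Debezium shape where `before` defines the row record inline and `after` only names it. Reading `after` on its own yields nothing but a ref; its definition has to come from a table of named types gathered from the whole schema.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for an Avro schema (NOT the apache-avro crate's type).
#[derive(Debug, Clone, PartialEq)]
enum Schema {
    Null,
    Int,
    Long,
    /// Named record with (field name, field schema) pairs.
    Record { name: String, fields: Vec<(String, Schema)> },
    /// Union such as ["null", T].
    Union(Vec<Schema>),
    /// Reference by name to a type defined elsewhere in the same schema.
    Ref(String),
}

/// Hypothetical helper: collect every named record reachable from `schema`.
fn collect_names(schema: &Schema, names: &mut HashMap<String, Schema>) {
    match schema {
        Schema::Record { name, fields } => {
            names.insert(name.clone(), schema.clone());
            for (_, f) in fields {
                collect_names(f, names);
            }
        }
        Schema::Union(variants) => {
            for v in variants {
                collect_names(v, names);
            }
        }
        // Primitives define nothing; a Ref only *uses* a name.
        _ => {}
    }
}

fn main() {
    // Row record, defined inline under `before`.
    let value = Schema::Record {
        name: "Value".into(),
        fields: vec![("id".into(), Schema::Long), ("age".into(), Schema::Int)],
    };
    // Debezium-style envelope: `after` only refers to `Value` by name.
    let envelope = Schema::Record {
        name: "Envelope".into(),
        fields: vec![
            ("before".into(), Schema::Union(vec![Schema::Null, value.clone()])),
            ("after".into(), Schema::Union(vec![Schema::Null, Schema::Ref("Value".into())])),
        ],
    };

    let mut names = HashMap::new();
    collect_names(&envelope, &mut names);

    // The ref in `after` resolves to the very definition given under `before`.
    assert_eq!(names.get("Value"), Some(&value));
    println!("named types: {}", names.len());
}
```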

https://github.com/risingwavelabs/risingwave/blob/8dfae832334ae1dc0585dcae4b3071e9ee6c9b1d/src/connector/src/parser/debezium/avro_parser.rs#L83

BTW, this line was introduced in this huge refactor: https://github.com/risingwavelabs/risingwave/pull/10096, which is really confusing and scary. How did it work before?

tabVersion commented 3 months ago

Resolving refs in a schema goes through an internal API that has some bugs, so we use our patched crate in Cargo.toml. Not sure whether @xiangjinwu included the fix when migrating deps.

xiangjinwu commented 3 months ago

> Not sure @xiangjinwu includes the fix when migrating deps.

Not yet.

> resolve ref in schema is an internal API and has some bugs, we use our patched crate in Cargo.toml.

The bug arises either because we use the internal API the wrong way, or because the API itself is poorly designed and hard to use. In short, we need to replace almost all references to `Schema` with `ResolvedSchema` and use functions that take schemata (a vec of schemas) rather than a single schema. I will work with @xxchan on resolving this.
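To illustrate why functions that take schemata matter, here is a sketch with a hypothetical, simplified `Schema` enum and a hypothetical `unresolved_refs` helper (neither is the `apache-avro` API). A ref in one schema can only resolve if the lookup table is built from every schema that might define the name; checking a single schema in isolation leaves the ref dangling.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical, simplified Avro-like schema (NOT the apache-avro crate's type).
#[derive(Debug, Clone, PartialEq)]
enum Schema {
    Long,
    Record { name: String, fields: Vec<(String, Schema)> },
    Ref(String),
}

/// Record every named definition in `schema`.
fn collect_names<'a>(schema: &'a Schema, names: &mut HashMap<&'a str, &'a Schema>) {
    if let Schema::Record { name, fields } = schema {
        names.insert(name.as_str(), schema);
        for (_, f) in fields {
            collect_names(f, names);
        }
    }
}

/// Record every name used via `Ref` in `schema`.
fn collect_refs<'a>(schema: &'a Schema, refs: &mut HashSet<&'a str>) {
    match schema {
        Schema::Record { fields, .. } => {
            for (_, f) in fields {
                collect_refs(f, refs);
            }
        }
        Schema::Ref(name) => {
            refs.insert(name.as_str());
        }
        Schema::Long => {}
    }
}

/// Hypothetical helper: names referenced somewhere in `schemata` but defined in none of them.
/// Taking a slice rather than one schema lets a ref in one schema resolve against a
/// definition that lives in another.
fn unresolved_refs<'a>(schemata: &'a [Schema]) -> Vec<&'a str> {
    let mut names = HashMap::new();
    let mut refs = HashSet::new();
    for s in schemata {
        collect_names(s, &mut names);
        collect_refs(s, &mut refs);
    }
    let mut missing: Vec<&str> = refs.into_iter().filter(|r| !names.contains_key(r)).collect();
    missing.sort();
    missing
}

fn main() {
    // `Key` is defined in one schema and only referenced from another.
    let key = Schema::Record { name: "Key".into(), fields: vec![("id".into(), Schema::Long)] };
    let value = Schema::Record {
        name: "Value".into(),
        fields: vec![("key".into(), Schema::Ref("Key".into()))],
    };

    // Resolving `value` on its own leaves the ref dangling...
    assert_eq!(unresolved_refs(std::slice::from_ref(&value)), vec!["Key"]);
    // ...while resolving against the full set of schemata finds the definition.
    let schemata = vec![key, value];
    assert!(unresolved_refs(&schemata).is_empty());
    println!("all refs resolved");
}
```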

xiangjinwu commented 2 months ago

The current cloning hack implementation fails to handle the following situation:

```json
{
  "type": "record",
  "name": "Root",
  "fields": [
    {
      "name": "f1",
      "type": {
        "type": "record",
        "name": "Nested",
        "fields": [
          {
            "name": "f1",
            "type": {
              "type": "enum",
              "name": "Case",
              "symbols": ["A", "B", "C"]
            }
          },
          {
            "name": "f2",
            "type": "Case"
          }
        ]
      }
    },
    {
      "name": "f2",
      "type": "Nested"
    }
  ]
}
```

In `ResolvedAvroSchema::resolved_schema`, the field `f2.f2` is not resolved and is still a `Ref`:

```
Record(
    RecordSchema {
        name: Name {
            name: "Root",
            namespace: None,
        },
        aliases: None,
        doc: None,
        fields: [
            RecordField {
                name: "f1",
                doc: None,
                aliases: None,
                default: None,
                schema: Record(
                    RecordSchema {
                        name: Name {
                            name: "Nested",
                            namespace: None,
                        },
                        aliases: None,
                        doc: None,
                        fields: [
                            RecordField {
                                name: "f1",
                                doc: None,
                                aliases: None,
                                default: None,
                                schema: Enum(
                                    EnumSchema {
                                        name: Name {
                                            name: "Case",
                                            namespace: None,
                                        },
                                        aliases: None,
                                        doc: None,
                                        symbols: [
                                            "A",
                                            "B",
                                            "C",
                                        ],
                                        default: None,
                                        attributes: {},
                                    },
                                ),
                                order: Ascending,
                                position: 0,
                                custom_attributes: {},
                            },
                            RecordField {
                                name: "f2",
                                doc: None,
                                aliases: None,
                                default: None,
                                schema: Enum(
                                    EnumSchema {
                                        name: Name {
                                            name: "Case",
                                            namespace: None,
                                        },
                                        aliases: None,
                                        doc: None,
                                        symbols: [
                                            "A",
                                            "B",
                                            "C",
                                        ],
                                        default: None,
                                        attributes: {},
                                    },
                                ),
                                order: Ascending,
                                position: 1,
                                custom_attributes: {},
                            },
                        ],
                        lookup: {
                            "f1": 0,
                            "f2": 1,
                        },
                        attributes: {},
                    },
                ),
                order: Ascending,
                position: 0,
                custom_attributes: {},
            },
            RecordField {
                name: "f2",
                doc: None,
                aliases: None,
                default: None,
                schema: Record(
                    RecordSchema {
                        name: Name {
                            name: "Nested",
                            namespace: None,
                        },
                        aliases: None,
                        doc: None,
                        fields: [
                            RecordField {
                                name: "f1",
                                doc: None,
                                aliases: None,
                                default: None,
                                schema: Enum(
                                    EnumSchema {
                                        name: Name {
                                            name: "Case",
                                            namespace: None,
                                        },
                                        aliases: None,
                                        doc: None,
                                        symbols: [
                                            "A",
                                            "B",
                                            "C",
                                        ],
                                        default: None,
                                        attributes: {},
                                    },
                                ),
                                order: Ascending,
                                position: 0,
                                custom_attributes: {},
                            },
                            RecordField {
                                name: "f2",
                                doc: None,
                                aliases: None,
                                default: None,
                                schema: Ref {
                                    name: Name {
                                        name: "Case",
                                        namespace: None,
                                    },
                                },
                                order: Ascending,
                                position: 1,
                                custom_attributes: {},
                            },
                        ],
                        lookup: {
                            "f1": 0,
                            "f2": 1,
                        },
                        attributes: {},
                    },
                ),
                order: Ascending,
                position: 1,
                custom_attributes: {},
            },
        ],
        lookup: {
            "f1": 0,
            "f2": 1,
        },
        attributes: {},
    },
)
```
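One plausible reading of the dump (an assumption, not confirmed in this thread): the inline `Nested` under `f1` had its inner `Case` ref resolved, but the `Nested` ref under `f2` was replaced by a copy of `Nested` as originally parsed, in which `f2` still points at `Case`. A resolver that keeps resolving inside each substituted definition avoids this. The sketch uses a hypothetical, simplified `Schema` enum and hypothetical `collect_names`/`resolve`/`has_ref` helpers rather than the `apache-avro` crate, and it deliberately does not handle recursive types.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified Avro-like schema (NOT the apache-avro crate's type).
#[derive(Debug, Clone, PartialEq)]
enum Schema {
    Enum { name: String, symbols: Vec<String> },
    Record { name: String, fields: Vec<(String, Schema)> },
    Ref(String),
}

/// Register every named type defined in `schema` (first definition wins).
fn collect_names(schema: &Schema, names: &mut HashMap<String, Schema>) {
    match schema {
        Schema::Enum { name, .. } => {
            names.entry(name.clone()).or_insert_with(|| schema.clone());
        }
        Schema::Record { name, fields } => {
            names.entry(name.clone()).or_insert_with(|| schema.clone());
            for (_, f) in fields {
                collect_names(f, names);
            }
        }
        Schema::Ref(_) => {}
    }
}

/// Replace every `Ref` with its definition, then keep resolving *inside* the
/// substituted copy, so refs nested in it (like `f2.f2`) are expanded too.
/// Caveat: a recursive type would recurse forever here; real code must keep
/// such refs or track the names currently being expanded.
fn resolve(schema: &Schema, names: &HashMap<String, Schema>) -> Result<Schema, String> {
    match schema {
        Schema::Ref(name) => {
            let def = names.get(name).ok_or_else(|| format!("unknown type {name}"))?;
            resolve(def, names)
        }
        Schema::Record { name, fields } => {
            let mut out = Vec::with_capacity(fields.len());
            for (n, f) in fields {
                out.push((n.clone(), resolve(f, names)?));
            }
            Ok(Schema::Record { name: name.clone(), fields: out })
        }
        Schema::Enum { .. } => Ok(schema.clone()),
    }
}

/// True if any `Ref` remains anywhere in `schema`.
fn has_ref(schema: &Schema) -> bool {
    match schema {
        Schema::Ref(_) => true,
        Schema::Record { fields, .. } => fields.iter().any(|(_, f)| has_ref(f)),
        Schema::Enum { .. } => false,
    }
}

fn main() {
    // The `Root` / `Nested` / `Case` schema from the report.
    let case = Schema::Enum { name: "Case".into(), symbols: vec!["A".into(), "B".into(), "C".into()] };
    let nested = Schema::Record {
        name: "Nested".into(),
        fields: vec![("f1".into(), case), ("f2".into(), Schema::Ref("Case".into()))],
    };
    let root = Schema::Record {
        name: "Root".into(),
        fields: vec![("f1".into(), nested), ("f2".into(), Schema::Ref("Nested".into()))],
    };

    let mut names = HashMap::new();
    collect_names(&root, &mut names);
    let resolved = resolve(&root, &names).expect("all refs are defined");

    // Both `f1.f2` and `f2.f2` now hold the full `Case` enum.
    assert!(!has_ref(&resolved));
    println!("{resolved:#?}");
}
```

Fully expanding refs like this duplicates each named type at every use site and cannot terminate for recursive types; keeping refs and looking them up in a names table, the direction suggested above with `ResolvedSchema`, avoids both issues.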