risingwavelabs / risingwave

SQL stream processing, analytics, and management. We decouple storage and compute to offer instant failover, dynamic scaling, speedy bootstrapping, and efficient joins.
https://www.risingwave.com/slack
Apache License 2.0

[Avro] Support Union types in Avro Schemas with Schema registry #16273

Open tollercode opened 1 month ago

tollercode commented 1 month ago

Is your feature request related to a problem? Please describe.

When decoding messages that use a union type in the Avro schema, RW fails because currently only one type is supported per field. As a result, schemas have to be simplified, e.g. by using only string types, which undermines the ability to use strong schemas.

Describe the solution you'd like

ksql introduced support for union types in Avro schemas by creating a 'struct' for the field that can hold any of the possible types. E.g. a union type of [null, boolean, double, string] becomes STRUCT<BOOLEAN BOOLEAN, DOUBLE DOUBLE, STRING VARCHAR(STRING)>
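To make the proposed mapping concrete, here is a minimal Python sketch of how a flat union could be turned into a struct type. The `AVRO_TO_SQL` table and the `union_to_struct` helper are hypothetical names for illustration, not RisingWave or ksql APIs, and the sketch assumes the union contains only primitive branches. It prints plain `VARCHAR` where ksql displays `VARCHAR(STRING)`.

```python
# Hypothetical mapping from Avro primitive names to SQL-like type names.
AVRO_TO_SQL = {
    "boolean": "BOOLEAN",
    "int": "INT",
    "long": "BIGINT",
    "float": "REAL",
    "double": "DOUBLE",
    "string": "VARCHAR",
}


def union_to_struct(branches):
    """Return a SQL-like type string for a union of Avro primitive types."""
    non_null = [b for b in branches if b != "null"]
    if len(non_null) == 1:
        # ["null", T] is just a nullable T, so no struct is needed.
        return AVRO_TO_SQL[non_null[0]]
    # One struct field per non-null branch, named after the branch type.
    fields = ", ".join(f"{b.upper()} {AVRO_TO_SQL[b]}" for b in non_null)
    return f"STRUCT<{fields}>"


print(union_to_struct(["null", "boolean", "double", "string"]))
# STRUCT<BOOLEAN BOOLEAN, DOUBLE DOUBLE, STRING VARCHAR>
```

At most one field of such a struct would be non-null per row, which keeps the original branch type visible to queries.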

In addition, a wildcard struct operator was introduced to access the struct without knowing the exact fields inside.

See: https://www.confluent.io/blog/announcing-ksqldb-0-27-1/#multi-schema-protobuf-avro-topics

This solution would also work for 'oneOf' types in Protobuf and JSON schemas.

Describe alternatives you've considered

A possible alternative is to simply cast union types to strings. This is probably easier to implement, but it would again weaken the strong typing approach.
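The trade-off between the two options can be illustrated with a small Python sketch. Both helpers (`to_union_struct`, `to_string`) are hypothetical and only model the [null, boolean, double, string] union from this request; they are not how RisingWave decodes Avro.

```python
def to_union_struct(value):
    """Place a decoded value in the struct field matching its union branch."""
    row = {"BOOLEAN": None, "DOUBLE": None, "STRING": None}
    if value is None:
        return row
    # bool is checked first because bool is a subclass of int in Python.
    if isinstance(value, bool):
        row["BOOLEAN"] = value
    elif isinstance(value, float):
        row["DOUBLE"] = value
    elif isinstance(value, str):
        row["STRING"] = value
    else:
        raise TypeError(f"no union branch for {type(value).__name__}")
    return row


def to_string(value):
    """The string-cast alternative: every branch collapses to text."""
    return None if value is None else str(value)


# As strings, the boolean True and the string "True" are indistinguishable.
print(to_string(True) == to_string("True"))              # True
# With the struct representation, the branch type is preserved.
print(to_union_struct(True) == to_union_struct("True"))  # False
```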

Additional context

No response

tabVersion commented 1 month ago

It may take some time to reach a consensus on whether it is the desired behavior for RisingWave.

tollercode commented 1 month ago

Just as a reference:

KSQL automatically creates the table schema shown under "KSQL Table" from this Avro schema:

{
    "namespace": "com.***.***.mqtt",
    "name": "als.DataMessage",
    "type": "record",
    "fields": [
        {
            "name": "metrics",
            "type": {
                "type": "array",
                "items": {
                    "name": "als_data_metric",
                    "type": "record",
                    "fields": [
                        {
                            "name": "id",
                            "type": "string"
                        },
                        {
                            "name": "name",
                            "type": "string"
                        },
                        {
                            "name": "norm_name",
                            "type": [
                                "null",
                                "string"
                            ],
                            "default": null
                        },
                        {
                            "name": "uom",
                            "type": [
                                "null",
                                "string"
                            ],
                            "default": null
                        },
                        {
                            "name": "data",
                            "type": {
                                "type": "array",
                                "items": {
                                    "name": "dataItem",
                                    "type": "record",
                                    "fields": [
                                        {
                                            "name": "ts",
                                            "type": "string",
                                            "doc": "Timestamp of the metric."
                                        },
                                        {
                                            "name": "value",
                                            "type": [
                                                "null",
                                                "boolean",
                                                "double",
                                                "string"
                                            ],
                                            "doc": "Value of the metric."
                                        }
                                    ]
                                }
                            },
                            "doc": "The data message"
                        }
                    ],
                    "doc": "A metric object"
                }
            },
            "doc": "A list of metrics."
        }
    ]
}

KSQL Table

METRICS | ARRAY<STRUCT<ID VARCHAR(STRING), NAME VARCHAR(STRING), NORM_NAME VARCHAR(STRING), UOM VARCHAR(STRING), DATA ARRAY<STRUCT<TS VARCHAR(STRING), VALUE STRUCT<BOOLEAN BOOLEAN, DOUBLE DOUBLE, STRING VARCHAR(STRING)>>>>>
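For reference, the derivation of the table type from the schema can be sketched as a recursive walk over records, arrays, and unions. This is an illustrative Python sketch, not ksql's or RisingWave's implementation: `sql_type` and `PRIMITIVES` are hypothetical names, only the `data` field of the schema is reproduced to keep the example short, unions are assumed to contain primitive branches only, and plain `VARCHAR` is printed where ksql displays `VARCHAR(STRING)`.

```python
import json

# Hypothetical mapping for the primitive types used in the schema above.
PRIMITIVES = {"boolean": "BOOLEAN", "double": "DOUBLE", "string": "VARCHAR"}


def sql_type(schema):
    """Recursively map an Avro schema node to a SQL-like type string."""
    if isinstance(schema, str):
        return PRIMITIVES[schema]
    if isinstance(schema, list):
        # A union: drop "null"; a single remaining branch is a nullable type.
        branches = [s for s in schema if s != "null"]
        if len(branches) == 1:
            return sql_type(branches[0])
        if not all(isinstance(b, str) for b in branches):
            raise ValueError("this sketch only handles primitive union branches")
        fields = ", ".join(f"{b.upper()} {sql_type(b)}" for b in branches)
        return f"STRUCT<{fields}>"
    if schema["type"] == "array":
        return f"ARRAY<{sql_type(schema['items'])}>"
    if schema["type"] == "record":
        fields = ", ".join(
            f"{f['name'].upper()} {sql_type(f['type'])}" for f in schema["fields"]
        )
        return f"STRUCT<{fields}>"
    raise ValueError(f"unsupported schema node: {schema!r}")


# The type of the "data" field from the schema above, with "doc" entries omitted.
DATA_FIELD_TYPE = json.loads("""
{
  "type": "array",
  "items": {
    "name": "dataItem",
    "type": "record",
    "fields": [
      {"name": "ts", "type": "string"},
      {"name": "value", "type": ["null", "boolean", "double", "string"]}
    ]
  }
}
""")

print(sql_type(DATA_FIELD_TYPE))
# ARRAY<STRUCT<TS VARCHAR, VALUE STRUCT<BOOLEAN BOOLEAN, DOUBLE DOUBLE, STRING VARCHAR>>>
```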