risingwavelabs / risingwave


feat(sink): jsonb column to avro record/map/union type #16941

Open xiangjinwu opened 4 months ago

xiangjinwu commented 4 months ago

I think I am facing a similar issue. I am trying to sink jsonb values from a materialized view into an avro schema in kafka, with map and record types. I am getting the following errors:

Encode error: encode foo error: cannot encode jsonb column as {"name":"foo","type":"record","fields":[{"name":"id","type":"string"},{"name":"name","type":"string"}]} field

Or

Encode error: encode foo error: cannot encode jsonb column as {"type":"map","values":"string"} field

Originally posted by @maingoh in https://github.com/risingwavelabs/risingwave/issues/11699#issuecomment-2130325570

xiangjinwu commented 4 months ago

@maingoh In your specific case, could you try converting the loosely-typed jsonb to a struct before sinking?

jsonb_populate_record(null::struct<id varchar, name varchar>, foo)

(Note the function jsonb_populate_record is only available since 1.9.0.)

As for the general case, note that true and 23 are also valid jsonb values but could not be sunk as an avro record or map. In the workaround above we are explicitly saying that null shall be used as a default in these unexpected cases.
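For instance, here is a rough sketch of the workaround as a complete sink (the table events, its jsonb column payload, and the sink name are placeholders, and the ... stands for your usual Kafka and schema-registry options):

-- non-object jsonb values such as true or 23 become null instead of failing the encoder
create sink typed_events as
select jsonb_populate_record(null::struct<id varchar, name varchar>, payload) as foo
from events
with (connector = 'kafka', ...) format plain encode avro (...);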

Does this address your issue?

maingoh commented 4 months ago

Thank you, I managed to sink a simple struct as an avro record, but I can't find a way to generate a type that is convertible to an avro map. Is a jsonb object directly convertible to a map? I actually have a joined table with a name and some other columns, and I would like to build a map<name, record<column_2, column_3, ...>>. I tried building a jsonb object of structs, but it does not seem to work.

I would also need a way to union different types, as avro allows it. For example, we have a jsonb column which can hold different types. I feel this does not exist (yet?); what would be the best way to sink such values?

As for the general case, note that true and 23 are also valid jsonb values but could not be sunk as an avro record or map. In the workaround above we are explicitly saying that null shall be used as a default in these unexpected cases.

Good to know! I actually didn't need to use jsonb_populate_record yet as I was building my record manually.

xiangjinwu commented 4 months ago

a type that is convertible to an avro map

You are right, it is not available in RisingWave yet. Although jsonb feels close, there can be ambiguous cases, like {"foo": "AA=="} into avro map<bytes>. Instead, we plan to support a native map data type in RisingWave: https://github.com/risingwavelabs/risingwave/issues/13387#issuecomment-1884348800

I would also need a way to union different types, as avro allows it. For example, we have a jsonb column which can hold different types. I feel this does not exist (yet?)

There is no native union data type either, and it is not part of the plan. The plan is to allow sinking specific types, e.g. both int columns and bool columns can be sunk as avro ["null", "int", "boolean"]. If the upstream is a jsonb column, it would need to be split into strongly typed streams beforehand:

create sink variant_int as select foo::int from mixed where jsonb_typeof(foo) = 'number' ...
create sink variant_bool as select foo::bool from mixed where jsonb_typeof(foo) = 'boolean' ...

To clarify:

maingoh commented 4 months ago

Although jsonb feels close, there can be ambiguous cases, like {"foo": "AA=="} into avro map<bytes>

Is it really possible to store bytes in a jsonb field? JSON itself only supports strings (bytes are usually base64-encoded strings). So to me it sounds quite natural to convert it natively to map<string, string>. And if the user wants to cast it to bytes, they can ask for it explicitly.

If the upstream is a jsonb column, it would need to be split into strongly typed streams beforehand

I agree that the union type is not really needed as a RisingWave type. However, doing the conditional check on types is something that RW could support natively for most types; it is not very user-friendly to do in SQL. If the jsonb column is compatible with the avro format, why not try to serialize it without requiring explicit casts, and have a way to default the incompatible values to null, like in the example with jsonb_populate_record(null::struct<id varchar, name varchar>, foo)? In my case I am sure that the avro will match the jsonb column, but one field can be of different types.

There could be a parameter on the sink side to treat jsonb columns as avro ones while sinking. This way the data would still be stored as unstructured jsonb in RW (allowing a single JSON field to hold different types) but would be converted only in the sink, using this mapping:

JSONB     AVRO
array     array
object    record/map
integer   int or long?
boolean   boolean
number    float
string    string
null      null

All other, more specific types (dates, bytes, smallint, float) would need an explicit conversion in the query itself. But since most JSON columns only contain the types above, it would save a lot of boilerplate not to have to handle this on the user side.
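For example, a sketch of what those explicit conversions could look like in the query (column names are made up, and I assume decode() is available to parse base64 strings, as in PostgreSQL):

select
    (payload ->> 'created_at')::timestamptz as created_at, -- timestamp from a JSON string
    decode(payload ->> 'avatar', 'base64')  as avatar,     -- bytea from a base64-encoded string
    (payload ->> 'retries')::smallint       as retries     -- narrower integer type
from events;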

In a similar way, on the source side, RW would pick the best native type when possible and otherwise fall back to jsonb.

It could be an optional parameter such as allow_jsonb_fallback_conversion=True, or maybe a more explicit way of selecting which columns can or cannot be converted to/from jsonb. Just an idea :)
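To make the idea concrete, something like this (purely illustrative: the parameter does not exist today, it is just the proposal above, and the names and ... are placeholders):

create sink events_json_sink as
select id, payload from events
-- hypothetical option: let the sink apply the jsonb -> avro mapping above
with (connector = 'kafka', allow_jsonb_fallback_conversion = true, ...)
format plain encode avro (...);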

maingoh commented 4 months ago

Here is another mapping that would fit my use case: https://materialize.com/docs/sql/create-sink/kafka/#avro

xiangjinwu commented 4 months ago

My major concern above is about adding a native union type, or sinking arbitrary jsonb as an avro union. There was one core difference I did not mention, which may have led to some confusion:

RisingWave does not automatically generate and register the avro schema to the schema registry. Instead, it accepts an existing one read from the schema registry, so it needs to do the following validation before seeing any concrete records: Can a RisingWave jsonb column fit into avro ["int", "boolean"] field? It depends...

If we were to generate the avro schema, we could generate a ["null", {"type": "string", "connect.name": "io.debezium.data.Json"}] so that all possible jsonb values can fit in it. The schema auto-register feature is also planned.

Does this address your concern?

maingoh commented 4 months ago
  1. We will be working on a native map type soon.
  2. Sinking nullable as avro-union-with-null is already supported today.
  3. Sinking jsonb as avro-string is trivial and the support of connect.name is also work in progress.

If 1. and 3. land quickly, I feel handling union is less of a priority. In any case, a native union type feels like too much and can be handled using jsonb; I have not seen any database supporting it (although I didn't spend much time looking). However, I feel that being able to source and sink arrays/maps/unions as jsonb might be faster to implement than a fully integrated native RW map, and it would handle the union case.

Can a RisingWave jsonb column fit into avro ["int", "boolean"] field? It depends...

In this case I would say:

maingoh commented 4 months ago

@xiangjinwu any approximate idea of when 1. and 3. would be available?