risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.
https://go.risingwave.com/slack
Apache License 2.0
6.88k stars 569 forks source link

bug: `schemas.enable` not working for debezium sink #16954

Closed tabVersion closed 2 months ago

tabVersion commented 4 months ago

Describe the bug

according to the doc

Only configurable for upsert JSON sinks. By default, this value is false for upsert JSON sinks and true for debezium JSON sinks. If true, RisingWave will sink the data with the schema to the Kafka sink. 

but when I ran

create sink s11 from pg_all_data_types with (connector = 'kafka', topic = 'dbz-json-pk-3', properties.bootstrap.server = 'message_queue:29092', primary_key = 'id') format debezium encode json (schemas.enable='false')

I got the message below, also containing the schema def

{
  "topic": "dbz-json-pk-3",
  "key": "{\"payload\":{\"id\":3},\"schema\":{\"fields\":[{\"field\":\"id\",\"optional\":true,\"type\":\"int64\"}],\"name\":\"RisingWave.dev.pg_all_data_types.Key\",\"optional\":false,\"type\":\"struct\"}}",
  "value": "{\"payload\":{\"after\":{\"c_bigint\":9223372036854775807,\"c_bigint_array\":[9223372036854775807],\"c_boolean\":true,\"c_boolean_array\":[true],\"c_bytea\":\"////\",\"c_bytea_array\":[\"////\"],\"c_date\":2932896,\"c_date_array\":[2932896],\"c_decimal\":\"1.1234567891\",\"c_decimal_array\":[\"-10\"],\"c_double_precision\":10000.0,\"c_double_precision_array\":[10000.0],\"c_integer\":2147483647,\"c_integer_array\":[2147483647],\"c_interval\":315260424000000000,\"c_jsonb\":\"{\\\"foo\\\": \\\"bar\\\"}\",\"c_jsonb_array\":[\"{\\\"foo\\\": \\\"bar\\\"}\"],\"c_real\":10000.0,\"c_real_array\":[10000.0],\"c_smallint\":32767,\"c_smallint_array\":[32767],\"c_time\":86399000000,\"c_time_array\":[86399000000],\"c_timestamp\":253402300799000000,\"c_timestamp_array\":[253402300799000000],\"c_timestamptz\":\"9999-12-31T23:59:59.000000Z\",\"c_timestamptz_array\":[\"9999-12-31T23:59:59.000000Z\"],\"c_varchar\":\"varchar\",\"c_varchar_array\":[\"varchar element1\"],\"id\":3},\"before\":null,\"op\":\"c\",\"source\":{\"db\":\"dev\",\"table\":\"pg_all_data_types\",\"ts_ms\":1716848373272},\"ts_ms\":1716848373272},\"schema\":{\"fields\":[{\"field\":\"before\",\"fields\":[{\"field\":\"id\",\"optional\":true,\"type\":\"int64\"},{\"field\":\"c_boolean\",\"optional\":true,\"type\":\"boolean\"},{\"field\":\"c_smallint\",\"optional\":true,\"type\":\"int32\"},{\"field\":\"c_integer\",\"optional\":true,\"type\":\"int32\"},{\"field\":\"c_bigint\",\"optional\":true,\"type\":\"int64\"},{\"field\":\"c_decimal\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_real\",\"optional\":true,\"type\":\"float\"},{\"field\":\"c_double_precision\",\"optional\":true,\"type\":\"double\"},{\"field\":\"c_varchar\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_bytea\",\"optional\":true,\"type\":\"bytes\"},{\"field\":\"c_date\",\"optional\":true,\"type\":\"int32\"},{\"field\":\"c_time\",\"optional\":true,\"type\":\"int64\"},{\"field\":\"c_timestamp\",\"optional\":true,\"type\":\"int64\"},{\"field\":\"c_timestamptz\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_interval\",\"optional\":true,\"type\":\"int64\"},{\"field\":\"c_jsonb\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_boolean_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_smallint_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_integer_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_bigint_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_decimal_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_real_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_double_precision_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_varchar_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_bytea_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_date_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_time_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_timestamp_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_timestamptz_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_jsonb_array\",\"optional\":true,\"type\":\"string\"}],\"name\":\"RisingWave.dev.pg_all_data_types.Key\",\"optional\":true,\"type\":\"struct\"},{\"field\":\"after\",\"fields\":[{\"field\":\"id\",\"optional\":true,\"type\":\"int64\"},{\"field\":\"c_boolean\",\"optional\":true,\"type\":\"boolean\"},{\"field\":\"c_smallint\",\"optional\":true,\"type\":\"int32\"},{\"field\":\"c_integer\",\"optional\":true,\"type\":\"int32\"},{\"field\":\"c_bigint\",\"optional\":true,\"type\":\"int64\"},{\"field\":\"c_decimal\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_real\",\"optional\":true,\"type\":\"float\"},{\"field\":\"c_double_precision\",\"optional\":true,\"type\":\"double\"},{\"field\":\"c_varchar\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_bytea\",\"optional\":true,\"type\":\"bytes\"},{\"field\":\"c_date\",\"optional\":true,\"type\":\"int32\"},{\"field\":\"c_time\",\"optional\":true,\"type\":\"int64\"},{\"field\":\"c_timestamp\",\"optional\":true,\"type\":\"int64\"},{\"field\":\"c_timestamptz\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_interval\",\"optional\":true,\"type\":\"int64\"},{\"field\":\"c_jsonb\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_boolean_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_smallint_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_integer_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_bigint_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_decimal_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_real_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_double_precision_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_varchar_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_bytea_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_date_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_time_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_timestamp_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_timestamptz_array\",\"optional\":true,\"type\":\"string\"},{\"field\":\"c_jsonb_array\",\"optional\":true,\"type\":\"string\"}],\"name\":\"RisingWave.dev.pg_all_data_types.Key\",\"optional\":true,\"type\":\"struct\"},{\"field\":\"source\",\"fields\":[{\"field\":\"db\",\"optional\":false,\"type\":\"string\"},{\"field\":\"table\",\"optional\":true,\"type\":\"string\"},{\"field\":\"ts_ms\",\"optional\":false,\"type\":\"int64\"}],\"name\":\"RisingWave.dev.pg_all_data_types.Source\",\"optional\":false,\"type\":\"struct\"},{\"field\":\"op\",\"optional\":false,\"type\":\"string\"},{\"field\":\"ts_ms\",\"optional\":false,\"type\":\"int64\"}],\"name\":\"RisingWave.dev.pg_all_data_types.Envelope\",\"optional\":false,\"type\":\"struct\"}}",
  "timestamp": 1716848373272,
  "partition": 0,
  "offset": 0
}

the table content is

 id | c_boolean | c_smallint |  c_integer  |       c_bigint       |   c_decimal   | c_real | c_double_precision | c_varchar | c_bytea  | c_date  |   c_time    |    c_timestamp     |        c_timestamptz        |     c_interval     |    c_jsonb     | c_boolean_array | c_smallint_array | c_integer_array |     c_bigint_array     | c_decimal_array | c_real_array | c_double_precision_array |   c_varchar_array    | c_bytea_array | c_date_array | c_time_array  |  c_timestamp_array   |      c_timestamptz_array      |     c_jsonb_array
----+-----------+------------+-------------+----------------------+---------------+--------+--------------------+-----------+----------+---------+-------------+--------------------+-----------------------------+--------------------+----------------+-----------------+------------------+-----------------+------------------------+-----------------+--------------+--------------------------+----------------------+---------------+--------------+---------------+----------------------+-------------------------------+------------------------
  3 | t         |      32767 |  2147483647 |  9223372036854775807 |  1.1234567891 |  10000 |              10000 | varchar   | \xffffff | 2932896 | 86399000000 | 253402300799000000 | 9999-12-31T23:59:59.000000Z | 315260424000000000 | {"foo": "bar"} | {t}             | {32767}          | {2147483647}    | {9223372036854775807}  | {-10}           | {10000}      | {10000}                  | {"varchar element1"} | {"\\xffffff"} | {2932896}    | {86399000000} | {253402300799000000} | {9999-12-31T23:59:59.000000Z} | {"{\"foo\": \"bar\"}"}

Error message/log

No response

To Reproduce

No response

Expected behavior

No response

How did you deploy RisingWave?

No response

The version of RisingWave

No response

Additional context

No response

xiangjinwu commented 4 months ago

It is a known issue https://github.com/risingwavelabs/risingwave/issues/11539#issuecomment-1855208849 and will be carried out after #16852.

Only configurable for upsert JSON sinks. By default, this value is false for upsert JSON sinks and true for debezium JSON sinks.

This line implies this value is true for debezium JSON sinks and unconfigurable. But yes we shall report an unsupported option warning/error here.