-- Execute in Postgres
create table employee_paths (id int primary key, name varchar, path varchar);
-- Execute in RisingWave
CREATE TABLE employees (
id INT PRIMARY KEY,
name VARCHAR,
manager_id INT
);
INSERT INTO employees VALUES
(333, 'Yasmina', NULL),
(198, 'John', 333),
(692, 'Tarek', 333),
(29, 'Pedro', 198),
(4610, 'Sarah', 29),
(72, 'Pierre', 29),
(123, 'Adil', 692);
create table employee_paths (id int primary key, name varchar, path varchar);
create materialized view employee_paths_mv as SELECT id, name, CAST(id AS varchar) path
FROM employees
WHERE manager_id IS NULL
UNION ALL
SELECT e.id, e.name, CONCAT(ep.path, ',', e.id)
FROM employee_paths AS ep JOIN employees AS e
ON ep.id = e.manager_id;
CREATE SINK employee_paths_sink FROM employee_paths_mv WITH (
connector='jdbc',
jdbc.url='jdbc:postgresql://127.0.0.1:5432/dev',
table.name='employee_paths',
primary_key='id',
type='upsert'
);
CREATE TABLE employee_paths_source (
id int primary key, name varchar, path varchar
) WITH (
connector = 'postgres-cdc',
hostname = '127.0.0.1',
port = '5432',
database.name = 'dev',
schema.name = 'public',
table.name = 'employee_paths'
);
create sink employee_paths_x into employee_paths from employee_paths_source;
select * from employee_paths;
cc: @chenzl25, I just saw your tips to the users, I wonder if it is ok that we put this workaround in the documentation.
If ok, I will polish the whole example a bit and provide the whole context.
cc: @chenzl25, I just saw your tips to the users, I wonder if it is ok that we put this workaround in the documentation. If ok, I will polish the whole example a bit and provide the whole context.