Closed chyueyi closed 1 year ago
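This is the Flink SQL that the Flink job needs in order to run. For multi-table and whole-database sync, it may be easier to follow the logic if you focus only on the multi-table part and ignore the single-table part. The core of multi-table sync is:
1. The MySQL source reads multiple tables by regular expression: 'database-name' = 'user_db', 'table-name' = 'userid*' (the documentation is indeed wrong on this point).
2. The Doris sink sets the following parameters: 'sink.multiple.enable', 'sink.multiple.format', 'sink.multiple.database-pattern', 'sink.multiple.table-pattern'.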
Thanks a lot for the answer 👍🏻
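For multi-table sync, how should the field mapping between the mysql cdc source table and the doris sink table be written? The official documentation reads as follows: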
./bin/sql-client.sh -l ./opt/connectors/mysql-cdc-inlong/ -l ./opt/connectors/doris/
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
[INFO] Session property has been set.
Flink SQL> SET 'table.dynamic-table-options.enabled' = 'true';
[INFO] Session property has been set.
Flink SQL> CREATE TABLE cdc_mysql_source (
> id int
> ,name VARCHAR
> ,dr TINYINT
> ,PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc-inlong',
> 'hostname' = 'localhost',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '123456',
> 'database-name' = 'test',
> 'table-name' = 'cdc_mysql_source'
> );
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE cdc_doris_sink (
> id INT,
> name STRING,
> dr TINYINT
> ) WITH (
> 'connector' = 'doris-inlong',
> 'fenodes' = 'localhost:8030',
> 'username' = 'root',
> 'password' = '000000',
> 'sink.enable-delete' = 'true',
> 'sink.multiple.enable' = 'true',
> 'sink.multiple.format' = 'canal-json',
> 'sink.multiple.database-pattern' = '${database}',
> 'sink.multiple.table-pattern' = 'doris_${table}'
> );
[INFO] Execute statement succeed.
-- Delete events are synchronized when sink.enable-delete='true'; this requires batch delete to be enabled on the Doris table
Flink SQL> insert into cdc_doris_sink select * from cdc_mysql_source /*+ OPTIONS('server-id'='5402') */;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 30feaa0ede92h6b6e25ea0cfda26df5e
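To make the two pattern options in the documentation excerpt above concrete, here is a minimal Python sketch of how placeholders such as `${database}` and `${table}` could be expanded per change record. It is only an illustration, not the connector's actual code: the function name `resolve_target` is hypothetical, and it assumes the placeholders are filled from the top-level `database` and `table` fields of a canal-json record.

```python
from string import Template


def resolve_target(database_pattern, table_pattern, record):
    """Expand ${database} / ${table} placeholders for one change record.

    Hypothetical helper for illustration only; it assumes the values come
    from the top-level "database" and "table" fields of a canal-json record.
    """
    fields = {"database": record["database"], "table": record["table"]}
    target_database = Template(database_pattern).substitute(fields)
    target_table = Template(table_pattern).substitute(fields)
    return target_database, target_table


# A trimmed canal-json style record for a row in test.cdc_mysql_source.
record = {
    "database": "test",
    "table": "cdc_mysql_source",
    "type": "INSERT",
    "data": [{"id": 1, "name": "a", "dr": 0}],
}

# Same pattern values as in the documentation excerpt.
print(resolve_target("${database}", "doris_${table}", record))
# -> ('test', 'doris_cdc_mysql_source')
```

Under this reading, each source table is routed to its own Doris table by name, so the Doris tables named by the expanded patterns would need to exist, rather than the single table declared in the sink DDL.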
Writing it this way makes multi-table sync work:
SET 'execution.checkpointing.interval' = '3s';
SET 'table.dynamic-table-options.enabled' = 'true';
CREATE TABLE cdc_mysql_source (
`data` BYTES METADATA FROM 'meta.data_canal' VIRTUAL
) WITH (
'inlong.metric.labels' = 'groupId=1&streamId=1&nodeId=1',
'migrate-all' = 'true',
'connector' = 'mysql-cdc-inlong',
'scan.incremental.snapshot.enabled' = 'false',
'hostname' = '172.17.**.**',
'database-name' = 'c_test3',
'username' = 'root',
'password' = '123456',
'table-name' = 'c_test3\.test,c_test3\.test2'
);
CREATE TABLE cdc_doris_sink (
`data` BYTES
)WITH (
'connector' = 'doris-inlong',
'fenodes' = '172.17.**.**:8030',
'username' = 'root',
'password' = '123456',
'sink.enable-delete' = 'true',
'sink.multiple.enable' = 'true',
'sink.multiple.format' = 'canal-json',
'sink.multiple.database-pattern' = '${database}',
'sink.multiple.table-pattern' = '${table}'
);
insert into cdc_doris_sink select * from cdc_mysql_source;
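The official documentation is not very clear on this part. The multi-table load uses the table that was created for single-table writing, and the table created for the multi-table load is never used, so I still cannot tell how multi-table or whole-database sync should be written. Could the maintainers please provide a complete sync example?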