这是一个基于flink-cdc的同步工具,将mysql数据同步到doris中.
一般情况下,我们启动一个cdc程序,书写过滤正则的时候,填写准确库名,表名则用*,表示我们这个cdc程序读取该库所有表.
这时,我们将面临如下问题:
此工具就是用于解决如上问题,做了如下操作:
当前mysql中字段改名,doris中对应的表是新增一个同名字段.因为doris不支持字段改名.
doris在1.2版本增加了字段改名功能,后续此处将同步修改.
doris列重命名使用,见: https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-RENAME/
支持同步的mysql ddl语句:
tableName
add column column1
type
tableName
drop column column1
tableName
modify [column] column1
type
tableName
change [column] column1
column2
type
其他ddl语句处理:
不在doris执行同步,但是将记录该ddl并插入到doris记录表中,表名:ops.cdcDdlRecord
CREATE TABLE `cdcDdlRecord` (
`time` datetime NULL COMMENT "日志时间",
`host` varchar(60) NULL COMMENT "mysql host",
`mysqlDb` varchar(60) NULL COMMENT "mysql database name",
`mysqlTable` varchar(60) NULL COMMENT "mysql table name",
`isFromCdc` varchar(10) NULL COMMENT "本条记录是否来自flinkCdc程序",
`dorisTable` varchar(60) NULL COMMENT "对应的doris table name",
`mysqlDdl` varchar(256) NULL COMMENT "经过处理后的mysql ddl",
`dorisDdl` varchar(256) NULL COMMENT "程序转化得到doris ddl",
`isExecute` varchar(10) NULL COMMENT "是否尝试在doris执行,1表示是",
`isSucceed` varchar(10) NULL COMMENT "在doris中执行是否成功,1表示是",
`columnsStatus` varchar(10) NULL COMMENT "sink列名列表状态:0=不需要更新列表,1=需要但是构建失败,2=需要且成功构建新的",
`note` text NULL COMMENT "备注,后续人工检查可以添加此字段内容"
) ENGINE=OLAP
DUPLICATE KEY(`time`, `host`, `mysqlDb`, `mysqlTable`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`time`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);
doris不支持列名修改
,当mysql进行列名修改的时候:
ALTER table mysqlTable change phone2 phone3 varchar(140)
,在doris中执行时候,将变为
ALTER table dorisTable add column phone3 varchar(140) after phone2
doris的列修改是异步执行的,可能失败(
主要是列类型修改的时候,一般增加和删除列可以立即获取到结果,将在flink日志中打印相关信息),在doris执行如下命令可以查看:
SHOW ALTER TABLE COLUMN WHERE TableName = "dorisTable" ORDER BY CreateTime DESC LIMIT 1;
该功能是指程序运行期间的修改同步