DTStack / chunjun

A data integration framework
https://dtstack.github.io/chunjun/
Apache License 2.0

[Bug] [doris-x] Sink produces duplicate aggregate rows when the source emits an update #1837

Open waryars opened 10 months ago

waryars commented 10 months ago


What happened

In a mysql-binlog -> kafka -> doris pipeline, when an update statement runs on the source MySQL table, duplicate rows appear on the target side.

Operations on the source side:

```sql
insert into test_binlog(id,cname,sale_amt,ttime) values(154,'hua',10.20,now());
insert into test_binlog(id,cname,sale_amt,ttime) values(155,'hua',10.20,now());
insert into test_binlog(id,cname,sale_amt,ttime) values(156,'hua',10.20,now());
```

Result on the target side:

```
mysql> select * from test_sink;
+------------+-------+-----------+-----------+----------+----------+
| tdate      | cname | area_name | addr_name | sale_num | sale_amt |
+------------+-------+-----------+-----------+----------+----------+
| 2023-10-24 | hua   | 广东省    | 湛江市    |        3 |    30.60 |
+------------+-------+-----------+-----------+----------+----------+
1 row in set (0.01 sec)
```

Operations on the source side:

```sql
delete from test_binlog where id=154;
update test_binlog set sale_amt=20.20 where id=155;
```

Result on the target side:

```
mysql> select * from test_sink;
+------------+-------+-----------+-----------+----------+----------+
| tdate      | cname | area_name | addr_name | sale_num | sale_amt |
+------------+-------+-----------+-----------+----------+----------+
| 2023-10-24 | hua   | 广东省    | 湛江市    |        2 |    30.40 |
| 2023-10-24 | hua   | 广东省    | 湛江市    |        1 |    10.20 |
+------------+-------+-----------+-----------+----------+----------+
2 rows in set (0.01 sec)
```

After the update, an extra row appears: the first row (sale_num = 2, sale_amt = 30.40) is the correct aggregate over the remaining source rows 155 and 156, while the second row is spurious.

What you expected to happen

Duplicate aggregate rows should not appear.

How to reproduce

1. mysql-binlog -> kafka configuration:

[root@t-hadoop01 binlog]# cat binlog_kafka.sql

```sql
CREATE TABLE source (
    id int,
    cname varchar,
    sale_amt decimal(18,2),
    ttime timestamp
) WITH (
    'connector' = 'binlog-x',
    'username' = 'root',
    'password' = 'xxxxx',
    'cat' = 'insert,delete,update',
    'url' = 'jdbc:mysql://172.16.44.83:3306/ambari?useSSL=false',
    'host' = '172.16.44.83',
    'port' = '3306',
    'table' = 'ambari.test_binlog',
    'timestamp-format.standard' = 'SQL'
);
```

```sql
CREATE TABLE sink (
    id int,
    cname varchar,
    sale_amt decimal(18,2),
    ttime timestamp
) WITH (
    'connector' = 'kafka-x',
    'topic' = 'flinkcdc-mysql',
    'properties.bootstrap.servers' = '172.16.56.254:34715',
    'value.format' = 'debezium-json'
);
```

```sql
insert into sink select * from source u;
```

2. kafka -> doris configuration:

[root@t-hadoop01 doris]# cat dim_doris.sql

```sql
CREATE TABLE source (
    id int,
    cname varchar,
    sale_amt decimal(18,2),
    ttime timestamp,
    PROCTIME AS PROCTIME()
) WITH (
    'connector' = 'kafka-x',
    'topic' = 'flinkcdc-mysql',
    'properties.bootstrap.servers' = '172.16.56.254:34715',
    'scan.startup.mode' = 'latest-offset',
    'value.format' = 'debezium-json'
);
```

```sql
CREATE TABLE test_side (
    id int,
    cname varchar,
    area_name varchar,
    addr_name varchar,
    PRIMARY KEY (cname) NOT ENFORCED
) WITH (
    'connector' = 'doris-x',
    'url' = 'jdbc:mysql://172.16.44.86:9030',
    'schema' = 'dorisdb',
    'table-name' = 'test_side',
    'username' = 'root',
    'password' = '',
    'lookup.cache-type' = 'lru',
    'lookup.cache-period' = '3000',
    'lookup.cache.max-rows' = '20000',
    'lookup.cache.ttl' = '3000'
);
```

```sql
CREATE TABLE sink (
    tdate varchar,
    cname varchar,
    area_name varchar,
    addr_name varchar,
    sale_num bigint,
    sale_amt decimal(18,2),
    PRIMARY KEY (tdate, cname) NOT ENFORCED
) WITH (
    'password' = '',
    'connector' = 'doris-x',
    'sink.buffer-flush.interval' = '1000',
    'sink.all-replace' = 'true',
    'sink.buffer-flush.max-rows' = '100',
    'schema' = 'dorisdb',
    'table-name' = 'test_sink',
    'sink.parallelism' = '1',
    'url' = 'jdbc:mysql://172.16.44.86:9030',
    'username' = 'root'
);
```

```sql
INSERT INTO sink
SELECT
    date_format(a.ttime,'yyyy-MM-dd') as tdate,
    a.cname,
    max(b.area_name) as area_name,
    max(b.addr_name) as addr_name,
    count(*) as sale_num,
    sum(a.sale_amt) as sale_amt
FROM source a
LEFT JOIN test_side FOR SYSTEM_TIME AS OF a.PROCTIME AS b ON (a.cname = b.cname)
GROUP BY date_format(a.ttime,'yyyy-MM-dd'), a.cname;
```

3. Table schemas:

3.1 Source MySQL table:

```sql
CREATE TABLE test_binlog (
    id int(11) DEFAULT NULL,
    cname varchar(50) DEFAULT NULL,
    sale_amt decimal(18,2) DEFAULT NULL,
    ttime datetime DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```

3.2 Target Doris tables:

Dimension table schema and data:

```sql
CREATE TABLE test_side (
    id int(11) NULL COMMENT 'id',
    cname varchar(50) NULL COMMENT 'xxx',
    area_name varchar(50) NULL COMMENT 'xxx',
    addr_name varchar(50) NULL COMMENT 'xxx'
) ENGINE=OLAP
DUPLICATE KEY(id, cname)
COMMENT 'test1表'
DISTRIBUTED BY HASH(id, cname) BUCKETS 5
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    "in_memory" = "false",
    "storage_format" = "V2",
    "disable_auto_compaction" = "false"
);
```

```sql
insert into test_side(id,cname,area_name,addr_name) values(1,'war','广东省','珠海市');
insert into test_side(id,cname,area_name,addr_name) values(2,'pha','广东省','广州市');
insert into test_side(id,cname,area_name,addr_name) values(3,'hua','广东省','湛江市');
```

Sink table schema:

```sql
create table test_sink (
    tdate varchar(10) comment 'id',
    cname varchar(50) comment '名称',
    area_name varchar(50) comment 'xx',
    addr_name varchar(50) comment 'xx',
    sale_num bigint,
    sale_amt decimal(18,2)
)
duplicate key(tdate, cname)
comment 'test表'
distributed by hash(tdate, cname) buckets 5
PROPERTIES ("replication_num" = "1");
```

4. Execution:

```sh
sh bin/chunjun-local.sh -job chunjun-examples/sql/binlog/binlog_kafka.sql
sh bin/chunjun-local.sh -job chunjun-examples/sql/doris/dim_doris.sql
```

5. Reproducing the problem: in the mysql-binlog -> kafka -> doris pipeline, when an update statement runs on the source MySQL table, duplicate rows appear on the target side.

Operations on the source side:

```sql
insert into test_binlog(id,cname,sale_amt,ttime) values(154,'hua',10.20,now());
insert into test_binlog(id,cname,sale_amt,ttime) values(155,'hua',10.20,now());
insert into test_binlog(id,cname,sale_amt,ttime) values(156,'hua',10.20,now());
```

Result on the target side:

```
mysql> select * from test_sink;
+------------+-------+-----------+-----------+----------+----------+
| tdate      | cname | area_name | addr_name | sale_num | sale_amt |
+------------+-------+-----------+-----------+----------+----------+
| 2023-10-24 | hua   | 广东省    | 湛江市    |        3 |    30.60 |
+------------+-------+-----------+-----------+----------+----------+
1 row in set (0.01 sec)
```

Operations on the source side:

```sql
delete from test_binlog where id=154;
update test_binlog set sale_amt=20.20 where id=155;
```

Result on the target side:

```
mysql> select * from test_sink;
+------------+-------+-----------+-----------+----------+----------+
| tdate      | cname | area_name | addr_name | sale_num | sale_amt |
+------------+-------+-----------+-----------+----------+----------+
| 2023-10-24 | hua   | 广东省    | 湛江市    |        2 |    30.40 |
| 2023-10-24 | hua   | 广东省    | 湛江市    |        1 |    10.20 |
+------------+-------+-----------+-----------+----------+----------+
2 rows in set (0.01 sec)
```

After the update, an extra row appears.

Anything else

No response

Version

master


zoudaokoulife commented 6 months ago

@waryars In JDBC mode your Doris table uses DUPLICATE KEY(id, cname), but the DUPLICATE KEY specified in a CREATE TABLE statement is only used to indicate which columns the underlying data is sorted by (a more fitting name would be "Sorted Column"; see https://doris.apache.org/zh-CN/docs/data-table/data-model/). If you want update semantics, change the test_sink table to UNIQUE KEY(tdate, cname).
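For example, a minimal sketch of that change, assuming the same columns, bucketing, and single-replica setup as the original test_sink DDL (in the Unique model all non-key columns become value columns that are replaced on key collision):

```sql
-- Hedged sketch: test_sink rebuilt on the Unique key model so that a
-- write with an existing (tdate, cname) replaces the previous row
-- instead of being appended. Key columns must be listed first.
CREATE TABLE test_sink (
    tdate     varchar(10),
    cname     varchar(50),
    area_name varchar(50),
    addr_name varchar(50),
    sale_num  bigint,
    sale_amt  decimal(18,2)
)
UNIQUE KEY(tdate, cname)
DISTRIBUTED BY HASH(tdate, cname) BUCKETS 5
PROPERTIES ("replication_num" = "1");
```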

waryars commented 6 months ago

I tried the Unique table model, but the job fails at startup with an error.

waryars commented 6 months ago

Why isn't it handled according to the primary key specified on the sink side?

zoudaokoulife commented 6 months ago

@waryars 1: What is the error message when using the unique model? Please share it; I verified this locally and it works fine. 2: Regarding "why isn't it handled according to the primary key specified on the sink side?": the write logic is constrained by the connector's data model. As in the Doris data model above, the key you specified only controls sorting, so writes follow Doris's own semantics.
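To illustrate the difference between the two models, here is a small sketch that can be run directly against Doris (the table names t_dup and t_uni are made up for the example): with the duplicate model every write is appended, while with the unique model a later write with the same key replaces the earlier row.

```sql
-- Duplicate model: both rows are kept, even with the same key value.
CREATE TABLE t_dup (k varchar(10), v bigint)
DUPLICATE KEY(k)
DISTRIBUTED BY HASH(k) BUCKETS 1
PROPERTIES ("replication_num" = "1");
INSERT INTO t_dup VALUES ('a', 1);
INSERT INTO t_dup VALUES ('a', 2);
-- SELECT * FROM t_dup;  -- two rows: ('a',1) and ('a',2)

-- Unique model: the second write replaces the first for the same key.
CREATE TABLE t_uni (k varchar(10), v bigint)
UNIQUE KEY(k)
DISTRIBUTED BY HASH(k) BUCKETS 1
PROPERTIES ("replication_num" = "1");
INSERT INTO t_uni VALUES ('a', 1);
INSERT INTO t_uni VALUES ('a', 2);
-- SELECT * FROM t_uni;  -- one row: ('a',2)
```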

waryars commented 6 months ago

```
Caused by: java.sql.SQLException: errCode = 2, detailMessage = errCode = 2, detailMessage = Column[area_name] is not key column or storage model is not duplicate or column type is float or double.
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)
```
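For context, this error text is what Doris returns when a DELETE statement's WHERE clause references a column that is not allowed as a delete condition on the given table model. A hypothetical example of the kind of statement a JDBC sink might issue for a retract row (the actual SQL generated by doris-x may differ):

```sql
-- Hypothetical DELETE against the UNIQUE KEY(tdate, cname) sink table.
-- On a non-duplicate model, Doris only accepts key columns as delete
-- conditions, so the area_name predicate would trigger exactly this
-- errCode = 2 message.
DELETE FROM test_sink
WHERE tdate = '2023-10-24'
  AND cname = 'hua'
  AND area_name = '广东省';
```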

waryars commented 6 months ago

Once you change the table model, it is easy to reproduce.

waryars commented 6 months ago

I also added the area_name and addr_name columns to the key, and changed the types of the remaining columns to sale_num float and sale_amt double.

But it still fails:

```
Column[sale_num] is not key column or storage model is not duplicate or column type is float or double.
```

The error occurs when the second record is inserted into the source table:

```sql
insert into test_binlog(id,cname,sale_amt,ttime) values(154,'hua',10.20,now());
insert into test_binlog(id,cname,sale_amt,ttime) values(155,'hua',10.20,now());
insert into test_binlog(id,cname,sale_amt,ttime) values(156,'hua',10.20,now());
```