apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Flink CDC + Hudi 0.10: delete/update fails on history data but succeeds on current-date data #7177

Open boy88888888 opened 1 year ago

boy88888888 commented 1 year ago

I recently built a small CDC demo: Flink CDC captures MySQL data changes and sinks them to Hudi, which is then synchronized to Hive. But when I update or delete data, the operation fails for historical data while it succeeds for data from the current date.

Environment Description

To Reproduce

MySQL has a table users with 2 rows:

id | name      | ts
---+-----------+-------------------------
 8 | scala     | 2022-10-12 07:13:40.000
24 | flink1.13 | 2022-11-10 10:41:18.000

Then:

1. update users set name='python' where id=8
2. delete from users where id=8
3. update users set name='flink1.13bug' where id=24

Expected behavior: after step 3 finishes, the Hive-synchronized table should show the id=8 record deleted and the id=24 record updated. Instead, the id=8 record is still in the Hive table and its name still equals 'scala', so the update was not applied either; only the id=24 change succeeded. A quick way to check the result from Hive is sketched after this list.
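A hedged verification sketch (assuming Hudi's Hive sync registered the usual read-optimized/real-time tables with _ro/_rt suffixes for this MERGE_ON_READ table; those table names follow Hudi's convention and did not appear in the original report):

-- Query the real-time view in Hive to check the merged result:
SELECT id, name, ts
FROM hudi2hive.users_mor_sync2hive01_rt
WHERE id IN (8, 24);
-- Expected: no row for id=8, and name = 'flink1.13bug' for id=24.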

Additional context

Then I opened the Flink SQL client and saw the following changes on the sink table:

+----+----+--------+-------------------------+-------------------------+-----------+
| op | id | name   | birthday                | ts                      | partition |
+----+----+--------+-------------------------+-------------------------+-----------+
| +I |  8 | scala  | 2022-10-12 15:13:40.000 | 2022-10-12 15:13:40.000 | 20221012  |
| -U |  8 | scala  | 2022-10-12 07:13:40.000 | 2022-10-12 07:13:40.000 | 20221012  |
| +U |  8 | python | 2022-10-12 07:13:40.000 | 2022-10-12 07:13:40.000 | 20221012  |
| +I |  8 | scala  | 2022-10-12 15:13:40.000 | 2022-10-12 15:13:40.000 | 20221012  |
| -D |  8 | python | 2022-10-12 07:13:40.000 | 2022-10-12 07:13:40.000 | 20221012  |
| +I |  8 | scala  | 2022-10-12 15:13:40.000 | 2022-10-12 15:13:40.000 | 20221012  |
+----+----+--------+-------------------------+-------------------------+-----------+
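For reference, the changelog above comes from a streaming read of the sink table. A minimal sketch of what was presumably run in the SQL client (the exact statements were not included in the report; the result-mode setting is an assumption based on Flink 1.13's SQL client options):

-- Show the change operations (+I/-U/+U/-D) instead of a materialized table:
SET sql-client.execution.result-mode = changelog;
-- read.streaming.enabled = 'true' is set in the sink table DDL further down,
-- so a plain SELECT runs as a continuous changelog query:
SELECT * FROM users_mor_sync2hive01;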

The problem is very strange: after each -U, +U, or -D operation there is an extra +I that re-inserts the old data into the sink table.

I don't know why; the MySQL data has been modified, so it should not be rolled back.

The current-date data (partition 20221110, id=24) is OK; it inserts (+I) the new name into the table:

+----+----+--------------+-------------------------+-------------------------+-----------+
| op | id | name         | birthday                | ts                      | partition |
+----+----+--------------+-------------------------+-------------------------+-----------+
| -U | 24 | flink1.13    | 2022-11-10 09:37:14.000 | 2022-11-10 09:37:14.000 | 20221110  |
| +U | 24 | flink1.13bug | 2022-11-10 09:37:14.000 | 2022-11-10 09:37:14.000 | 20221110  |
| +I | 24 | flink1.13bug | 2022-11-10 09:37:14.000 | 2022-11-10 09:37:14.000 | 20221110  |
+----+----+--------------+-------------------------+-------------------------+-----------+

This kind of problem did not show up when sinking to Elasticsearch or Kafka.

Is this a Hudi problem? I hope to get more information.

----------- SQL ---------------

-- SOURCE TABLE
CREATE TABLE mysql_users (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'xxxx',
  'port' = '3306',
  'username' = 'xxx',
  'password' = 'xxx',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'flink_cdc_test',
  'table-name' = 'users'
);

-- SINK TABLE
-- Note: partition is a reserved keyword in Flink SQL, so the column must be
-- escaped with backticks.
CREATE TABLE users_mor_sync2hive01 (
  id BIGINT,
  name STRING,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3),
  `partition` VARCHAR(20),
  PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://nameservice1/hudidatas/hudi-warehouse/users_mor_sync2hive01',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'write.precombine.field' = 'ts',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'write.rate.limit' = '2000',
  'table.type' = 'MERGE_ON_READ',
  'compaction.async.enabled' = 'true',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '1',
  'changelog.enabled' = 'true',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '3',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://XXXX:9083',
  -- 'hive_sync.jdbc_url' = 'jdbc:hive2://hadoop:10000',
  'hive_sync.table' = 'users_mor_sync2hive01',
  'hive_sync.db' = 'hudi2hive',
  'hive_sync.username' = '',
  'hive_sync.password' = '',
  'hive_sync.support_timestamp' = 'true'
);

INSERT INTO users_mor_sync2hive01 SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') FROM mysql_users;

danny0405 commented 1 year ago

You need to set up the precombine field correctly; for a changelog, it is basically the operation timestamp metadata field, such as the op_ts field in Debezium.
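A minimal sketch of that suggestion, assuming the mysql-cdc connector's op_ts metadata column (documented in Flink CDC's metadata support) is carried through and used as the precombine field; the exact wiring below is an assumption, not a confirmed fix:

-- Source: expose the binlog operation timestamp as a metadata column.
CREATE TABLE mysql_users (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3),
  op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL  -- time the change was made in MySQL
) WITH (
  'connector' = 'mysql-cdc'
  -- ... same connection options as in the original DDL ...
);

-- Sink: add a matching op_ts TIMESTAMP_LTZ(3) column to the sink DDL and
-- precombine on it instead of the business timestamp:
--   'write.precombine.field' = 'op_ts'

Hudi keeps the record with the larger precombine value for a given key. With 'write.precombine.field' = 'ts', the snapshot row for id=8 (ts 15:13:40) compares higher than the updated row (ts 07:13:40) and keeps winning the merge, which would explain the re-appearing +I rows above; an operation timestamp grows with each change, so the latest change always wins.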