apache / flink-cdc

Flink CDC is a streaming data integration tool
https://nightlies.apache.org/flink/flink-cdc-docs-stable
Apache License 2.0
5.71k stars 1.94k forks source link

[Bug][Mysql] Starting flink with a savepoint and adding a new table for synchronization is problematic #2105

Closed hiSandog closed 9 months ago

hiSandog commented 1 year ago

Search before asking

Flink version

1.14.2

Flink CDC version

2.2.1

Database and its version

mysql 8.0.22

Minimal reproduce step

  1. 使用startupOptions : initial 方式启动 flink cdc,同步若干张表,比如 (10 张),这时候没有问题,很顺利,不仅快照数据全部进入kafka,后续的binlog 数据也都能进入 Starting the flink cdc with startupOptions: initial and syncing a few tables, say (10), worked fine,Not only the snapshot data will all enter kafka, but also the subsequent binlog data
  2. 通过执行 flink stop -t yarn-application --savepointPath xxx -Dyarn.application.id={flink_yarn_id} {flink_id} 停止flink 任务,并且设置保存点 stop the flink job by running 'flink stop -t yarn-application --savepointPath xxx-Dyarn.application. id={flink_yarn_id} {flink_id}', And set the savepoints
  3. 有些新增需要同步的表加到了启动参数里面,这些表的数据量大概 100多万,使用 flink run --detached --fromSavepoint xxx,通过刚才的保存点恢复任务。 flink run --detached --fromSavepoint xxxis used to retrieve a few new tables that need to be synchronized.

What did you expect to see?

flink cdc 在同步完这些新增表的快照后,会连同之前的表一起同步新的binlog

After the flink cdc syncs the snapshots of these new tables, it syncs the new binlog along with the previous tables

What did you see instead?

flink cdc 的日志确实显示了有新的表加入,也确实开始同步了新的表的快照数据,可是在同步新的表的快照时候同步了一部分之后就似乎停止同步了,然后也没有同步任何的binlog数据,仿佛所有的任务全部卡死,也没有任何的报错。 The flink cdc logs do show that new tables have been added, and it does start syncing snapshots of new tables, but it seems to stop syncing after a few syncs of the new table snapshot, and then it doesn't sync any binlog data, as if all tasks are stuck without any errors.

image image 图一表明,flinkcdc感知到了新的需要同步的表,并且开始同步表的快照 图二是flink 的jobmanager 日志,在切分表切分到第21个切片的时候,就停止切分了,其实后面还有很多数据,但是此时flink cdc 仿佛啥都不干了,也不同步快照,也不同步binlog,也没有任何报错,等了几个小时都是这样

Figure 1 shows that flinkcdc senses the new tables that need to be synchronized and starts synchronizing snapshots of the tables Figure 2 shows the flink jobmanager log.The sharding table stops at the 21st slice.There is a lot of data left, but the flink cdc seems to stop doing anything-it doesn't sync snapshots, it doesn't sync binlogs, and it doesn't get any error for hours

Anything else?

当需要同步的数据表比较小,比较少的时候似乎并不会出现这种情况,有时候出现这种情况靠提高并行度似乎可以缓解,但是有时候又不行。

This doesn't seem to be the case when the number of tables that need to be synchronized is relatively small, and sometimes it seems to be alleviated by increasing parallelism, but sometimes it isn't.

Are you willing to submit a PR?

leonardBang commented 1 year ago

@hiSandog Could you try to bump the cdc version to 2.3.0 which has fixed some bugs?

hiSandog commented 1 year ago

@hiSandog Could you try to bump the cdc version to 2.3.0 which has fixed some bugs?

"Source Data Fetcher for Source: mysql cdc source -> Sink: cdc sink msk aws (1/8)#0" Id=495 TIMED_WAITING
    at java.lang.Thread.sleep(Native Method)
    at io.debezium.util.Metronome$1.pause(Metronome.java:57)
    at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:237)
    at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.pollSplitRecords(BinlogSplitReader.java:147)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:78)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
    ...

    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@67678907

我看flink 的 thread dump ,好像应该是卡死在这的

hiSandog commented 1 year ago

@hiSandog Could you try to bump the cdc version to 2.3.0 which has fixed some bugs?

切换到 2.3.0 其实有所好转,但似乎依然算不算理想状态

image 首先在同步大表快照阶段,比如我设置并行度为 8,但是 其中6个只跑了一次切片后就歇火了,另外两个将剩下的所有数据跑完(不是这张图,因为跑大表的时候 没截图,所以用这种图表示一下,当时的情况是6个task,只跑了很少量的数据,剩下的所有数据都是两个任务跑完的)。我很担心,如果我再继续加表,就会出现8个task全部都不跑的情况,现在使用的2.2.1 就是这种情况。 还有就是 大表快照完之后,没有开始同步binlog,然后我保存重启之后才正常同步binlog。

gong commented 1 year ago

@hiSandog Could you try to bump the cdc version to 2.3.0 which has fixed some bugs?

切换到 2.3.0 其实有所好转,但似乎依然算不算理想状态

image 首先在同步大表快照阶段,比如我设置并行度为 8,但是 其中6个只跑了一次切片后就歇火了,另外两个将剩下的所有数据跑完(不是这张图,因为跑大表的时候 没截图,所以用这种图表示一下,当时的情况是6个task,只跑了很少量的数据,剩下的所有数据都是两个任务跑完的)。我很担心,如果我再继续加表,就会出现8个task全部都不跑的情况,现在使用的2.2.1 就是这种情况。 还有就是 大表快照完之后,没有开始同步binlog,然后我保存重启之后才正常同步binlog。

@hiSandog Hi, I think you met oom when splitSnapshotReadTask.execute(sourceContext); I suggest you can modify MySqlSnapshotSplitReadTask#createDataEventsForTable to catch Throwable. Maybe you will discover error info.

hiSandog commented 1 year ago

@hiSandog Could you try to bump the cdc version to 2.3.0 which has fixed some bugs?

切换到 2.3.0 其实有所好转,但似乎依然算不算理想状态 image 首先在同步大表快照阶段,比如我设置并行度为 8,但是 其中6个只跑了一次切片后就歇火了,另外两个将剩下的所有数据跑完(不是这张图,因为跑大表的时候 没截图,所以用这种图表示一下,当时的情况是6个task,只跑了很少量的数据,剩下的所有数据都是两个任务跑完的)。我很担心,如果我再继续加表,就会出现8个task全部都不跑的情况,现在使用的2.2.1 就是这种情况。 还有就是 大表快照完之后,没有开始同步binlog,然后我保存重启之后才正常同步binlog。

@hiSandog Hi, I think you met oom when splitSnapshotReadTask.execute(sourceContext); I suggest you can modify MySqlSnapshotSplitReadTask#createDataEventsForTable to catch Throwable. Maybe you will discover error info.

image

我不确定 现在这样算不算正常现象,重启后同步快照的时候,只有两个task 在工作,其余task 全部在摸鱼

Mengqi777 commented 1 year ago

This happened to me too

ruanhang1993 commented 9 months ago

Considering collaboration with developers around the world, please re-create your issue in English on Apache Jira under project Flink with component tag Flink CDC. Thank you!

为了方便各个国家的贡献者交流,请在 Apache JiraFlink 项目下重新用英文创建问题,同时在问题上将 Component 标注为 Flink CDC。谢谢!

njalan commented 1 month ago

@ruanhang1993 请问这个问题有方案吗? flink cdc 3.2还是同样的问题。 一般的表通过savepoint恢复没什么问题。但是数据量很大比如超过10亿通过checkpoint恢复后跑1分钟马上就卡着不动了。 没有什么新数据流入,程序也没有明显报错