apache / doris-flink-connector

Flink Connector for Apache Doris
https://doris.apache.org/
Apache License 2.0

[Bug] Checkpoint Timeout Due to Retrieving Schemas for All Subpartitions When Synchronizing a Partitioned PostgreSQL Table #426

Closed tchivs closed 1 week ago

tchivs commented 1 month ago

Search before asking

Version

1.6.1

What's Wrong?

When synchronizing a PostgreSQL table with this connector, checkpoints always fail if the table contains a large number of partitions. Tracing the source code shows that the PostgresDialect's queryTableSchema queries the schema of every subpartition, and repeating this schema querying on each checkpoint causes connection timeouts.
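To get a sense of how many schema lookups this triggers, you can count the direct partitions of a parent table; each one costs a separate queryTableSchema call. A minimal psql sketch (host, database, and table name are illustrative placeholders taken from the job configuration below):

# Count the direct partitions of a parent table; each partition means one
# extra schema lookup for the connector. Names are illustrative.
psql -h xxxxx -U postgres -d test -c \
  "SELECT count(*) FROM pg_inherits WHERE inhparent = 'public.case_log_test'::regclass;"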

flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c com.hytera.cdc.FromPg2Doris \
    ./flink-doris-connector-1.19-1.6.1.jar \
    --database aia_dev \
    --postgres-conf scan.incremental.snapshot.enabled=true \
    --postgres-conf hostname=xxxxx \
    --postgres-conf port=5432 \
    --postgres-conf username=postgres \
    --postgres-conf password="xxxxx" \
    --postgres-conf database-name=test \
    --postgres-conf schema-name=public \
    --postgres-conf slot.name=aia_test \
    --postgres-conf decoding.plugin.name=pgoutput \
    --table-prefix dwd_ \
    --ignore-default-value \
    --including-tables "jjdb_.*|fkdb_.*|pjdb_.*|dsrdb|zjdb|case_log_test" \
    --multi-to-one-origin "jjdb_.*|fkdb_.*|pjdb_.*" \
    --multi-to-one-target "dwd_jjdb|dwdfkdb|dwd_pjdb" \
    --excluding-tables ".*_bak" \
    --sink-conf fenodes=10.115.6.215:8030 \
    --sink-conf username=admin \
    --sink-conf password=:a:74Y4nF-8m \
    --sink-conf jdbc-url=jdbc:mysql://10.115.6.215:9030 \
    --sink-conf sink.label-prefix=label \
    --sink-conf sink.enable.batch-mode=false \
    --sink-conf sink.properties.partial.columns=true \
    --sink-conf sink.enable-delete=true \
    --table-conf replication_num=3

Checkpoint exception:

org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. The latest checkpoint failed due to Checkpoint expired before completing., view the Checkpoint History tab or the Job Manager log to find out why continuous checkpoints failed.
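As a stopgap while each checkpoint still performs the per-partition schema lookups, the standard Flink options for checkpoint timeout and tolerable failures can buy some headroom; a sketch with illustrative values (this mitigates the symptom, it does not fix the root cause):

# Stopgap only: raise the checkpoint timeout (default 10 min) and tolerate
# a few failed checkpoints (default 0) instead of failing the job.
flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dexecution.checkpointing.timeout=30min \
    -Dexecution.checkpointing.tolerable-failed-checkpoints=5 \
    -c com.hytera.cdc.FromPg2Doris \
    ./flink-doris-connector-1.19-1.6.1.jar
# (followed by the same job arguments as in the command above)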


Error log:

2024-05-28 15:39:07,367 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for jjdb: Writer -> jjdb: Committer (1/1)#0 (dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0).
2024-05-28 15:39:07,370 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state CANCELED to JobManager for task fkdb: Writer -> fkdb: Committer (1/1)#0 dbec67a546f03f19bae3a56726a02174_1992c3287130e2f49268dbcc909a7d1d_0_0.
2024-05-28 15:39:07,370 INFO org.apache.http.impl.execchain.RetryExec [] - I/O exception (java.net.SocketException) caught when processing request to {}->http://192.168.0.168:8040: Socket closed
2024-05-28 15:39:07,378 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state CANCELED to JobManager for task jjdb: Writer -> jjdb: Committer (1/1)#0 dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0.
2024-05-28 15:39:37,363 WARN org.apache.flink.runtime.taskmanager.Task [] - Task 'Source: Postgres Source -> Process (1/1)#0' did not react to cancelling signal - interrupting; it is stuck for 30 seconds in method:
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
org.apache.flink.runtime.taskmanager.Task$$Lambda$813/1723769838.run(Unknown Source)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
java.lang.Thread.run(Thread.java:750)

2024-05-28 15:39:37,372 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping the embedded engine
2024-05-28 15:39:37,373 ERROR com.ververica.cdc.debezium.internal.Handover [] - Reporting error:
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_381]
at com.ververica.cdc.debezium.internal.Handover.produce(Handover.java:115) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at com.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:54) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:83) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:822) [flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192) [flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_381]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_381]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
2024-05-28 15:39:37,375 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping the task and engine
2024-05-28 15:39:37,375 INFO io.debezium.connector.common.BaseSourceTask [] - Stopping down connector
2024-05-28 15:41:07,376 WARN io.debezium.pipeline.ChangeEventSourceCoordinator [] - Coordinator didn't stop in the expected time, shutting down executor now
2024-05-28 15:41:07,377 WARN io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot was interrupted before completion
2024-05-28 15:41:07,377 INFO io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot - Final stage
2024-05-28 15:41:07,377 WARN io.debezium.pipeline.ChangeEventSourceCoordinator [] - Change event source executor was interrupted
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
at io.debezium.connector.base.ChangeEventQueue.doEnqueue(ChangeEventQueue.java:204) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.connector.base.ChangeEventQueue.enqueue(ChangeEventQueue.java:169) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.pipeline.EventDispatcher$BufferingSnapshotChangeRecordReceiver.changeRecord(EventDispatcher.java:440) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.pipeline.EventDispatcher$1.changeRecord(EventDispatcher.java:166) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:120) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:57) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:155) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEventsForTable(RelationalSnapshotChangeEventSource.java:407) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:316) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:132) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:155) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:137) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_381]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_381]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_381]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_381]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
2024-05-28 15:41:07,378 INFO io.debezium.pipeline.ChangeEventSourceCoordinator [] - Connected metrics set to 'false'
2024-05-28 15:41:07,380 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed


What You Expected?

Checkpoints should complete normally even when the synchronized table contains a large number of partitions; the connector should not re-query the schema of every subpartition on each checkpoint.

Solution: use caching, so that each partitioned table is matched and its schema fetched only once (see the modified method under "Anything Else?" below).

How to Reproduce?

Steps to Reproduce:

  1. Add a partitioned table in PostgreSQL.
  2. Create partitions covering nearly ten years (see the sketch after this list).
  3. Synchronize this table.
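For reference, a minimal bash/psql sketch of such a setup; all names are illustrative, and it creates roughly ten years of monthly range partitions (requires GNU date):

# 1. Create the parent table, partitioned by range on a timestamp column.
psql -h xxxxx -U postgres -d test -c \
  "CREATE TABLE public.case_log_test (
       id bigint NOT NULL,
       ts timestamptz NOT NULL,
       payload text
   ) PARTITION BY RANGE (ts);"

# 2. Create ~120 monthly partitions covering 2015-2024.
for year in $(seq 2015 2024); do
  for month in $(seq -w 1 12); do
    next=$(date -d "${year}-${month}-01 +1 month" +%Y-%m-01)
    psql -h xxxxx -U postgres -d test -c \
      "CREATE TABLE public.case_log_test_${year}_${month}
           PARTITION OF public.case_log_test
           FOR VALUES FROM ('${year}-${month}-01') TO ('${next}');"
  done
done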

Anything Else?

Method modification: I modified queryTableSchema as follows, and it works well:

@Override
public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
    long startTime = System.nanoTime(); // record start time for the duration log below
    // Map the subpartition name back to its parent table name so that all
    // partitions of one table share a single cache entry.
    String name = this.tableNameConverter.convert(tableId.table());
    TableId parentTableId = new TableId(null, tableId.schema(), name);
    TableChanges.TableChange tableChange = cache.get(parentTableId);
    if (tableChange == null) {
        // Cache miss: query PostgreSQL for this table's schema exactly once.
        LOG.info("[queryTableSchema begin] {}", tableId.identifier());
        if (schema == null) {
            schema = new CustomPostgresSchema((PostgresConnection) jdbc, sourceConfig);
        }
        tableChange = schema.getTableSchema(tableId);
        LOG.info("[queryTableSchema end] {}", tableId.identifier());
        cache.put(parentTableId, tableChange);
    }
    long duration = System.nanoTime() - startTime;
    LOG.info("[queryTableSchema duration] {} {} ms",
            tableId.identifier(), duration / 1_000_000); // nanoseconds to milliseconds
    return tableChange;
}
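For context, the method relies on a few members of the dialect class that are not shown above. A minimal sketch of what they might look like; the field types are assumptions inferred from how the method uses them, not taken from the connector source (tableNameConverter and sourceConfig are the class's existing members and are omitted):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Assumed supporting members (sketch only):
private static final Logger LOG = LoggerFactory.getLogger(PostgresDialect.class);

// Maps a parent table id to its schema so every subpartition of the same
// table reuses one lookup; a concurrent map is an assumption for thread safety.
private final Map<TableId, TableChanges.TableChange> cache = new ConcurrentHashMap<>();

// Lazily initialized on the first cache miss in queryTableSchema.
private CustomPostgresSchema schema;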

Are you willing to submit PR?

Code of Conduct

DongLiang-0 commented 1 month ago

@tchivs Thanks for raising this issue. This seems to be an issue with flink-cdc-connector. You can try contributing your fix to the flink-cdc community.

tchivs commented 1 week ago

> Thanks for raising this issue. This seems to be an issue with flink-cdc-connector. You can try contributing your fix to the flink-cdc community.

OK, I have submitted an issue to them, and this issue can be closed: FLINK-36164