apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0
2.1k stars 834 forks source link

[Feature][Question] CDC schema evolution in synchronizing databases? #3581

Open CodyPin opened 1 week ago

CodyPin commented 1 week ago

Search before asking

Motivation

I was using Flink CDC to sync some tables in a MySQL database and I suddenly noticed that the job failed, I didn't look much into it since the job often fails due to OOM (I am testing with loads of table, around 90ish), and when I restart the job, it comes with the below exception

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Incompatible schema found.
Paimon table is: ...
MySQL table is: ...
If you want to ignore the incompatible tables, please specify --ignore-incompatible to true.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301)
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:172)
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
    at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59)
    at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
    at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)

The table schema in MySQL has indeed changed with a column added. Originally I had the assumption that database synchronization would also have schema evolution like table synchronization, but upon further reading into the documents:

Suppose we have a MySQL table named tableA, it has three fields: field_1, field_2, field_3. When we want to load this MySQL table to Paimon, we can do this in Flink SQL, or use MySqlSyncTableAction.

With it specifically mentioning MySqlSyncTableAction, it seems schema evolution is not supported in database synchronization, is this true? If so, I have 2 questions:

  1. What would be the recommended action if I want to continue the database synchronization with that table included and updated? Do I need to drop said table and load it again? Or is there another easier way?
  2. Would schema evolution be supported in database synchronization in the future?

Some additional info for the builds I am using:

Solution

No response

Anything else?

No response

Are you willing to submit a PR?

MOBIN-F commented 6 days ago

hi @CodyPin ,Can you provide the complete information of this piece? Paimon table is: ... MySQL table is: ...

I have encountered a similar problem. This problem occurs when the schema change occurs when stopping and starting the flink job. I think that's what happened to you, too? Relevant pr:#3362

Would schema evolution be supported in database synchronization in the future? [ database synchronization supported schema evolution]

LinMingQiang commented 4 days ago

You can add flink-cdc-common-3.0.1.jar into FLINK_HOME/lib path, and try the schema evolution again.