apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0

[Bug] Job cannot recover from checkpoint/savepoint if parallelism is changed from 1 to 2 #4543

Open Tan-JiaLiang opened 1 day ago

Tan-JiaLiang commented 1 day ago


Paimon version

0.9.0

Compute Engine

Flink

Minimal reproduce step

  1. Start a job that writes to a Paimon append-only table with parallelism=1.
  2. Stop the job.
  3. Restore the job from the checkpoint, changing the parallelism to 2.
  4. An error appears; the job cannot restore from the checkpoint. (A minimal repro sketch follows.)
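A hypothetical sketch of the steps above, assuming Flink SQL on the DataStream runtime; the warehouse path, catalog, and table names (`/tmp/warehouse`, `t`, `src`) are placeholders, not taken from the report:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReproSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);           // step 1: the first run uses parallelism=1
        env.enableCheckpointing(10_000); // take checkpoints so the job can be restored

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.executeSql(
                "CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse' = 'file:///tmp/warehouse')");
        tEnv.executeSql("USE CATALOG paimon");
        // Append-only table: no primary key defined.
        tEnv.executeSql("CREATE TABLE IF NOT EXISTS t (id INT, v STRING)");
        tEnv.executeSql(
                "CREATE TEMPORARY TABLE src (id INT, v STRING) WITH ('connector' = 'datagen')");
        tEnv.executeSql("INSERT INTO t SELECT * FROM src");
        // Steps 2-4: stop the job with a checkpoint/savepoint, change
        // setParallelism(1) to setParallelism(2), and resubmit with
        // -s <checkpoint/savepoint path>; the restore then fails as described.
    }
}
```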

What doesn't meet your expectations?

The job should restore from the checkpoint/savepoint even if I change the parallelism.

Anything else?

No response


Tan-JiaLiang commented 1 day ago

https://github.com/apache/paimon/blob/220789d5ab4c566f72584d2b85980c777fd7807d/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java#L69-L83

If the job parallelism is 1, the Writer operator and the Compact Coordinator operator are chained together. However, since the Compact Coordinator operator's parallelism is always 1, changing the job parallelism separates the two operators. Because Flink derives the default operator IDs from the job topology, the changed chaining alters those IDs, so the operator state in the checkpoint can no longer be matched and is not recoverable.

We need to disable chaining between the writer operator and the compact coordinator operator.
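A minimal, self-contained sketch of that idea (not the actual Paimon code): a stand-in identity map plays the role of the coordinator, and `disableChaining()` keeps it from being chained with the writer regardless of the job parallelism.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableChainingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Stands in for the writer output in UnawareBucketSink.
        DataStream<Long> written = env.fromSequence(0, 1_000);

        SingleOutputStreamOperator<Long> coordinated =
                written
                        .map(x -> x)
                        .returns(Types.LONG)
                        .name("Compact Coordinator") // stands in for the coordinator operator
                        .uid("compact-coordinator")  // an explicit uid also keeps state addressable
                        .forceNonParallel()          // the coordinator always runs with parallelism 1
                        .disableChaining();          // proposed fix: never chain with the writer

        coordinated.print();
        env.execute("disable-chaining-sketch");
    }
}
```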

Tan-JiaLiang commented 1 day ago

https://github.com/apache/paimon/blob/220789d5ab4c566f72584d2b85980c777fd7807d/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java#L243-L256

The same issue exists in org.apache.paimon.flink.sink.FlinkSink#doWrite.

Tan-JiaLiang commented 1 day ago

@JingsongLi WDYT? Should we add an option to control this? Like #3232.

Tan-JiaLiang commented 15 hours ago

I think #4424 can solve this problem. But do we still need to disable chaining? Or should we just recommend that users add the 'sink.operator-uid.suffix' and 'source.operator-uid.suffix' options to their Flink jobs?
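A hedged sketch of that recommendation: the table names (`t`, `src`) and the suffix value (`job-v1`) are placeholders, and applying the options via dynamic `OPTIONS` hints is an assumption (hints may need `table.dynamic-table-options.enabled` on older Flink versions; the options could equally be set in the table's `WITH` clause).

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UidSuffixSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // ... catalog/table setup for a Paimon table `t` and a source `src` omitted ...

        // Pin the operator UIDs with a stable suffix so restored jobs can match
        // their state even after a parallelism change.
        tEnv.executeSql(
                "INSERT INTO t /*+ OPTIONS('sink.operator-uid.suffix' = 'job-v1') */ "
                        + "SELECT * FROM src /*+ OPTIONS('source.operator-uid.suffix' = 'job-v1') */");
    }
}
```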