apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Connector][Paimon] setLoadTable is called too late when restoring a job #8070

Status: Open. Opened by yoogoc 6 days ago.

What happened

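The Paimon sink currently calls setLoadTable inside handleSchemaSaveMode. However, restoreJobFromMasterActiveSwitch skips handleSaveMode, so when a job is restored after a master switch, the sink's table has not been loaded yet. Constructing PaimonAggregatedCommitter during physical plan generation then fails with the NullPointerException shown under Error Exception.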
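The ordering problem can be reproduced in isolation with a small sketch. Every class and method below (RestoreOrderingSketch, Table, Sink, LazySink, AggregatedCommitter, loadTableFromCatalog, restore) is hypothetical and only mimics the roles described above; none of them is SeaTunnel or Paimon API. The sketch assumes that the committer needs a non-null table when it is constructed and that the restore path goes straight to committer creation without running save-mode handling. LazySink shows one possible direction for a fix (loading the table on demand), not necessarily how the project resolves it.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical stand-ins only; these are NOT SeaTunnel or Paimon classes.
final class RestoreOrderingSketch {

    /** Stand-in for a loaded Paimon table handle. */
    static final class Table {
        final String name;

        Table(String name) {
            this.name = name;
        }
    }

    /** Stand-in for PaimonAggregatedCommitter: it needs a non-null table at construction time. */
    static final class AggregatedCommitter {
        final Table table;

        AggregatedCommitter(Table table) {
            // Mirrors the reported failure mode: a missing table fails fast in the constructor.
            this.table = Objects.requireNonNull(table, "table not loaded");
        }
    }

    /** Stand-in sink that loads its table only inside the save-mode handler. */
    static class Sink {
        protected Table table;
        private final Supplier<Table> loader;

        Sink(Supplier<Table> loader) {
            this.loader = loader;
        }

        // Normal submission path: save-mode handling runs first and sets the table.
        void handleSaveMode() {
            table = loader.get();
        }

        // Hypothetical helper that loads the table without going through save-mode handling.
        Table loadTableFromCatalog() {
            return loader.get();
        }

        AggregatedCommitter createAggregatedCommitter() {
            return new AggregatedCommitter(table);
        }
    }

    /** One possible fix (assumption): load the table lazily if save-mode handling was skipped. */
    static final class LazySink extends Sink {
        LazySink(Supplier<Table> loader) {
            super(loader);
        }

        @Override
        AggregatedCommitter createAggregatedCommitter() {
            if (table == null) {
                table = loadTableFromCatalog();
            }
            return super.createAggregatedCommitter();
        }
    }

    /** Simulates a restore after a master switch: handleSaveMode() is never called. */
    static AggregatedCommitter restore(Sink sink) {
        return sink.createAggregatedCommitter();
    }
}
```

With the plain Sink, calling restore throws a NullPointerException from the AggregatedCommitter constructor, matching the shape of the reported trace; with LazySink the same call succeeds. Loading lazily leaves the normal submission path unchanged, because the table is already set there, at the cost of possibly contacting the catalog while the plan is being generated during a restore.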

SeaTunnel Version

dev branch

SeaTunnel Config

Any Paimon sink configuration.

Running Command

Any job that uses the Paimon sink.

Error Exception

org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: Job id 909735403425103873 init failed
     at org.apache.seatunnel.engine.server.CoordinatorService.restoreJobFromMasterActiveSwitch(CoordinatorService.java:516) ~[seatunnel-starter.jar:2.3.9-SNAPSHOT]
     at org.apache.seatunnel.engine.server.CoordinatorService.lambda$restoreAllRunningJobFromMasterNodeSwitch$4(CoordinatorService.java:466) ~[seatunnel-starter.jar:2.3.9-SNAPSHOT]
     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640) ~[?:1.8.0_342]
     at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39) ~[seatunnel-starter.jar:2.3.9-SNAPSHOT]
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342]
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342]
     at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342]
Caused by: java.lang.NullPointerException
     at org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggregatedCommitter.<init>(PaimonAggregatedCommitter.java:66) ~[?:?]
     at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink.createAggregatedCommitter(PaimonSink.java:108) ~[?:?]
     at org.apache.seatunnel.api.sink.multitablesink.MultiTableSink.createAggregatedCommitter(MultiTableSink.java:150) ~[seatunnel-starter.jar:2.3.9-SNAPSHOT]
     at org.apache.seatunnel.engine.server.dag.physical.PhysicalPlanGenerator.lambda$getCommitterTask$5(PhysicalPlanGenerator.java:253) ~[seatunnel-starter.jar:2.3.9-SNAPSHOT]
     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_342]
     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_342]
     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) ~[?:1.8.0_342]
     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_342]
     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_342]
     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_342]
     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_342]
     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) ~[?:1.8.0_342]
     at org.apache.seatunnel.engine.server.dag.physical.PhysicalPlanGenerator.getCommitterTask(PhysicalPlanGenerator.java:306) ~[seatunnel-starter.jar:2.3.9-SNAPSHOT]
     at org.apache.seatunnel.engine.server.dag.physical.PhysicalPlanGenerator.lambda$generate$0(PhysicalPlanGenerator.java:182) ~[seatunnel-starter.jar:2.3.9-SNAPSHOT]
     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_342]
     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) ~[?:1.8.0_342]
     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_342]
     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_342]
     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_342]
     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_342]
     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) ~[?:1.8.0_342]
     at org.apache.seatunnel.engine.server.dag.physical.PhysicalPlanGenerator.generate(PhysicalPlanGenerator.java:220) ~[seatunnel-starter.jar:2.3.9-SNAPSHOT]
     at org.apache.seatunnel.engine.server.dag.physical.PlanUtils.fromLogicalDAG(PlanUtils.java:61) ~[seatunnel-starter.jar:2.3.9-SNAPSHOT]
     at org.apache.seatunnel.engine.server.master.JobMaster.init(JobMaster.java:259) ~[seatunnel-starter.jar:2.3.9-SNAPSHOT]
     at org.apache.seatunnel.engine.server.CoordinatorService.restoreJobFromMasterActiveSwitch(CoordinatorService.java:514) ~[seatunnel-starter.jar:2.3.9-SNAPSHOT]
     ... 6 more

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response