apache / incubator-streampark

Make stream processing easier! Easy-to-use streaming application development framework and operation platform.
https://streampark.apache.org/
Apache License 2.0

[Bug] Flink native k8s application mode recovery failed from S3(s3p) savepoint #3013

Open lukeyan2023 opened 1 year ago

lukeyan2023 commented 1 year ago

Java Version

1.8

Scala Version

2.12.x

StreamPark Version

2.1.1

Flink Version

1.17.1

deploy mode

kubernetes-application

What happened

Flink failed to recover from a savepoint that StreamPark had saved automatically. The logs show that the savepoint path StreamPark submitted when restoring the Flink job was s3p://lakehouse/flink/sp/Platform-Flink-Test-Security-Log/savepoint-2b3ed0-f0c7ba51791f. The logs of the Flink application show that Flink hit an error while restoring from that savepoint. Manually submitting the job with the same savepoint path through the regular CLI produced the same error.

However, after changing the savepoint path to point at the metadata file, s3p://lakehouse/flink/sp/Platform-Flink-Test-Security-Log/savepoint-2b3ed0-f0c7ba51791f/_metadata, submissions through both the regular CLI and StreamPark succeed.
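The workaround above amounts to appending `_metadata` to the savepoint directory path. A minimal sketch of that rewrite follows. `SavepointPaths` and `toMetadataPath` are hypothetical names invented for this illustration and are not part of StreamPark or Flink.

```java
// Hypothetical helper, not StreamPark or Flink code: turns a savepoint
// directory path into the path of its _metadata file.
final class SavepointPaths {
    static final String METADATA_FILE = "_metadata";

    static String toMetadataPath(String savepointPath) {
        // Drop trailing slashes so "dir" and "dir/" are treated the same.
        String trimmed = savepointPath.replaceAll("/+$", "");
        // Leave the path alone if it already points at the metadata file.
        if (trimmed.endsWith("/" + METADATA_FILE)) {
            return trimmed;
        }
        return trimmed + "/" + METADATA_FILE;
    }

    public static void main(String[] args) {
        String dir = "s3p://lakehouse/flink/sp/Platform-Flink-Test-Security-Log/"
                + "savepoint-2b3ed0-f0c7ba51791f";
        // Prints the directory path followed by "/_metadata".
        System.out.println(toMetadataPath(dir));
    }
}
```

The helper is idempotent, so applying it to a path that already ends in `_metadata` leaves the path unchanged.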

Error Exception

The error log of Flink is as follows:

2023-09-02 02:57:38,909 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No checkpoint found during restore.
2023-09-02 02:57:38,912 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Starting job ff31d8dfb89d0a2e3fdf618617af15bc from savepoint s3p://lakehouse/flink/sp/Platform-Flink-Test-Security-Log/savepoint-2b3ed0-f0c7ba51791f ()
2023-09-02 02:57:39,360 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job ff31d8dfb89d0a2e3fdf618617af15bc reached terminal state FAILED.
org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: java.io.EOFException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
    ... 3 more
Caused by: java.lang.RuntimeException: java.io.EOFException
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    ... 3 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.flink.runtime.checkpoint.Checkpoints.loadCheckpointMetadata(Checkpoints.java:113)
    at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:149)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1849)
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:223)
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:198)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:365)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:210)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:136)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:371)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:348)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
    ... 4 more
2023-09-02 02:57:39,395 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job ff31d8dfb89d0a2e3fdf618617af15bc has been registered for cleanup in the JobResultStore after reaching a terminal state.
2023-09-02 02:57:40,108 WARN  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application failed unexpectedly: 
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_382]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_382]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_382]
    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_382]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_382]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_382]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_382]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    ... 13 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 12 more
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:938) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
    at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:109) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
    at com.entercom.china.itsdsi.security.iceberg.ProbeLogToIceberg.main(ProbeLogToIceberg.java:1389) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_382]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_382]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_382]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_382]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 12 more
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'Platform-Flink-Test-security-Log'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2212) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:189) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) ~[?:?]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:921) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
    at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:109) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
    at com.entercom.china.itsdsi.security.iceberg.ProbeLogToIceberg.main(ProbeLogToIceberg.java:1389) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_382]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_382]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_382]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_382]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 12 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_382]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: java.io.EOFException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: java.lang.RuntimeException: java.io.EOFException
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_382]
    at org.apache.flink.runtime.checkpoint.Checkpoints.loadCheckpointMetadata(Checkpoints.java:113) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:149) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1849) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:223) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:198) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:365) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:210) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:136) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:371) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:348) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
2023-09-02 02:57:40,132 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_382]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_382]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_382]
    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_382]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_382]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_382]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_382]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    ... 13 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 12 more
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:938) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
    at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:109) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
    at com.entercom.china.itsdsi.security.iceberg.ProbeLogToIceberg.main(ProbeLogToIceberg.java:1389) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_382]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_382]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_382]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_382]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 12 more
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'Platform-Flink-Test-security-Log'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2212) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:189) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) ~[?:?]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:921) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
    at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:109) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
    at com.entercom.china.itsdsi.security.iceberg.ProbeLogToIceberg.main(ProbeLogToIceberg.java:1389) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_382]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_382]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_382]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_382]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 12 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_382]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: java.io.EOFException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: java.lang.RuntimeException: java.io.EOFException
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_382]
    at org.apache.flink.runtime.checkpoint.Checkpoints.loadCheckpointMetadata(Checkpoints.java:113) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:149) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1849) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:223) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:198) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:365) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:210) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:136) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:371) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:348) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
2023-09-02 02:57:40,145 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2023-09-02 02:57:40,145 INFO  org.apache.flink.runtime.blob.BlobServer                     [] - Stopped BLOB server at 0.0.0.0:6124
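
The innermost cause in the trace is a java.io.EOFException thrown by DataInputStream.readInt inside Checkpoints.loadCheckpointMetadata. readInt throws this exception when the stream ends before four bytes are available. The sketch below reproduces only that JDK behaviour, without Flink. `EofDemo` and `readIntHitsEof` are made-up names, and whether the object behind the directory-style S3 path was actually empty is an assumption that the log does not confirm.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustration only: shows when DataInputStream.readInt raises EOFException.
final class EofDemo {
    // Returns true if reading one int from the given bytes ends in EOFException.
    static boolean readIntHitsEof(byte[] content) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(content))) {
            in.readInt(); // needs four bytes
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readIntHitsEof(new byte[0]));            // empty input
        System.out.println(readIntHitsEof(new byte[] {0, 0, 0, 1})); // four bytes
    }
}
```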

Screenshots

(two screenshots; images not preserved)

wolfboys commented 1 year ago

Thank you for the detailed feedback. We will work quickly to identify and fix this bug. 💪🔧

lukeyan2023 commented 1 year ago

image

image

The two screenshots above are from the official Flink 1.17.1 documentation.

In actual testing, the savepoint (sp) address in Figure 2 only works when it uses format 2 from Figure 1. Using format 1 from Figure 1 fails, which is the cause of the current error.

From reviewing the source code, I understand the process to be as follows:

First, a savepoint is triggered, and StreamPark obtains the generated savepoint path from Flink and writes it to the database. Second, when restoring from a savepoint, StreamPark reads the savepoint parameter back from the database.

So, to fix this issue, should we store format 2 from Figure 1 when obtaining the savepoint from Flink, so that the code elsewhere doesn't need to change?
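
The idea of normalizing the path before it is written to the database could be sketched roughly as follows. This is only an illustration, not StreamPark's actual code: the class `SavepointPaths` and the method `toMetadataPath` are hypothetical names, and the sketch assumes the metadata file is called `_metadata`, as in the working path reported in this issue.

```java
// Hypothetical helper, not part of StreamPark or Flink.
class SavepointPaths {

    // Assumption: the savepoint metadata file is named "_metadata",
    // matching the path that worked in this issue.
    static final String METADATA_FILE = "_metadata";

    /** Turns a savepoint directory path into a path to its _metadata file. */
    static String toMetadataPath(String savepointPath) {
        String path = savepointPath;
        // Drop trailing slashes so we never produce "...//_metadata".
        while (path.length() > 1 && path.endsWith("/")) {
            path = path.substring(0, path.length() - 1);
        }
        // Already points at the metadata file: leave it alone.
        if (path.endsWith("/" + METADATA_FILE)) {
            return path;
        }
        return path + "/" + METADATA_FILE;
    }

    public static void main(String[] args) {
        String dir = "s3p://lakehouse/flink/sp/Platform-Flink-Test-Security-Log/"
                + "savepoint-2b3ed0-f0c7ba51791f";
        System.out.println(toMetadataPath(dir));
    }
}
```

Because the method is idempotent, applying it to a path that already ends in `_metadata` leaves it unchanged, so existing database rows would not need to be treated specially.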

wolfboys commented 1 year ago

Thank you for providing the information. We encourage you to fix this bug. How about it? 💪 We need to test and determine the savepoint path rules under different versions of Flink (1.12 ~ 1.17). We warmly welcome you to fix this bug, and we believe you can do it! 👍😊

lukeyan2023 commented 1 year ago

I am willing to fix this bug and am currently reading the relevant code, but due to my limited abilities, it may take some time.

lukeyan2023 commented 1 year ago

image

After reviewing the relevant source code and the official Flink documentation, I believe that the correct savepoint format should be format 1 in the screenshot.

So I think this is a problem with Flink, not StreamPark. To prove this, I ran the following tests:

Flink Version 1.17.1

  1. Store the savepoint on HDFS and restore the job from it using format 1 in the screenshot. Result: success.
  2. Store the savepoint on S3 using the s3a protocol and restore the job from it using format 1 in the screenshot. Result: success.
  3. Store the savepoint on S3 using the s3p protocol and restore the job from it using format 1 in the screenshot. Result: failure.
  4. Store the savepoint on S3 using the s3p protocol and restore the job from it using format 2 in the screenshot. Result: success.

In summary, Flink savepoint restore appears to behave unexpectedly when the savepoint is stored on S3 and accessed through the s3p protocol.

If this is the intended behavior of Flink, then StreamPark may need to handle this scenario specifically. If it is not intended, the bug should be reported to the Flink community.
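
If StreamPark did end up handling this scenario specifically, the four test results above suggest a workaround keyed on the URI scheme. The following is a rough sketch under that assumption only: `S3pSavepointWorkaround` and `resolveRestorePath` are hypothetical names, and the HDFS and s3a paths in `main` are made up for illustration.

```java
// Hypothetical workaround, not part of StreamPark or Flink.
class S3pSavepointWorkaround {

    /** Extracts the scheme before "://", or an empty string if there is none. */
    static String schemeOf(String path) {
        int idx = path.indexOf("://");
        return idx > 0 ? path.substring(0, idx) : "";
    }

    /**
     * Appends "/_metadata" only for s3p paths (test 3 failed, test 4 passed).
     * HDFS and s3a paths are returned unchanged (tests 1 and 2 passed as-is).
     */
    static String resolveRestorePath(String savepointPath) {
        if (!"s3p".equalsIgnoreCase(schemeOf(savepointPath))) {
            return savepointPath;
        }
        if (savepointPath.endsWith("/_metadata")) {
            return savepointPath;
        }
        String base = savepointPath.endsWith("/")
                ? savepointPath.substring(0, savepointPath.length() - 1)
                : savepointPath;
        return base + "/_metadata";
    }

    public static void main(String[] args) {
        // Illustrative paths only; the bucket and directory names are made up.
        System.out.println(resolveRestorePath("hdfs:///flink/sp/savepoint-example"));
        System.out.println(resolveRestorePath("s3a://bucket/flink/sp/savepoint-example"));
        System.out.println(resolveRestorePath("s3p://bucket/flink/sp/savepoint-example"));
    }
}
```

Limiting the rewrite to `s3p` keeps the behavior for the storage backends that already restore correctly untouched, which matters while the path rules for other Flink versions are still unverified.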

@wolfboys

wolfboys commented 1 year ago

Sorry for taking so long to get back to you. Based on your description, our preliminary suspicion is that this is a bug in Flink, but we need further confirmation. If it is confirmed, we can report it to the Flink community.