apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Beam Jobs built using version 2.50 and above are not running with FlinkRunners v 1.16.x #32944

Open siddhanta-rath opened 4 weeks ago

siddhanta-rath commented 4 weeks ago

What happened?

We have been running our Beam jobs on Google Cloud Dataflow for a while and are now evaluating a migration to Flink. All of our jobs are built with Beam SDK version 2.56.0.

During this exercise, we found that pipelines built with Beam SDK 2.56.0 do not start on the FlinkRunner with Flink v1.16.x. When we downgrade the Beam SDK to 2.49, the same jobs start running. However, downgrading costs us features that are only available in the newer Beam SDKs (RequestResponseIO among others, which is a critical component in one of our jobs).

We also tried the WordCount example pipeline with Beam versions 2.50 through 2.56.0, using the respective runner versions from 2.50 to 2.58.0, on Flink 1.16.x, but to no avail. It runs fine on version 2.49.0.

These are the failure logs:

Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Pipeline execution failed
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    ... 12 more
Caused by: java.lang.RuntimeException: Pipeline execution failed
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:107) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
    at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_372]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_372]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    ... 12 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available.
    at org.apache.flink.core.execution.DetachedJobExecutionResult.getNetRuntime(DetachedJobExecutionResult.java:51) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.createAttachedPipelineResult(FlinkPipelineExecutionEnvironment.java:171) ~[?:?]
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:154) ~[?:?]
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:104) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
    at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_372]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_372]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    ... 12 more
2024-07-25 11:45:12,956 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_372]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_372]
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_372]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_372]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_372]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_372]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:287) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:224) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_372]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_372]
    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_372]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_372]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_372]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_372]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    ... 13 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Pipeline execution failed
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    ... 12 more
Caused by: java.lang.RuntimeException: Pipeline execution failed
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:107) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
    at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_372]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_372]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    ... 12 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available.
    at org.apache.flink.core.execution.DetachedJobExecutionResult.getNetRuntime(DetachedJobExecutionResult.java:51) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.createAttachedPipelineResult(FlinkPipelineExecutionEnvironment.java:171) ~[?:?]
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:154) ~[?:?]
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:104) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
    at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_372]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_372]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    ... 12 more
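
For triage, a trace like the one above can be reduced to two things: the chain of `Caused by:` exception classes, and the Flink build stamped on each `flink-dist` frame. The following is a minimal sketch in plain Java, assuming the log is available as a single string. The class `TraceTriage` and its methods are hypothetical helpers written for illustration; they are not part of Beam or Flink.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical triage helper for a pasted stack trace; not part of Beam or Flink. */
public class TraceTriage {

    // "Caused by: <fully.qualified.Exception>"; the first letter may be lowercase,
    // as it is at the start of the pasted log.
    private static final Pattern CAUSE = Pattern.compile("[Cc]aused by: ([\\w.$]+)");

    // Frame suffix such as "[flink-dist_2.12-1.14.3.jar:1.14.3]"; captures the Flink version.
    private static final Pattern FLINK_DIST =
            Pattern.compile("flink-dist_[\\d.]+-[\\d.]+\\.jar:(\\d+\\.\\d+\\.\\d+)");

    /** Exception class names from every "Caused by:" entry, in log order. */
    public static List<String> causeChain(String log) {
        List<String> causes = new ArrayList<>();
        Matcher m = CAUSE.matcher(log);
        while (m.find()) {
            causes.add(m.group(1));
        }
        return causes;
    }

    /** Distinct Flink versions stamped on flink-dist frames, in first-seen order. */
    public static Set<String> flinkDistVersions(String log) {
        Set<String> versions = new LinkedHashSet<>();
        Matcher m = FLINK_DIST.matcher(log);
        while (m.find()) {
            versions.add(m.group(1));
        }
        return versions;
    }

    public static void main(String[] args) {
        // Shortened excerpt of the trace above, kept on one line as in the paste.
        String sample =
                "Caused by: java.lang.RuntimeException: Pipeline execution failed"
                + " at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)"
                + " ~[flink-dist_2.12-1.14.3.jar:1.14.3] ... 12 more"
                + " Caused by: org.apache.flink.api.common.InvalidProgramException:"
                + " Job was submitted in detached mode.";

        List<String> causes = causeChain(sample);
        String innermost = causes.isEmpty() ? "none" : causes.get(causes.size() - 1);
        System.out.println("Cause chain: " + causes);
        System.out.println("Innermost cause: " + innermost);
        System.out.println("flink-dist versions: " + flinkDistVersions(sample));
    }
}
```

Run against the full log above, this should report `org.apache.flink.api.common.InvalidProgramException` as the innermost cause and `1.14.3` as the only `flink-dist` version, which is worth comparing with the Flink 1.16.x that the cluster is expected to run.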

I have gone through the Beam community issues [Issue/29660] and the release notes. Beyond the compatibility mismatch (which is already addressed in our case), I didn't find anything else that could directly cause this issue.

Question: has anyone in the community experienced this issue and found a workaround for running Beam pipelines built with Beam SDKs > 2.49.0 on Flink? In particular, is anybody successfully running Beam pipelines built with SDK version 2.56 or newer?

Any help is appreciated!

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

siddhanta-rath commented 4 weeks ago

From the Beam SDK 2.50.0 changelog, I learned that some changes were made to support both x86 and ARM CPU architectures in Beam images. Could this be the reason for the incompatibility?