apache / incubator-streampark

Make stream processing easier! Easy-to-use streaming application development framework and operation platform.
https://streampark.apache.org/
Apache License 2.0

[Bug] Version 2.1 Submitting a batch task is abnormal #2976

Status: Closed (closed by Dkbei 1 year ago)

Dkbei commented 1 year ago

Java Version

jdk 1.8

Scala Version

2.11.x

StreamPark Version

2.1.0

Flink Version

1.14.6

Deploy Mode

yarn-application

What happened

Submitting a SQL job in batch mode fails with the exception "No operators defined in streaming topology. Cannot execute." The full log is below.

Error Exception

2023-08-25 11:42:33,149 WARN  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application failed unexpectedly:
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_181]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_181]
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) ~[?:1.8.0_181]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_181]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_181]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_181]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:287) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:224) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_181]
    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_16be5c59-5ecf-4c7e-808f-900a6875ceae.jar:1.14.6]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_16be5c59-5ecf-4c7e-808f-900a6875ceae.jar:1.14.6]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_16be5c59-5ecf-4c7e-808f-900a6875ceae.jar:1.14.6]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_16be5c59-5ecf-4c7e-808f-900a6875ceae.jar:1.14.6]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_16be5c59-5ecf-4c7e-808f-900a6875ceae.jar:1.14.6]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_181]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_181]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_181]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) [?:1.8.0_181]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    ... 13 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No operators defined in streaming topology. Cannot execute.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    ... 12 more
Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:2105) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.generateStreamGraph(StreamExecutionEnvironment.java:2100) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.table.planner.delegation.DefaultExecutor.createPipeline(DefaultExecutor.java:83) ~[flink-table_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1606) ~[flink-table_2.11-1.14.6.jar:1.14.6]
    at org.apache.streampark.flink.core.TableContext.execute(TableContext.scala:68) ~[streampark-flink-shims_flink-1.14_2.11-2.1.0.jar:2.1.0]
    at org.apache.streampark.flink.core.FlinkTableTrait.start(FlinkTableTrait.scala:39) ~[streampark-flink-sqlclient_2.11-2.1.0.jar:2.1.0]
    at org.apache.streampark.flink.core.scala.FlinkTable$class.main(FlinkTable.scala:42) ~[streampark-flink-sqlclient_2.11-2.1.0.jar:2.1.0]
    at org.apache.streampark.flink.cli.SqlClient$BatchSqlApp$.main(SqlClient.scala:64) ~[streampark-flink-sqlclient_2.11-2.1.0.jar:2.1.0]
    at org.apache.streampark.flink.cli.SqlClient$.delayedEndpoint$org$apache$streampark$flink$cli$SqlClient$1(SqlClient.scala:57) ~[streampark-flink-sqlclient_2.11-2.1.0.jar:2.1.0]
    at org.apache.streampark.flink.cli.SqlClient$delayedInit$body.apply(SqlClient.scala:31) ~[streampark-flink-sqlclient_2.11-2.1.0.jar:2.1.0]
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at scala.App$$anonfun$main$1.apply(App.scala:76) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at scala.App$$anonfun$main$1.apply(App.scala:76) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at scala.collection.immutable.List.foreach(List.scala:392) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at scala.App$class.main(App.scala:76) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.streampark.flink.cli.SqlClient$.main(SqlClient.scala:31) ~[streampark-flink-sqlclient_2.11-2.1.0.jar:2.1.0]
    at org.apache.streampark.flink.cli.SqlClient.main(SqlClient.scala) ~[streampark-flink-sqlclient_2.11-2.1.0.jar:2.1.0]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_181]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_181]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    ... 12 more
2023-08-25 11:42:33,155 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=30.0, taskHeapSize=15.292gb (16419651555 bytes), taskOffHeapSize=125.000mb (131072000 bytes), networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=1.852gb (1988100125 bytes), numSlots=30}, current pending count: 1.
2023-08-25 11:42:33,156 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_181]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_181]
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) ~[?:1.8.0_181]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_181]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_181]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_181]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:287) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:224) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_181]
    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_16be5c59-5ecf-4c7e-808f-900a6875ceae.jar:1.14.6]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_16be5c59-5ecf-4c7e-808f-900a6875ceae.jar:1.14.6]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_16be5c59-5ecf-4c7e-808f-900a6875ceae.jar:1.14.6]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_16be5c59-5ecf-4c7e-808f-900a6875ceae.jar:1.14.6]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_16be5c59-5ecf-4c7e-808f-900a6875ceae.jar:1.14.6]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_181]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_181]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_181]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) [?:1.8.0_181]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    ... 13 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No operators defined in streaming topology. Cannot execute.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    ... 12 more
Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:2105) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.generateStreamGraph(StreamExecutionEnvironment.java:2100) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.table.planner.delegation.DefaultExecutor.createPipeline(DefaultExecutor.java:83) ~[flink-table_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1606) ~[flink-table_2.11-1.14.6.jar:1.14.6]
    at org.apache.streampark.flink.core.TableContext.execute(TableContext.scala:68) ~[streampark-flink-shims_flink-1.14_2.11-2.1.0.jar:2.1.0]
    at org.apache.streampark.flink.core.FlinkTableTrait.start(FlinkTableTrait.scala:39) ~[streampark-flink-sqlclient_2.11-2.1.0.jar:2.1.0]
    at org.apache.streampark.flink.core.scala.FlinkTable$class.main(FlinkTable.scala:42) ~[streampark-flink-sqlclient_2.11-2.1.0.jar:2.1.0]
    at org.apache.streampark.flink.cli.SqlClient$BatchSqlApp$.main(SqlClient.scala:64) ~[streampark-flink-sqlclient_2.11-2.1.0.jar:2.1.0]
    at org.apache.streampark.flink.cli.SqlClient$.delayedEndpoint$org$apache$streampark$flink$cli$SqlClient$1(SqlClient.scala:57) ~[streampark-flink-sqlclient_2.11-2.1.0.jar:2.1.0]
    at org.apache.streampark.flink.cli.SqlClient$delayedInit$body.apply(SqlClient.scala:31) ~[streampark-flink-sqlclient_2.11-2.1.0.jar:2.1.0]
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at scala.App$$anonfun$main$1.apply(App.scala:76) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at scala.App$$anonfun$main$1.apply(App.scala:76) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at scala.collection.immutable.List.foreach(List.scala:392) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at scala.App$class.main(App.scala:76) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.streampark.flink.cli.SqlClient$.main(SqlClient.scala:31) ~[streampark-flink-sqlclient_2.11-2.1.0.jar:2.1.0]
    at org.apache.streampark.flink.cli.SqlClient.main(SqlClient.scala) ~[streampark-flink-sqlclient_2.11-2.1.0.jar:2.1.0]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_181]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_181]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.11-1.14.6.jar:1.14.6]
    ... 12 more
2023-08-25 11:42:33,169 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting YarnApplicationClusterEntryPoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..

Dkbei commented 1 year ago

The SQL script is as follows:

SET 'execution.runtime-mode' = 'batch';
SET 'parallelism.default' = '30';
SET 'pipeline.max-parallelism' = '30';

CREATE CATALOG paimon WITH (
  'type' = 'paimon',
  'metastore' = 'hive',
  'uri' = 'thrift://xxx:9083',
  'warehouse' = 'hdfs://xx/user/hive/warehouse/',
  'hive-conf-dir' = '/etc/hive/conf'
);

insert into paimon.ods.ods_record_dynamic_realtime
select * from (
  select a.*, row_number() over(partition by synid order by upload_update desc) as rn
  from paimon.ods.ods_record_realtime /*+ OPTIONS('scan.infer-parallelism' = 'false') */ a
) zz;

wolfboys commented 1 year ago

Thanks for your feedback. We will promptly confirm and fix it.