apache / incubator-streampark

Make stream processing easier! Easy-to-use streaming application development framework and operation platform.
https://streampark.apache.org/
Apache License 2.0

[Bug] Demo Flink SQL cannot be run in yarn-per-job mode with Flink 1.16 and Flink 1.17 #3761

Closed SbloodyS closed 2 months ago

SbloodyS commented 3 months ago

Java Version

1.8

Scala Version

2.12.x

StreamPark Version

dev

Flink Version

1.16 1.17

deploy mode

yarn-per-job

What happened

I verified that the streampark-flinkjob_{applicationName}.jar packaged by the shim runs correctly when submitted locally with flink run -t yarn-per-job --detached. However, the same job fails when submitted through StreamPark, with the following exception:

java.util.concurrent.CompletionException: java.lang.reflect.InvocationTargetException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.streampark.flink.client.FlinkClient$.$anonfun$proxy$1(FlinkClient.scala:88)
    at org.apache.streampark.flink.proxy.FlinkShimsProxy$.$anonfun$proxy$1(FlinkShimsProxy.scala:62)
    at org.apache.streampark.common.util.ClassLoaderUtils$.runAsClassLoader(ClassLoaderUtils.scala:42)
    at org.apache.streampark.flink.proxy.FlinkShimsProxy$.proxy(FlinkShimsProxy.scala:62)
    at org.apache.streampark.flink.client.FlinkClient$.proxy(FlinkClient.scala:83)
    at org.apache.streampark.flink.client.FlinkClient$.submit(FlinkClient.scala:54)
    at org.apache.streampark.flink.client.FlinkClient.submit(FlinkClient.scala)
    at org.apache.streampark.console.core.service.application.impl.ApplicationActionServiceImpl.lambda$start$2(ApplicationActionServiceImpl.java:464)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    ... 3 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to instantiate java compiler
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82)
    at org.apache.streampark.flink.client.trait.FlinkClientTrait.getJobGraph(FlinkClientTrait.scala:293)
    at org.apache.streampark.flink.client.trait.FlinkClientTrait.getJobGraph$(FlinkClientTrait.scala:251)
    at org.apache.streampark.flink.client.impl.YarnPerJobClient$.doSubmit(YarnPerJobClient.scala:86)
    at org.apache.streampark.flink.client.trait.FlinkClientTrait.submit(FlinkClientTrait.scala:158)
    at org.apache.streampark.flink.client.trait.FlinkClientTrait.submit$(FlinkClientTrait.scala:70)
    at org.apache.streampark.flink.client.impl.YarnPerJobClient$.submit(YarnPerJobClient.scala:40)
    at org.apache.streampark.flink.client.FlinkClientEntrypoint$.submit(FlinkClientEntrypoint.scala:47)
    at org.apache.streampark.flink.client.FlinkClientEntrypoint.submit(FlinkClientEntrypoint.scala)
    ... 16 more
Caused by: java.lang.IllegalStateException: Unable to instantiate java compiler
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:163)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.generateCompileAndInstantiate(JaninoRelMetadataProvider.java:141)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:73)
    at org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:165)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3951)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:197)
    at org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:118)
    at org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:844)
    at org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:307)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337)
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:565)
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:428)
    at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:251)
    at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:130)
    at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:208)
    at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:195)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:329)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1805)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:881)
    at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:109)
    at org.apache.streampark.flink.core.FlinkSqlExecutor$.executeSql(FlinkSqlExecutor.scala:148)
    at org.apache.streampark.flink.core.FlinkStreamTableTrait.sql(FlinkStreamTableTrait.scala:88)
    at org.apache.streampark.flink.cli.SqlClient$StreamSqlApp$.handle(SqlClient.scala:90)
    at org.apache.streampark.flink.core.scala.FlinkStreamTable.main(FlinkStreamTable.scala:48)
    at org.apache.streampark.flink.core.scala.FlinkStreamTable.main$(FlinkStreamTable.scala:45)
    at org.apache.streampark.flink.cli.SqlClient$StreamSqlApp$.main(SqlClient.scala:89)
    at org.apache.streampark.flink.cli.SqlClient$.delayedEndpoint$org$apache$streampark$flink$cli$SqlClient$1(SqlClient.scala:78)
    at org.apache.streampark.flink.cli.SqlClient$delayedInit$body.apply(SqlClient.scala:34)
    at scala.Function0.apply$mcV$sp(Function0.scala:34)
    at scala.Function0.apply$mcV$sp$(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App.$anonfun$main$1$adapted(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:388)
    at scala.App.main(App.scala:76)
    at scala.App.main$(App.scala:74)
    at org.apache.streampark.flink.cli.SqlClient$.main(SqlClient.scala:34)
    at org.apache.streampark.flink.cli.SqlClient.main(SqlClient.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 27 more
Caused by: java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
    at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
    at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:161)
    ... 93 more

Error Exception

No response

Screenshots

No response

wolfboys commented 3 months ago

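The exception message points to the likely cause: the loaded flink-table-planner jar may conflict with the planner in flink/lib. The innermost exception is a ClassCastException between two Janino classes, which typically occurs when the class and the interface it implements are loaded by different class loaders: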

 java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
    at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
    at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:161)
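
One way to check whether such a conflict is actually present is to print which class loader and which jar each of the two classes in the ClassCastException comes from. The following is only a minimal diagnostic sketch, not part of StreamPark or Flink: the class name ClassOrigin and the method describe are hypothetical, and it assumes you can run it with the same classpath and context class loader as the failing submission.

```java
import java.security.CodeSource;

public class ClassOrigin {

    /** Describes which class loader and code source (jar or directory) a class came from. */
    static String describe(Class<?> cls) {
        ClassLoader loader = cls.getClassLoader();
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        // Classes loaded by the bootstrap loader have a null loader and usually no code source.
        String loaderName = (loader == null) ? "<bootstrap>" : loader.toString();
        String location = (src == null || src.getLocation() == null)
                ? "<unknown>"
                : src.getLocation().toString();
        return cls.getName() + " | loader=" + loaderName + " | from=" + location;
    }

    /** Same as above, but resolves the class by name through the given loader without initializing it. */
    static String describe(String className, ClassLoader loader) {
        try {
            return describe(Class.forName(className, false, loader));
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " | not loadable: " + e;
        }
    }

    public static void main(String[] args) {
        // Assumption: the context class loader is the one used when the job's main method runs.
        ClassLoader ctx = Thread.currentThread().getContextClassLoader();
        // The two classes named in the ClassCastException above.
        System.out.println(describe("org.codehaus.janino.CompilerFactory", ctx));
        System.out.println(describe("org.codehaus.commons.compiler.ICompilerFactory", ctx));
    }
}
```

If the two lines report different loaders or different jars, that would be consistent with the planner conflict suggested above; if both classes come from the same loader and jar, the cause probably lies elsewhere.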