datavane / tis

Supports agile DataOps based on Flink, DataX, Flink-CDC and Chunjun, with a Web UI
https://tis.pub
Apache License 2.0
989 stars 217 forks

Flink job fails when run in SQL mode under K8s application mode #308

Closed: baisui1981 closed this issue 5 months ago

baisui1981 commented 6 months ago

When a Flink job is run in SQL mode under K8s application mode, the following exception is reported while the JobManager (JM) pod is starting up:

2024-03-28 05:42:24,248 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_402]
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_402]
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_402]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_402]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_402]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_402]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:337) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_402]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_402]
        at org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:172) ~[?:?]
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59) [flink-rpc-akka774ad0b4-9fa8-457c-a227-b536281c7c82.jar:tis-1.18.1]
        at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57) [flink-rpc-akka774ad0b4-9fa8-457c-a227-b536281c7c82.jar:tis-1.18.1]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_402]
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_402]
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_402]
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_402]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
        ... 13 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not instantiate the executor. Make sure a planner module is on the classpath
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:130) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        ... 12 more
Caused by: com.qlangtech.tis.async.message.client.consumer.MQConsumeException: Could not instantiate the executor. Make sure a planner module is on the classpath
        at com.qlangtech.tis.plugins.incr.flink.cdc.mysql.FlinkCDCMysqlSourceFunction.start(FlinkCDCMysqlSourceFunction.java:225) ~[?:?]
        at com.qlangtech.tis.plugins.incr.flink.cdc.mysql.FlinkCDCMysqlSourceFunction.start(FlinkCDCMysqlSourceFunction.java:68) ~[?:?]
        at com.qlangtech.plugins.incr.flink.TISFlinkCDCStart.deploy(TISFlinkCDCStart.java:146) ~[tis-flink-extends-dist-4.0.0.jar:?]
        at com.qlangtech.plugins.incr.flink.TISFlinkCDCStart.main(TISFlinkCDCStart.java:76) ~[tis-flink-extends-dist-4.0.0.jar:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_402]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_402]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_402]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_402]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:130) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        ... 12 more
Caused by: org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
        at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:109) ~[flink-table-api-java-uber-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:110) ~[flink-table-api-java-uber-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:122) ~[flink-table-api-java-uber-tis-1.18.1.jar:tis-1.18.1]
        at com.qlangtech.tis.realtime.TableRegisterFlinkSourceHandle.processTableStream(TableRegisterFlinkSourceHandle.java:94) ~[?:?]
        at com.qlangtech.tis.realtime.BasicFlinkSourceHandle.consume(BasicFlinkSourceHandle.java:99) ~[tis-flink-extends-dist-4.0.0.jar:?]
        at com.qlangtech.tis.realtime.BasicFlinkSourceHandle.consume(BasicFlinkSourceHandle.java:49) ~[tis-flink-extends-dist-4.0.0.jar:?]
        at com.qlangtech.tis.plugins.incr.flink.cdc.mysql.FlinkCDCMysqlSourceFunction.start(FlinkCDCMysqlSourceFunction.java:223) ~[?:?]
        at com.qlangtech.tis.plugins.incr.flink.cdc.mysql.FlinkCDCMysqlSourceFunction.start(FlinkCDCMysqlSourceFunction.java:68) ~[?:?]
        at com.qlangtech.plugins.incr.flink.TISFlinkCDCStart.deploy(TISFlinkCDCStart.java:146) ~[tis-flink-extends-dist-4.0.0.jar:?]
        at com.qlangtech.plugins.incr.flink.TISFlinkCDCStart.main(TISFlinkCDCStart.java:76) ~[tis-flink-extends-dist-4.0.0.jar:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_402]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_402]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_402]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_402]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:130) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        ... 12 more
Caused by: org.apache.flink.table.api.TableException: Unexpected error when trying to load service provider.
        at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:888) ~[flink-table-api-java-uber-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:587) ~[flink-table-api-java-uber-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:106) ~[flink-table-api-java-uber-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:110) ~[flink-table-api-java-uber-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:122) ~[flink-table-api-java-uber-tis-1.18.1.jar:tis-1.18.1]
        at com.qlangtech.tis.realtime.TableRegisterFlinkSourceHandle.processTableStream(TableRegisterFlinkSourceHandle.java:94) ~[?:?]
        at com.qlangtech.tis.realtime.BasicFlinkSourceHandle.consume(BasicFlinkSourceHandle.java:99) ~[tis-flink-extends-dist-4.0.0.jar:?]
        at com.qlangtech.tis.realtime.BasicFlinkSourceHandle.consume(BasicFlinkSourceHandle.java:49) ~[tis-flink-extends-dist-4.0.0.jar:?]
        at com.qlangtech.tis.plugins.incr.flink.cdc.mysql.FlinkCDCMysqlSourceFunction.start(FlinkCDCMysqlSourceFunction.java:223) ~[?:?]
        at com.qlangtech.tis.plugins.incr.flink.cdc.mysql.FlinkCDCMysqlSourceFunction.start(FlinkCDCMysqlSourceFunction.java:68) ~[?:?]
        at com.qlangtech.plugins.incr.flink.TISFlinkCDCStart.deploy(TISFlinkCDCStart.java:146) ~[tis-flink-extends-dist-4.0.0.jar:?]
        at com.qlangtech.plugins.incr.flink.TISFlinkCDCStart.main(TISFlinkCDCStart.java:76) ~[tis-flink-extends-dist-4.0.0.jar:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_402]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_402]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_402]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_402]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:130) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        ... 12 more
Caused by: java.util.ServiceConfigurationError: org.apache.flink.table.factories.Factory: Provider org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory not a subtype
        at java.util.ServiceLoader.fail(ServiceLoader.java:239) ~[?:1.8.0_402]
        at java.util.ServiceLoader.access$300(ServiceLoader.java:185) ~[?:1.8.0_402]
        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376) ~[?:1.8.0_402]
        at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) ~[?:1.8.0_402]
        at java.util.ServiceLoader$1.next(ServiceLoader.java:480) ~[?:1.8.0_402]
        at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:879) ~[flink-table-api-java-uber-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:587) ~[flink-table-api-java-uber-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:106) ~[flink-table-api-java-uber-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:110) ~[flink-table-api-java-uber-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:122) ~[flink-table-api-java-uber-tis-1.18.1.jar:tis-1.18.1]
        at com.qlangtech.tis.realtime.TableRegisterFlinkSourceHandle.processTableStream(TableRegisterFlinkSourceHandle.java:94) ~[?:?]
        at com.qlangtech.tis.realtime.BasicFlinkSourceHandle.consume(BasicFlinkSourceHandle.java:99) ~[tis-flink-extends-dist-4.0.0.jar:?]
        at com.qlangtech.tis.realtime.BasicFlinkSourceHandle.consume(BasicFlinkSourceHandle.java:49) ~[tis-flink-extends-dist-4.0.0.jar:?]
        at com.qlangtech.tis.plugins.incr.flink.cdc.mysql.FlinkCDCMysqlSourceFunction.start(FlinkCDCMysqlSourceFunction.java:223) ~[?:?]
        at com.qlangtech.tis.plugins.incr.flink.cdc.mysql.FlinkCDCMysqlSourceFunction.start(FlinkCDCMysqlSourceFunction.java:68) ~[?:?]
        at com.qlangtech.plugins.incr.flink.TISFlinkCDCStart.deploy(TISFlinkCDCStart.java:146) ~[tis-flink-extends-dist-4.0.0.jar:?]
        at com.qlangtech.plugins.incr.flink.TISFlinkCDCStart.main(TISFlinkCDCStart.java:76) ~[tis-flink-extends-dist-4.0.0.jar:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_402]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_402]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_402]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_402]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:130) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-tis-1.18.1.jar:tis-1.18.1]
        ... 12 more

Key call path:

        at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:879) ~[flink-table-api-java-uber-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:587) ~[flink-table-api-java-uber-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:106) ~[flink-table-api-java-uber-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:110) ~[flink-table-api-java-uber-tis-1.18.1.jar:tis-1.18.1]
        at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:122) ~[flink-table-api-java-uber-tis-1.18.1.jar:tis-1.18.1]

As a result, the later step in the deployment flow that checks whether a jobId was generated fails and throws this exception:

Caused by: com.qlangtech.tis.lang.TisException: has not get jobId
        at com.qlangtech.tis.lang.TisException.create(TisException.java:170)
        at com.qlangtech.plugins.incr.flink.launch.clustertype.KubernetesApplication.deploy(KubernetesApplication.java:371)
        at com.qlangtech.plugins.incr.flink.launch.TISFlinkCDCStreamFactory.deploy(TISFlinkCDCStreamFactory.java:174)
        at com.qlangtech.plugins.incr.flink.launch.TISFlinkCDCStreamFactory.deploy(TISFlinkCDCStreamFactory.java:163)
        at com.qlangtech.tis.coredefine.module.action.TISK8sDelegate.deploy(TISK8sDelegate.java:141)
        at com.qlangtech.tis.coredefine.module.action.CoreAction.lambda$getFlinkJobWorkingOrchestrate$6(CoreAction.java:696)
        at com.qlangtech.tis.datax.job.JobResName$1.accept(JobResName.java:54)
        at com.qlangtech.tis.datax.job.SubJobResName.execute(SubJobResName.java:36)
        at com.qlangtech.tis.datax.job.JobResName.execSubJob(JobResName.java:71)
        at com.qlangtech.tis.coredefine.module.action.CoreAction.lambda$null$2(CoreAction.java:635)
        at com.qlangtech.tis.datax.job.ServerLaunchToken.writeLaunchToken(ServerLaunchToken.java:418)
baisui1981 commented 6 months ago
  1. org.apache.flink.table.factories.Factory: from flink-table-common (already deployed in lib/flink-table-api-java-uber-tis-1.18.1.jar)
  2. org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory: from flink-connector-jdbc
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.2-1.18</version>
        </dependency>
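
The `not a subtype` message is raised by `java.util.ServiceLoader` when the provider class is not assignable to the service interface. Since the two classes listed above come from different jars, one plausible cause is that `Factory` is visible through two classloaders, so the provider implements a different copy of the interface. That is an assumption; the issue does not state the root cause. Below is a minimal diagnostic sketch in Java. `ClassOriginCheck` and its methods are hypothetical helper names, not part of TIS or Flink, and the demo uses JDK classes so that it runs without Flink on the classpath.

```java
import java.security.CodeSource;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: reports where a class was loaded from and why a
// ServiceLoader-style subtype check between two classes might fail.
class ClassOriginCheck {

    // Describes a class by name, defining classloader and jar/directory location.
    static String origin(Class<?> c) {
        CodeSource src = c.getProtectionDomain().getCodeSource();
        String location = (src == null || src.getLocation() == null)
                ? "<bootstrap or unknown>" : src.getLocation().toString();
        return c.getName() + " | loader=" + c.getClassLoader() + " | from=" + location;
    }

    // Walks the superclasses and interfaces of 'start' and returns the first
    // one whose fully qualified name equals 'name', or null if there is none.
    static Class<?> findSupertypeByName(Class<?> start, String name) {
        Deque<Class<?>> todo = new ArrayDeque<>();
        Set<Class<?>> seen = new HashSet<>();
        todo.push(start);
        while (!todo.isEmpty()) {
            Class<?> c = todo.pop();
            if (!seen.add(c)) {
                continue; // already visited
            }
            if (c.getName().equals(name)) {
                return c;
            }
            if (c.getSuperclass() != null) {
                todo.push(c.getSuperclass());
            }
            for (Class<?> i : c.getInterfaces()) {
                todo.push(i);
            }
        }
        return null;
    }

    // Mirrors the check ServiceLoader performs (service.isAssignableFrom(provider))
    // and, on failure, distinguishes "unrelated type" from "same interface name,
    // but a different Class object", which indicates a second classloader.
    static String diagnose(Class<?> service, Class<?> provider) {
        if (service.isAssignableFrom(provider)) {
            return "OK";
        }
        Class<?> twin = findSupertypeByName(provider, service.getName());
        return twin == null ? "UNRELATED" : "DUPLICATE_INTERFACE";
    }

    public static void main(String[] args) {
        System.out.println(origin(ArrayList.class));
        System.out.println(diagnose(List.class, ArrayList.class));  // prints OK
        System.out.println(diagnose(Runnable.class, String.class)); // prints UNRELATED
    }
}
```

Inside the JobManager pod, the same helpers could be called with the `Factory` class seen by `FactoryUtil` and the `JdbcDynamicTableFactory` class resolved through the thread context classloader. A `DUPLICATE_INTERFACE` result, together with two different `from=` locations for `Factory`, would suggest that the interface is packaged or loaded twice.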
baisui1981 commented 6 months ago

For reference, see the tpi package dependency diagram: https://www.processon.com/embed/65ced2ce01345442d78ed48e?cid=65ced2ce01345442d78ed491

baisui1981 commented 5 months ago

This has been fixed.