gmmstrive / flink-connector-clickhouse

flink sql connector clickhouse zeppelin

A question: when writing from Flink to ClickHouse, the Flink monitoring log shows "Cannot load user class: com.thorntree.bigdata.table.ClickHouseSinkFunction" — did I do something wrong when building the project? #10

Open breeze1126 opened 3 years ago

breeze1126 commented 3 years ago

2021-06-24 15:19:39,437 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job 0d970cef027a3313d275d9e2bab697d6.
2021-06-24 15:19:47,348 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot request 6fd6d829bcc9a798a3c13f9301590f35 for job a6bb348b05dff9e96b70677dc2e6a92d from resource manager with leader id 00000000000000000000000000000000.
2021-06-24 15:19:47,348 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated slot for 6fd6d829bcc9a798a3c13f9301590f35.
2021-06-24 15:19:47,348 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job a6bb348b05dff9e96b70677dc2e6a92d for job leader monitoring.
2021-06-24 15:19:47,348 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to register at job manager akka.tcp://flink@localhost:6123/user/rpc/jobmanager_14 with leader id 00000000-0000-0000-0000-000000000000.
2021-06-24 15:19:47,351 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration
2021-06-24 15:19:47,354 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@localhost:6123/user/rpc/jobmanager_14 for job a6bb348b05dff9e96b70677dc2e6a92d.
2021-06-24 15:19:47,354 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Establish JobManager connection for job a6bb348b05dff9e96b70677dc2e6a92d.
2021-06-24 15:19:47,354 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Offer reserved slots to the leader of job a6bb348b05dff9e96b70677dc2e6a92d.
2021-06-24 15:19:47,356 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 6fd6d829bcc9a798a3c13f9301590f35.
2021-06-24 15:19:47,357 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 6fd6d829bcc9a798a3c13f9301590f35.
2021-06-24 15:19:47,357 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: TableSourceScan(table=[[default_catalog, default_database, source_products]], fields=[id, name, description]) -> Sink: Sink(table=[default_catalog.default_database.sink_products], fields=[id, name, description]) (1/1)#0 (ba5e0effaa7a1bd54e9bc055d61447bb), deploy into slot with allocation id 6fd6d829bcc9a798a3c13f9301590f35.
2021-06-24 15:19:47,357 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, source_products]], fields=[id, name, description]) -> Sink: Sink(table=[default_catalog.default_database.sink_products], fields=[id, name, description]) (1/1)#0 (ba5e0effaa7a1bd54e9bc055d61447bb) switched from CREATED to DEPLOYING.
2021-06-24 15:19:47,357 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task Source: TableSourceScan(table=[[default_catalog, default_database, source_products]], fields=[id, name, description]) -> Sink: Sink(table=[default_catalog.default_database.sink_products], fields=[id, name, description]) (1/1)#0 (ba5e0effaa7a1bd54e9bc055d61447bb) [DEPLOYING].
2021-06-24 15:19:47,358 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading a6bb348b05dff9e96b70677dc2e6a92d/p-18a455c70f62e58c9b3b79cbb077d4fe9d2f3e17-6882faedb95787ae435a9b7629c19672 from localhost/0:0:0:0:0:0:0:1:39096
2021-06-24 15:19:47,362 INFO org.apache.flink.runtime.taskmanager.Task [] - Registering task at network: Source: TableSourceScan(table=[[default_catalog, default_database, source_products]], fields=[id, name, description]) -> Sink: Sink(table=[default_catalog.default_database.sink_products], fields=[id, name, description]) (1/1)#0 (ba5e0effaa7a1bd54e9bc055d61447bb) [DEPLOYING].
2021-06-24 15:19:47,362 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
2021-06-24 15:19:47,362 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, source_products]], fields=[id, name, description]) -> Sink: Sink(table=[default_catalog.default_database.sink_products], fields=[id, name, description]) (1/1)#0 (ba5e0effaa7a1bd54e9bc055d61447bb) switched from DEPLOYING to RUNNING.
2021-06-24 15:19:47,538 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, source_products]], fields=[id, name, description]) -> Sink: Sink(table=[default_catalog.default_database.sink_products], fields=[id, name, description]) (1/1)#0 (ba5e0effaa7a1bd54e9bc055d61447bb) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.thorntree.bigdata.table.ClickHouseSinkFunction
ClassLoader info: URL ClassLoader: Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:331) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:638) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:611) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:551) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:172) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:517) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.11-1.12.3.jar:1.12.3]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.11-1.12.3.jar:1.12.3]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
Caused by: java.lang.ClassNotFoundException: com.thorntree.bigdata.table.ClickHouseSinkFunction
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_292]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_292]
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_292]
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
    at java.lang.Class.forName0(Native Method) ~[?:1.8.0_292]
    at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_292]
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1985) ~[?:1.8.0_292]
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1849) ~[?:1.8.0_292]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2159) ~[?:1.8.0_292]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666) ~[?:1.8.0_292]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2404) ~[?:1.8.0_292]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328) ~[?:1.8.0_292]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186) ~[?:1.8.0_292]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666) ~[?:1.8.0_292]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2404) ~[?:1.8.0_292]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328) ~[?:1.8.0_292]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186) ~[?:1.8.0_292]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666) ~[?:1.8.0_292]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502) ~[?:1.8.0_292]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460) ~[?:1.8.0_292]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:317) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
    ... 9 more
2021-06-24 15:19:47,539 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, source_products]], fields=[id, name, description]) -> Sink: Sink(table=[default_catalog.default_database.sink_products], fields=[id, name, description]) (1/1)#0 (ba5e0effaa7a1bd54e9bc055d61447bb).
2021-06-24 15:19:47,541 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, source_products]], fields=[id, name, description]) -> Sink: Sink(table=[default_catalog.default_database.sink_products], fields=[id, name, description]) (1/1)#0 ba5e0effaa7a1bd54e9bc055d61447bb.
2021-06-24 15:19:47,565 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=384.000mb (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: 6fd6d829bcc9a798a3c13f9301590f35, jobId: a6bb348b05dff9e96b70677dc2e6a92d).
2021-06-24 15:19:47,565 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job a6bb348b05dff9e96b70677dc2e6a92d from job leader monitoring.
2021-06-24 15:19:47,565 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job a6bb348b05dff9e96b70677dc2e6a92d.

This is the log output shown on the Flink side.

gmmstrive commented 3 years ago

It could be that the lib directory (or something similar) isn't set up properly. This looks like an environment problem rather than a code problem.
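If it helps to narrow this down: the ClassNotFoundException is thrown by the TaskManager's user-code classloader, so the first thing worth verifying is that com.thorntree.bigdata.table.ClickHouseSinkFunction was actually packaged into the jar you submit (or placed under flink/lib on every node). A minimal, standalone sketch of that check — the jar path below is a placeholder, not something from this thread:

```java
import java.util.jar.JarFile;

public class CheckSinkClass {
    public static void main(String[] args) throws Exception {
        // Placeholder path: point this at the fat jar you actually submit to Flink.
        String jarPath = "/path/to/your-job-fat.jar";
        String entry = "com/thorntree/bigdata/table/ClickHouseSinkFunction.class";
        try (JarFile jar = new JarFile(jarPath)) {
            System.out.println(jar.getJarEntry(entry) != null
                    ? "Sink class is packaged in the jar"
                    : "Sink class is MISSING from the jar -- the TaskManager will fail the same way");
        }
    }
}
```

If the class is missing there and also not under flink/lib, the deserialization on the TaskManager fails exactly as in the log above.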

gmmstrive commented 3 years ago

Another thing: the Flink version here is 1.11.2. I haven't tried newer versions yet; if I move to a newer one later I'll keep updating the connector accordingly.

breeze1126 commented 3 years ago

OK, got it. I'm on Flink 1.12; compiling and deploying both work fine, and this exception only appears after the job is submitted to Flink. I'll go and check the environment variables again.

breeze1126 commented 3 years ago

OK, got it. I'm on Flink 1.12; compiling and deploying both work fine, and this exception only appears after the job is submitted to Flink. I'll go and check how the environment is started up.

kaiwangleo commented 2 years ago

Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.sink_app_basic_ed_hardware_status_new'.

Table options are:

'connector'='clickhouse'
'format'='json'
'password'='nMtk1aYu'
'table-name'='basic_ed_hardware_status_16_new'
'url'='jdbc:clickhouse://10.204.209.171:8123,10.204.209.172:8123,10.204.209.173:8123,10.204.209.174:8123,10.204.209.175:8123/default'
'username'='chadmin'
    at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
    at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
    at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
    at com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:161)
    at com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:86)
Caused by: org.apache.flink.table.api.ValidationException: One or more required options are missing.

Missing required options are:

local-table-name
    at org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions(FactoryUtil.java:285)
    at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:482)
    at com.fcbox.streaming.connector.clickhouse.table.ClickHouseDynamicTableFactory.createDynamicTableSink(ClickHouseDynamicTableFactory.java:88)
    at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:161)
    ... 18 more

Hi, my job fails saying the local-table-name option is required — what's going on here?
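For reference, the factory in that stack trace (ClickHouseDynamicTableFactory) rejects the table because it declares 'local-table-name' as a required option. A hedged sketch of what the sink definition might look like with that option added — the column list, the single-host URL, the masked password, and the local table's name are illustrative guesses; only the option keys come from the error message:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SinkDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Same options as in the failing job, plus the 'local-table-name' the validator asks for.
        tEnv.executeSql(
                "CREATE TABLE sink_app_basic_ed_hardware_status_new (\n"
                + "  id STRING,\n"                                            // illustrative schema
                + "  status INT\n"
                + ") WITH (\n"
                + "  'connector' = 'clickhouse',\n"
                + "  'url' = 'jdbc:clickhouse://10.204.209.171:8123/default',\n"
                + "  'username' = 'chadmin',\n"
                + "  'password' = '***',\n"
                + "  'format' = 'json',\n"
                + "  'table-name' = 'basic_ed_hardware_status_16_new',\n"
                + "  'local-table-name' = 'basic_ed_hardware_status_16_local'\n" // required by this factory; name is a guess
                + ")");
    }
}
```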

breeze1126 commented 2 years ago

Caused by: org.apache.flink.table.api.ValidationException: One or more required options are missing.
Could you please check whether a required option is missing from your table configuration?

k7gxn56 commented 2 years ago

Has the class-loading problem below been solved? I fetch the jar path on HDFS from the DistributedCache and then load classes from it to create a custom function, and I hit the same error: ClassLoader info: URL ClassLoader: Class not resolvable through given classloader.
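Not a confirmed fix, but a rough sketch of the DistributedCache-plus-dynamic-classloading setup described above, under the assumption that the extra URLClassLoader should use Flink's user-code classloader as its parent (a classloader created without that parent cannot resolve classes from the job jar or from Flink itself, which matches the "not resolvable through given classloader" message). Note that the wrapping function class itself still has to ship in the job jar or flink/lib, since Flink deserializes it before open() runs. The cache key and the loaded class name are placeholders:

```java
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class CachedJarMapFunction extends RichMapFunction<String, String> {

    private transient Object dynamicUdf;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Assumes the jar was registered on the environment, e.g.:
        //   env.registerCachedFile("hdfs:///udfs/my-udfs.jar", "udf-jar");
        File jar = getRuntimeContext().getDistributedCache().getFile("udf-jar");

        // Parent must be Flink's user-code classloader so that classes from the job jar
        // and Flink's own classes stay resolvable from the dynamically loaded code.
        URLClassLoader loader = new URLClassLoader(
                new URL[]{jar.toURI().toURL()},
                getRuntimeContext().getUserCodeClassLoader());

        dynamicUdf = Class.forName("com.example.MyDynamicFunction", true, loader)
                .getDeclaredConstructor()
                .newInstance();
    }

    @Override
    public String map(String value) {
        // Placeholder usage of the dynamically created object.
        return dynamicUdf.getClass().getSimpleName() + ":" + value;
    }
}
```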