apache / flink-cdc

Flink CDC is a streaming data integration tool
https://nightlies.apache.org/flink/flink-cdc-docs-stable
Apache License 2.0
5.7k stars 1.94k forks source link

Flink SQL Source Mysql To Sink PoshtgreSQL NoClassDefFoundError: org/apache/flink/util/function/SerializableFunction #1457

Closed have1568 closed 8 months ago

have1568 commented 2 years ago

Flink Version 1.14

jar file:

flink-connector-jdbc_2.12-1.14.4 flink-sql-connector-mysql-cdc-2.2.1 postgresql-42.2.20 mysql-connector-java-8.0.25

Source SQL: CREATE TABLE products_mysql ( id INT, name STRING, description STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'xxxxxx', 'port' = '3306', 'username' = 'xxxxx', 'password' = 'xx', 'database-name' = 'xxxxxx', 'table-name' = 'products' );

Sink SQL: CREATE TABLE products_pg ( id INT, name STRING, description STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'XXXXX', 'driver' = 'org.postgresql.Driver', 'username' = 'XXXX', 'password' = 'XXXXX', 'table-name' = 'products' );

INSERT INTO products_pg SELECT * FROM products_mysql;

ERROR Message :

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) Caused by: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/apache/flink/util/function/SerializableFunction at org.apache.flink.connector.jdbc.table.JdbcOutputFormatBuilder.build(JdbcOutputFormatBuilder.java:100) at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink.getSinkRuntimeProvider(JdbcDynamicTableSink.java:89) at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:121) at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:140) at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:71) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752) at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeModifyOperations$4(LocalExecutor.java:222) at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88) at org.apache.flink.table.client.gateway.local.LocalExecutor.executeModifyOperations(LocalExecutor.java:222) at org.apache.flink.table.client.cli.CliClient.callInserts(CliClient.java:571) at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:560) at org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:420) at org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$1(CliClient.java:332) at java.util.Optional.ifPresent(Optional.java:159) at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:325) at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297) at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221) at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) ... 1 more Caused by: java.lang.NoClassDefFoundError: org/apache/flink/util/function/SerializableFunction at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:756) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:473) at java.net.URLClassLoader.access$100(URLClassLoader.java:74) at java.net.URLClassLoader$1.run(URLClassLoader.java:369) at java.net.URLClassLoader$1.run(URLClassLoader.java:363) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:362) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 35 more Caused by: java.lang.ClassNotFoundException: org.apache.flink.util.function.SerializableFunction at java.net.URLClassLoader.findClass(URLClassLoader.java:387) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 47 more

Shutting down the session... done.

ruanhang1993 commented 2 years ago

org.apache.flink.util.function.SerializableFunction is a class from flink-core. Do you add the dependencies from Flink?

PatrickRen commented 8 months ago

Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!