apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0
8.06k stars 1.83k forks source link

[Bug] [mysqlCDC-to-kafka] Unable to create a source for identifier 'MySQL-CDC' #8063

Open aiyi926 opened 1 week ago

aiyi926 commented 1 week ago

Search before asking

What happened

I used the entire library of seatunnel-web-1.0.2 to synchronize mysqlCDC to kafka and reported an error that did not recognize MySQL-CDC and reported a null pointer exception

SeaTunnel Version

2.3.8

SeaTunnel Config

{
    "transform" : [],
    "sink" : [
        {
            "format" : "COMPATIBLE_DEBEZIUM_JSON",
            "bootstrap.servers" : "192.168.36.57:19093",
            "source_table_name" : "Table15642877834144",
            "semantics" : "NON",
            "plugin_name" : "Kafka"
        }
    ],
    "source" : [
        {
            "schema" : {
                "fields" : {
                    "topic" : "string",
                    "value" : "string",
                    "key" : "string"
                }
            },
            "inverse-sampling.rate" : "1000",
            "catalog" : {
                "factory" : "Mysql"
            },
            "parallelism" : "1",
            "table-names" : [
                "test.user"
            ],
            "chunk-key.even-distribution.factor.lower-bound" : "0.05",
            "database-names" : [
                "test"
            ],
            "password" : "******",
            "sample-sharding.threshold" : "1000",
            "incremental.parallelism" : "1",
            "snapshot.fetch.size" : "1024",
            "connect.max-retries" : "3",
            "base-url" : "jdbc:mysql://192.168.36.57:27508/seatunnel?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true",
            "startup.mode" : "INITIAL",
            "format" : "COMPATIBLE_DEBEZIUM_JSON",
            "result_table_name" : "Table15642877834144",
            "server-time-zone" : "UTC",
            "plugin_name" : "MySQL-CDC",
            "exactly_once" : "false",
            "connection.pool.size" : "20",
            "snapshot.split.size" : "8096",
            "stop.mode" : "NEVER",
            "chunk-key.even-distribution.factor.upper-bound" : "100.0",
            "connect.timeout.ms" : "30000",
            "dag-parsing.mode" : "MULTIPLEX",
            "username" : "******"
        }
    ],
    "env" : {
        "job.mode" : "STREAMING",
        "job.name" : "SeaTunnel_Job",
        "savemode.execute.location" : "CLUSTER"
    }
}

Running Command

使用的是web页面

Error Exception

2024-11-15 11:13:04.786 trendy_seatunnel ngfa-k8s01 INFO [qtp1710483461-1526] [MultipleTableJobConfigParser.fillJobConfigAndCommonJars():317] - add common jar in plugins :[file:/opt/apache-seatunnel-2.3.8/plugins/mysql-connector-java-8.0.19.jar]
2024-11-15 11:13:04.788 trendy_seatunnel ngfa-k8s01 INFO [qtp1710483461-1526] [AbstractPluginDiscovery.<init>():116] - Load SeaTunnelSink Plugin from /opt/apache-seatunnel-2.3.8/connectors
2024-11-15 11:13:04.789 trendy_seatunnel ngfa-k8s01 INFO [qtp1710483461-1526] [AbstractPluginDiscovery.findPluginJarPath():446] - Discovery plugin jar for: PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='MySQL-CDC'} at: file:/opt/apache-seatunnel-2.3.8/connectors/connector-cdc-mysql-2.3.8.jar
2024-11-15 11:13:04.790 trendy_seatunnel ngfa-k8s01 INFO [qtp1710483461-1526] [AbstractPluginDiscovery.findPluginJarPath():446] - Discovery plugin jar for: PluginIdentifier{engineType='seatunnel', pluginType='sink', pluginName='Kafka'} at: file:/opt/apache-seatunnel-2.3.8/connectors/connector-kafka-2.3.8.jar
2024-11-15 11:13:04.791 trendy_seatunnel ngfa-k8s01 WARN [qtp1710483461-1526] [ConfigUtil.convertValue():64] - Option 'source_table_name' is a List, and it is recommended to configure it as ["string1","string2"]; we will only use ',' to split the String into a list.
2024-11-15 11:13:04.791 trendy_seatunnel ngfa-k8s01 INFO [qtp1710483461-1526] [MultipleTableJobConfigParser.parse():205] - start generating all sources.
2024-11-15 11:13:04.812 trendy_seatunnel ngfa-k8s01 ERROR [qtp1710483461-1526] [JobExecutorServiceImpl.executeJobBySeaTunnel():128] - Job execution submission failed.
org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a source for identifier 'MySQL-CDC'.
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:101)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSource(MultipleTableJobConfigParser.java:375)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:209)
        at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:114)
        at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:182)
        at org.apache.seatunnel.app.service.impl.JobExecutorServiceImpl.executeJobBySeaTunnel(JobExecutorServiceImpl.java:126)
        at org.apache.seatunnel.app.service.impl.JobExecutorServiceImpl.jobExecute(JobExecutorServiceImpl.java:79)
        at org.apache.seatunnel.app.controller.JobExecutorController.jobExecutor(JobExecutorController.java:64)
        at sun.reflect.GeneratedMethodAccessor393.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
        at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150)
        at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
        at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
        at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1067)
        at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963)
        at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
        at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:517)
        at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:584)
        at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
        at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1631)
        at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
        at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
        at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
        at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
        at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
        at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
        at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
        at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
        at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
        at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:548)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
        at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:600)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:235)
        at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
        at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
        at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501)
        at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
        at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
        at org.eclipse.jetty.server.Server.handle(Server.java:516)
        at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)
        at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)
        at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
        at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
        at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
        at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException: null
        at org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema.<init>(DebeziumJsonDeserializeSchema.java:41)
        at org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.MySqlIncrementalSource.createDebeziumDeserializationSchema(MySqlIncrementalSource.java:95)
        at org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource.<init>(IncrementalSource.java:112)
        at org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.MySqlIncrementalSource.<init>(MySqlIncrementalSource.java:54)
        at org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.MySqlIncrementalSourceFactory.lambda$createSource$1(MySqlIncrementalSourceFactory.java:116)
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:113)
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:74)
        ... 68 common frames omitted

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

image

Are you willing to submit PR?

Code of Conduct