apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Feature][MySQL-CDC] MySQL CDC always fails with "Generate Splits for table xx.xx error" #7393

Open inuex opened 1 month ago

inuex commented 1 month ago

Description

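With MySQL binlog already enabled, I am using MySQL CDC to keep two tables in sync in real time, but the launch script always fails with the following error. How can this be resolved?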

Caused by: java.lang.RuntimeException: Generate Splits for table test.log error

Config:

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    username = "root"
    password = "123456"
    table-names = ["test.log"]
    table-names-config = [
      {
        table = "test.log"
        primaryKeys = ["id"]
      }
    ]
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"
    generate_sink_sql = true
    database = "test"
    table = "log_1"
    primary_keys = ["id"]
  }
}

Launch script:

cd /d D:\Servers\apache-seatunnel-2.3.6
.\bin\seatunnel.cmd --config .\script\o1.conf -m local

Run log: console.txt


liunaijie commented 1 month ago

Please attach the detailed log.

inuex commented 1 month ago

> Please attach the detailed log.

log.txt

liunaijie commented 1 month ago

> Please attach the detailed log.
>
> log.txt

The log you attached is for Oracle-CDC, not MySQL-CDC. Please attach the config as well.

chen1274528094 commented 1 month ago

I also encountered this problem in version 2.3.6. Is there a solution?

chen1274528094 commented 1 month ago

It's my log and config, I use it by
log.txt

env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    username = "xxx"
    password = "xxx"

    table-names = ["dhg_copy.per_orgs"]
    result_table_name = "per_orgs"
    base-url = "jdbc:mysql://ip:3306/dhg_copy"
    server-id = 5656

    startup.mode = "initial"
    stop.mode = "never"
    snapshot.split.size = 10000000
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://ip:3306/test-seatunnel"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx"
    password = "xxx"
    generate_sink_sql = true
    database = test-seatunnel
    table = per_orgs
    primary_keys = ["id"]
  }
}

liunaijie commented 1 month ago

The log download link is invalid.

chen1274528094 commented 1 month ago

log.txt I uploaded it again. It is run through DolphinScheduler. @liunaijie

inuex commented 1 month ago

> Please attach the detailed log.
>
> log.txt
>
> The log you attached is for Oracle-CDC, not MySQL-CDC. Please attach the config as well.

mysql.log

liunaijie commented 1 month ago

I checked the log file and found that the current system has some bugs.

  1. In some cases, the table's primary key is fetched without checking that the table exists, and the table schema is generated as null:

    Create sink 'jdbc' with upstream input catalog-table[database: dhg_copy, schema: null, table: per_orgs]
  2. The JDBC split enumerator has an issue that cancels the job:

    2024-08-16 11:51:01,688 INFO  io.debezium.jdbc.JdbcConnection - Connection gracefully closed
    2024-08-16 11:51:01,697 INFO  io.debezium.jdbc.JdbcConnection - Connection gracefully closed
    2024-08-16 11:51:01,698 INFO  org.apache.seatunnel.connectors.cdc.base.source.enumerator.SnapshotSplitAssigner - SnapshotSplitAssigner created with remaining tables: [test.log]
    2024-08-16 11:51:01,698 INFO  org.apache.seatunnel.connectors.cdc.base.source.enumerator.SnapshotSplitAssigner - SnapshotSplitAssigner created with remaining splits: []
    2024-08-16 11:51:01,698 INFO  org.apache.seatunnel.connectors.cdc.base.source.enumerator.SnapshotSplitAssigner - SnapshotSplitAssigner created with assigned splits: []
    2024-08-16 11:51:01,769 INFO  org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase - Open Source Reader.
    2024-08-16 11:51:01,769 INFO  org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask - received reader register, readerID: TaskLocation{taskGroupLocation=TaskGroupLocation{jobId=876674346033086465, pipelineId=1, taskGroupId=30000}, taskID=40000, index=0}
    2024-08-16 11:51:01,784 INFO  org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase - Open Source Reader.
    2024-08-16 11:51:01,784 INFO  org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask - received reader register, readerID: TaskLocation{taskGroupLocation=TaskGroupLocation{jobId=876674346033086465, pipelineId=1, taskGroupId=30001}, taskID=40001, index=1}
    2024-08-16 11:51:01,789 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - checkpoint is enabled, start schedule trigger pending checkpoint.
    2024-08-16 11:51:01,848 INFO  org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask - received enough reader, starting enumerator...
    2024-08-16 11:51:01,880 INFO  org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter - Start splitting table test.log into chunks...
    2024-08-16 11:51:01,931 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-623149] [5.1] taskDone, taskId = 40000, taskGroup = TaskGroupLocation{jobId=876674346033086465, pipelineId=1, taskGroupId=30000}
    2024-08-16 11:51:01,931 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-623149] [5.1] task 40000 error with exception: [java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Generate Splits for table test.log error], cancel other task in taskGroup TaskGroupLocation{jobId=876674346033086465, pipelineId=1, taskGroupId=30000}.
    2024-08-16 11:51:01,931 INFO  org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase - Closing Source Reader 0.

@chen1274528094 for your case, you may be able to avoid this issue by manually adding a schema config to the source table; see this doc: https://seatunnel.apache.org/docs/concept/schema-feature/#how-to-use-schema

@inuex your case still needs to be checked.

liunaijie commented 1 month ago

cc @hailin0 @Hisoka-X It looks like the 2.3.6 JDBC connector has an issue that needs to be fixed.

QQxiaoyuyu commented 1 week ago

Has this issue been resolved? I am using version 2.3.7 and ran into the same problem.

QQxiaoyuyu commented 1 week ago

My error is as follows:

2024-09-03 10:28:54,944 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Fatal Error,

2024-09-03 10:28:54,944 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues

2024-09-03 10:28:54,944 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Reason:SeaTunnel job executed failed

2024-09-03 10:28:54,946 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:211)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: Generate Splits for table test1.agents_deals error
    at org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter.generateSplits(AbstractJdbcSourceChunkSplitter.java:112)
    at org.apache.seatunnel.connectors.cdc.base.source.enumerator.SnapshotSplitAssigner.getNext(SnapshotSplitAssigner.java:185)
    at org.apache.seatunnel.connectors.cdc.base.source.enumerator.HybridSplitAssigner.getNext(HybridSplitAssigner.java:98)
    at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator.assignSplits(IncrementalSourceEnumerator.java:172)
    at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator.run(IncrementalSourceEnumerator.java:70)
    at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.stateProcess(SourceSplitEnumeratorTask.java:319)
    at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.call(SourceSplitEnumeratorTask.java:138)
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:717)
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1039)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.seatunnel.api.table.catalog.CatalogTable.getTableSchema()" because the return value of "java.util.Map.get(Object)" is null
    at org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.MySqlDialect.getPrimaryKey(MySqlDialect.java:130)
    at org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter.getSplitColumn(AbstractJdbcSourceChunkSplitter.java:382)
    at org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter.generateSplits(AbstractJdbcSourceChunkSplitter.java:64)
    ... 13 more

    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:203)
    ... 2 more

liunaijie commented 1 week ago

Your error is the same as @chen1274528094's. You can manually add a schema config to the source table to avoid this issue; see this doc: https://seatunnel.apache.org/docs/concept/schema-feature/#how-to-use-schema
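
For readers unfamiliar with the NullPointerException message in the trace above: it says that a `Map.get(Object)` lookup returned null and `getTableSchema()` was then called on that result. The sketch below is a Python analogue of that failure pattern and of a guarded lookup. It is not SeaTunnel code; `CatalogTable`, `primary_key_unchecked`, `primary_key_checked`, and the table IDs are hypothetical names chosen for illustration only.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class CatalogTable:
    """Hypothetical stand-in for a discovered table and its primary-key columns."""
    primary_key: List[str]


def primary_key_unchecked(tables: Dict[str, CatalogTable], table_id: str) -> List[str]:
    # Mirrors the failing pattern: use the lookup result without checking it.
    # If table_id is absent, .get() returns None and the attribute access fails.
    return tables.get(table_id).primary_key


def primary_key_checked(tables: Dict[str, CatalogTable], table_id: str) -> Optional[List[str]]:
    # Guarded variant: return None when the table is unknown instead of crashing.
    table = tables.get(table_id)
    if table is None:
        return None
    return table.primary_key
```

With a map that only contains `"test1.agents_deals"`, the unchecked variant raises `AttributeError` for any other table ID, while the checked variant returns `None` and lets the caller decide how to report the missing table.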

hailin0 commented 2 days ago

When this exception occurs, the jar package with the prefix connector- should be deleted from the lib directory.

    Caused by: java.lang.NullPointerException
        at org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.MySqlDialect.getPrimaryKey(MySqlDialect.java:130)
        at org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter.getSplitColumn(AbstractJdbcSourceChunkSplitter.java:382)
        at org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter.generateSplits(AbstractJdbcSourceChunkSplitter.java:64)
        ... 13 more
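
To check whether this applies to an installation, the following is a minimal sketch that lists jar files whose names start with `connector-` in a given directory. It only reports candidates and deletes nothing. The function name `find_connector_jars` is hypothetical, and the location of the lib directory (for example, a `lib` folder under the SeaTunnel install path) is an assumption to verify against your own setup.

```python
from pathlib import Path
from typing import List


def find_connector_jars(lib_dir: str) -> List[str]:
    """Return the sorted names of jar files in lib_dir that start with 'connector-'."""
    lib_path = Path(lib_dir)
    if not lib_path.is_dir():
        # Nothing to report if the directory does not exist.
        return []
    return sorted(
        entry.name
        for entry in lib_path.iterdir()
        if entry.is_file()
        and entry.name.startswith("connector-")
        and entry.name.endswith(".jar")
    )
```

Calling `find_connector_jars` with the path of the lib directory returns the file names to review before removing anything by hand.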