apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Sink JDBC] Auto-creating a table in PostgreSQL with sink-jdbc fails with ERROR: database "postgres" already exists #7386

Open NookVoive opened 1 month ago

NookVoive commented 1 month ago

### Search before asking

### What happened

I am trying to sync data from MySQL to PostgreSQL with the `generate_sink_sql = true` option. When SeaTunnel auto-creates the table, it fails with an error saying the database already exists. I would expect that, when the database already exists, SeaTunnel skips creating it and goes straight to creating the table instead of aborting the job. My job configuration file, `stream_mysql_postgresql.config`, and the command I ran are shown in the SeaTunnel Config and Running Command sections below.

### SeaTunnel Version

seatunnel 2.3.6

### SeaTunnel Config

```conf
env {
  job.mode = "STREAMING"
  job.name = "streaming-mysql-pg"
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://192.168.8.101:3306/test"
    username = "root"
    password = "123456"
    table-names = ["test.test_001"]
  }
}

sink {
  jdbc {
    url = "jdbc:postgresql://192.168.8.101:5432/postgres"
    driver = "org.postgresql.Driver"
    user = "postgres"
    password = "123456"
    database = "postgres"
    table = "public.test_001"
    generate_sink_sql = true
  }
}
```

### Running Command

```shell
bash bin/seatunnel.sh --config config/stream_mysql_postgresql.config
```

### Error Exception

```log
Caused by: org.apache.seatunnel.api.table.catalog.exception.CatalogException: ErrorCode:[API-03], ErrorDescription:[Catalog initialize failed] - Failed creating database postgres in catalog Postgres
    at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createDatabaseInternal(AbstractJdbcCatalog.java:431)
    at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createDatabase(AbstractJdbcCatalog.java:419)
    at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createTable(DefaultSaveModeHandler.java:168)
    at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createSchemaWhenNotExist(DefaultSaveModeHandler.java:108)
    at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.handleSchemaSaveMode(DefaultSaveModeHandler.java:69)
    at org.apache.seatunnel.api.sink.SaveModeHandler.handleSaveMode(SaveModeHandler.java:38)
    at org.apache.seatunnel.api.sink.SaveModeExecuteWrapper.execute(SaveModeExecuteWrapper.java:36)
    at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:373)
    ... 20 more
Caused by: org.postgresql.util.PSQLException: ERROR: database "postgres" already exists
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2725)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2412)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:371)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:502)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:419)
    at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:194)
    at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:180)
    at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.executeInternal(AbstractJdbcCatalog.java:536)
    at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createDatabaseInternal(AbstractJdbcCatalog.java:428)
    ... 27 more

    at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$4(CoordinatorService.java:506)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
```

### Zeta or Flink or Spark Version

Zeta 2.3.6

### Java or Scala Version

JDK 1.8

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
NookVoive commented 1 month ago

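Based on my own investigation, plus help from others, I have tentatively traced the cause to the following:

  1. Before generating the CREATE TABLE statement, SeaTunnel first checks whether the database exists. That check filters system databases such as `postgres` and `pg_catalog` out of the database list it queries, so it concludes that the `postgres` database does not exist and must be created first.
  2. When the CREATE DATABASE statement is then actually executed, it fails because the system database already exists. This is the reported error: `database "postgres" already exists`.
  3. The practical result is that tables cannot be created automatically when syncing data into a system database.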

Screenshots below. Checking whether the database exists:

[screenshot]

Excluding system databases:

[screenshot]

System database list:

[screenshot]
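To make the root cause described above concrete, here is a minimal, self-contained Java sketch of the two possible checks. It is a hypothetical simplification, not SeaTunnel's actual `AbstractJdbcCatalog` or Postgres catalog code: the class and method names (`DatabaseExistsSketch`, `databaseExistsBuggy`, `databaseExistsFixed`, `planFor`) and the contents of the system-database filter are assumptions for illustration only. It shows why an existence check against a filtered listing reports `postgres` as missing, while a check against the full listing (for example, every row of PostgreSQL's `pg_database`) finds it. Because PostgreSQL's `CREATE DATABASE` does not accept an `IF NOT EXISTS` clause, the decision to skip database creation has to rest on an accurate existence check.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified model of the database-existence check; NOT SeaTunnel's actual code.
public class DatabaseExistsSketch {

    // Assumption: names hidden from the database listing, based on the examples in the report.
    static final Set<String> SYSTEM_DATABASES =
            new HashSet<>(Arrays.asList("postgres", "pg_catalog", "template0", "template1"));

    // Mimics a listing that hides system databases from callers.
    static List<String> listUserDatabases(List<String> allDatabases) {
        return allDatabases.stream()
                .filter(db -> !SYSTEM_DATABASES.contains(db))
                .collect(Collectors.toList());
    }

    // Flawed check: consults the filtered listing, so "postgres" looks absent.
    static boolean databaseExistsBuggy(List<String> allDatabases, String name) {
        return listUserDatabases(allDatabases).contains(name);
    }

    // Corrected check: consults the full listing, system databases included.
    static boolean databaseExistsFixed(List<String> allDatabases, String name) {
        return allDatabases.contains(name);
    }

    // Describes what a save-mode handler would do before CREATE TABLE, given an existence result.
    static String planFor(String name, boolean exists) {
        return exists
                ? "skip CREATE DATABASE " + name + ", go straight to CREATE TABLE"
                : "run CREATE DATABASE " + name + " first";
    }

    public static void main(String[] args) {
        // Databases that actually exist on the server (illustrative values).
        List<String> serverDatabases = Arrays.asList("postgres", "template0", "template1", "test");
        String target = "postgres";

        boolean buggy = databaseExistsBuggy(serverDatabases, target);
        boolean fixed = databaseExistsFixed(serverDatabases, target);

        // The flawed check plans CREATE DATABASE, which PostgreSQL rejects as "already exists".
        System.out.println("buggy check: " + planFor(target, buggy));
        // The corrected check skips straight to table creation.
        System.out.println("fixed check: " + planFor(target, fixed));
    }
}
```

Running `main` prints `buggy check: run CREATE DATABASE postgres first` followed by `fixed check: skip CREATE DATABASE postgres, go straight to CREATE TABLE`. One possible design, sketched here, keeps the system-database filter for listing purposes while having the existence check consult the unfiltered catalog.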