apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0
8.05k stars 1.82k forks source link

[Bug] [Postgres-CDC] load schema failed #7064

Closed Asura7969 closed 3 months ago

Asura7969 commented 4 months ago

Search before asking

What happened

load schema failed

SeaTunnel Version

2.3.5

SeaTunnel Config

{
  "env": {
    "job.mode": "STREAMING",
    "read_limit.bytes_per_second": 100000,
    "parallelism": 1,
    "checkpoint.timeout": 2147483647,
    "read_limit.rows_per_second": 1000,
    "checkpoint.interval": 20000
  },
  "source": [
    {
      "base-url": "jdbc:postgresql://xxxx:9898/n2db?loggerLevel=OFF",
      "password": "rep@mes",
      "hostname": "xxxx",
      "exactly_once": true,
      "startup.mode": "INITIAL",
      "port": 9898,
      "debezium": {
        "publication.name": "dbz_publication"
      },
      "slot.name": "bigdata_cdc_t_wip_detail",
      "table-names": [
        "n2db.n2admin.t_wip_detail"
      ],
      "database-names": [
        "n2db"
      ],
      "plugin_name": "Postgres-CDC",
      "username": "replica"
    }
  ],
  "sink": [
    {
      "base-url": "jdbc:mysql://xxxx:9030/test?useSSL=true",
      "max_retries": 3,
      "schema_save_mode": "CREATE_SCHEMA_WHEN_NOT_EXIST",
      "plugin_name": "StarRocks",
      "enable_upsert_delete": "true",
      "password": "123456",
      "database": "test",
      "save_mode_create_template": "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n${rowtype_primary_key},\n${rowtype_fields}\n) ENGINE=OLAP\n PRIMARY KEY (${rowtype_primary_key})\nDISTRIBUTED BY HASH (${rowtype_primary_key})PROPERTIES (\n    \"replication_num\" = \"3\" \n)",
      "starrocks.config": {
        "format": "json"
      },
      "labelPrefix": "t_wip_detail",
      "data_save_mode": "APPEND_DATA",
      "nodeUrls": [
        "xxxx:8030"
      ],
      "table": "ods_t_wip_detail",
      "username": "seatunnel"
    }
  ]
}

Running Command

no

Error Exception

java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:160)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:111)
    at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:156)
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:116)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121)
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703)
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
    ... 5 more
Caused by: io.debezium.DebeziumException: load schema failed
    at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext.configure(PostgresSourceFetchTaskContext.java:142)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.submitTask(IncrementalSourceScanFetcher.java:84)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.checkSplitOrStartNext(IncrementalSourceSplitReader.java:147)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:71)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
    ... 6 more
Caused by: org.postgresql.util.PSQLException: FATAL: terminating connection due to idle-in-transaction timeout
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2676)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2366)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:356)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:496)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:413)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:333)
    at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:319)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:295)
    at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:244)
    at org.postgresql.jdbc.PgDatabaseMetaData.getTables(PgDatabaseMetaData.java:1343)
    at io.debezium.jdbc.JdbcConnection.readSchema(JdbcConnection.java:1172)
    at io.debezium.connector.postgresql.PostgresSchema.refresh(PostgresSchema.java:112)
    at io.debezium.connector.postgresql.PostgresTaskContext.refreshSchema(PostgresTaskContext.java:82)
    at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext.configure(PostgresSourceFetchTaskContext.java:140)
    ... 11 more

Zeta or Flink or Spark Version

Zeta

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

Asura7969 commented 4 months ago

@Carl-Zhou-CN Do you have time to look at it? img_v3_02c7_10f5a726-c040-45bd-a07a-4e15217281bg

github-actions[bot] commented 3 months ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

github-actions[bot] commented 3 months ago

This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.