apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] Can not find catalog table with factoryId [Postgres] #6550

Open Asura7969 opened 6 months ago

Asura7969 commented 6 months ago

Search before asking

What happened

Can not find catalog table with factoryId [Postgres]

SeaTunnel Version

2.3.4

SeaTunnel Config

nothing

Running Command

{
  "env": {
    "job.mode": "STREAMING",
    "parallelism": 1,
    "checkpoint.interval": 5000
  },
  "source": [
    {
      "base-url": "jdbc:postgresql://xxxx:9999/n2db?loggerLevel=OFF",
      "password": "xxxx",
      "hostname": "xxxx",
      "exactly_once": true,
      "startup.mode": "INITIAL",
      "port": 9999,
      "debezium": {
        "publication.name": "dbz_publication"
      },
      "slot.name": "hipot_results_slot",
      "database-name": [
        "n2db"
      ],
      "table-names": [
        "n2db.n2admin.hipot_results"
      ],
      "plugin_name": "Postgres-CDC",
      "username": "replica"
    }
  ],
  "sink": [
    {
      "base-url": "jdbc:mysql://xxxx:9030/test?useSSL=true",
      "enable_upsert_delete": "true",
      "password": "123456",
      "database": "test",
      "save_mode_create_template": "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n${rowtype_primary_key},\n${rowtype_fields}\n) ENGINE=OLAP\n PRIMARY KEY (${rowtype_primary_key})\nDISTRIBUTED BY HASH (${rowtype_primary_key})PROPERTIES (\n    \"replication_num\" = \"2\" \n)",
      "max_retries": 3,
      "starrocks.config": {
        "format": "json"
      },
      "labelPrefix": "test_pg_cdc",
      "nodeUrls": [
        "xxxx:8030"
      ],
      "plugin_name": "StarRocks",
      "table": "test_pg_cdc",
      "username": "seatunnel"
    }
  ]
}

Error Exception

2024-03-20 17:00:01,698 INFO  [o.a.s.a.t.c.CatalogTableUtil  ] [hz.main.cached.thread-13] - Get catalog tables, cost time: 89
2024-03-20 17:00:01,698 INFO  [.s.c.s.j.c.AbstractJdbcCatalog] [hz.main.cached.thread-13] - Catalog Postgres closing
2024-03-20 17:00:01,698 WARN  [Log4j2HttpPostCommandProcessor] [hz.main.cached.thread-13] - [10.9.30.107]:6801 [seatunnel] [5.1] An error occurred while handling request HttpCommand [HTTP_POST]{uri='/hazelcast/rest/maps/submit-job'}AbstractTextCommand[HTTP_POST]{requestId=2}
org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a source for identifier 'Postgres-CDC'.
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:100) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSource(MultipleTableJobConfigParser.java:320) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:181) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.rest.RestJobExecutionEnvironment.getLogicalDag(RestJobExecutionEnvironment.java:76) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.rest.RestJobExecutionEnvironment.build(RestJobExecutionEnvironment.java:99) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.rest.RestHttpPostCommandProcessor.handleSubmitJob(RestHttpPostCommandProcessor.java:136) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.rest.RestHttpPostCommandProcessor.handle(RestHttpPostCommandProcessor.java:82) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.rest.RestHttpPostCommandProcessor.handle(RestHttpPostCommandProcessor.java:60) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.internal.ascii.TextCommandServiceImpl$CommandExecutor.run(TextCommandServiceImpl.java:402) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.internal.util.executor.CachedExecutorServiceDelegate$Worker.run(CachedExecutorServiceDelegate.java:217) ~[seatunnel-starter.jar:2.3.4]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_381]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_381]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
        at com.hazelcast.internal.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:76) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) ~[seatunnel-starter.jar:2.3.4]
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: Can not find catalog table with factoryId [Postgres]
        at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.lambda$getCatalogTables$0(CatalogTableUtil.java:129) ~[seatunnel-starter.jar:2.3.4]
        at java.util.Optional.map(Optional.java:215) ~[?:1.8.0_381]
        at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.getCatalogTables(CatalogTableUtil.java:116) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.getCatalogTables(CatalogTableUtil.java:96) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.PostgresIncrementalSourceFactory.lambda$createSource$1(PostgresIncrementalSourceFactory.java:91) ~[?:?]
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:112) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:73) ~[seatunnel-starter.jar:2.3.4]
        ... 14 more

Zeta or Flink or Spark Version

Zeta

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

Asura7969 commented 6 months ago

@Carl-Zhou-CN cc

Carl-Zhou-CN commented 6 months ago

Hi @Asura7969, most likely the catalog failed to query the table you specified.
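Here is a rough picture of this failure. It is a hypothetical Python model, not SeaTunnel's actual code. The catalog looks up each `table-names` entry using what the connecting user can see. Assuming the error is raised when that lookup comes back empty (the stack trace points at `CatalogTableUtil.getCatalogTables`), then a missing privilege, a wrong database, and a misspelled table name all end in the same message. `resolve_tables`, `visible`, and `CatalogLookupError` are invented for illustration.

```python
"""Hypothetical model of the lookup behind
"Can not find catalog table with factoryId [Postgres]".
NOT SeaTunnel code: it only illustrates how an empty lookup fails the job."""


class CatalogLookupError(Exception):
    """Stand-in for the SeaTunnelException seen in the stack trace."""


def resolve_tables(table_names, visible):
    """Return the configured tables that the catalog can actually see.

    table_names: entries in "database.schema.table" form, as in `table-names`.
    visible: hypothetical snapshot of what the connecting user can see,
             mapping database name -> set of "schema.table" strings.
    """
    found = []
    for name in table_names:
        database, _, schema_table = name.partition(".")
        # If the database is not visible, this entry cannot resolve.
        if database not in visible:
            continue
        # A misspelled name and a table hidden by missing privileges
        # look identical here: the entry is simply not found.
        if schema_table in visible[database]:
            found.append(name)
    if not found:
        # Assumption: the job fails only when nothing resolved at all.
        raise CatalogLookupError(
            "Can not find catalog table with factoryId [Postgres]")
    return found


if __name__ == "__main__":
    snapshot = {"n2db": {"n2admin.other_table"}}
    try:
        resolve_tables(["n2db.n2admin.hipot_results"], snapshot)
    except CatalogLookupError as exc:
        print(exc)
```

Running it as a script prints the same message as the original error, because `n2admin.hipot_results` is not in the hypothetical snapshot of visible tables.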

Asura7969 commented 6 months ago


Are you referring to an account permission issue when connecting to PostgreSQL?

Carl-Zhou-CN commented 6 months ago


It could be that, or it could be caused by a connection timeout or an incorrectly defined table name.
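For the table-name case, a quick preflight check can help. The sketch below is hypothetical Python: `database_from_base_url` and `check_table_names` are invented names, not part of SeaTunnel. It assumes that `table-names` entries follow the `database.schema.table` form used in the configs in this thread. It also assumes the database part should match the database in `base-url`, because a PostgreSQL connection is bound to a single database. It does not connect to PostgreSQL, so it cannot detect permission or timeout problems.

```python
from urllib.parse import urlparse


def database_from_base_url(base_url):
    """Extract the database from a URL like jdbc:postgresql://host:port/db?opts."""
    # Drop the "jdbc:" prefix so urlparse sees an ordinary postgresql:// URL.
    if base_url.startswith("jdbc:"):
        base_url = base_url[len("jdbc:"):]
    # The path is "/db"; query options such as ?loggerLevel=OFF are ignored.
    return urlparse(base_url).path.lstrip("/")


def check_table_names(base_url, table_names):
    """Return a list of problems found in `table-names` (empty if none).

    Assumption: every entry must be "database.schema.table" and its
    database part must equal the database named in base-url.
    """
    problems = []
    expected_db = database_from_base_url(base_url)
    for name in table_names:
        parts = name.split(".")
        # Exactly three non-empty parts are expected.
        if len(parts) != 3 or not all(parts):
            problems.append(f"{name!r}: expected database.schema.table")
            continue
        if parts[0] != expected_db:
            problems.append(
                f"{name!r}: database {parts[0]!r} differs from "
                f"base-url database {expected_db!r}")
    return problems
```

For example, `check_table_names("jdbc:postgresql://host:5432/postgres", ["public.table_name"])` reports one problem because the database part is missing. The original config in this issue (`n2db.n2admin.hipot_results` against `.../n2db?loggerLevel=OFF`) passes the check.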

Asura7969 commented 6 months ago


The task runs normally, but no data is written to StarRocks, and no exception is logged.

ab92015359 commented 5 months ago

I have the same issue with SeaTunnel 2.3.4.

PostgreSQL: 12.18, PostgreSQL JDBC Driver: 42.7.3

ab92015359 commented 5 months ago

SeaTunnel log

2024-04-08 14:21:24,171 INFO  [o.a.s.c.s.u.ConfigBuilder     ] [main] - Parsed config file:
{
    "env" : {
        "execution.parallelism" : 1,
        "job.mode" : "STREAMING",
        "checkpoint.interval" : 5000,
        "read_limit.bytes_per_second" : 7000000,
        "read_limit.rows_per_second" : 400
    },
    "source" : [
        {
            "base-url" : "jdbc:postgresql://X.X.X.X:5434/postgres",
            "password" : "XXXX",
            "table-names" : [
                "postgres.public.table_name"
            ],
            "result_table_name" : "customers_Postgre_cdc",
            "database-names" : [
                "postgres"
            ],
            "schema-names" : [
                "public"
            ],
            "plugin_name" : "Postgres-CDC",
            "username" : "XXXX"
        }
    ],
    "transform" : [],
    "sink" : [
        {
            "plugin_name" : "Console"
        }
    ]
}

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:199)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a source for identifier 'Postgres-CDC'.
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:100)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSource(MultipleTableJobConfigParser.java:320)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:181)
        at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:88)
        at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:161)
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:146)
        ... 2 more
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: Can not find catalog table with factoryId [Postgres]
        at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.lambda$getCatalogTables$0(CatalogTableUtil.java:129)
        at java.util.Optional.map(Optional.java:215)
        at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.getCatalogTables(CatalogTableUtil.java:116)
        at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.getCatalogTables(CatalogTableUtil.java:96)
        at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.PostgresIncrementalSourceFactory.lambda$createSource$1(PostgresIncrementalSourceFactory.java:91)
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:112)
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:73)
        ... 7 more

PostgreSQL log

2024-04-09 15:25:18.370 UTC [128133] postgres@postgres LOG:  execute <unnamed>: SET extra_float_digits = 3
2024-04-09 15:25:18.371 UTC [128133] postgres@postgres LOG:  execute <unnamed>: SET application_name = 'PostgreSQL JDBC Driver'
2024-04-09 15:25:18.383 UTC [128133] postgres@postgres LOG:  execute <unnamed>: select datname from pg_database

ab92015359 commented 5 months ago

It is OK now, thanks.

Carl-Zhou-CN commented 5 months ago


Without more specific information, I can't determine why.

Asura7969 commented 5 months ago


Already solved, thanks.

github-actions[bot] commented 4 months ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.

chess3cake commented 3 months ago

@Asura7969 Hi, how did you resolve it? I'm getting the same error.

chess3cake commented 3 months ago


@Carl-Zhou-CN My config looks like this:

env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second = 7000000
  read_limit.rows_per_second = 400
}

source {
  Postgres-CDC {
    startup.mode = "INITIAL"
    format = "COMPATIBLE_DEBEZIUM_JSON"
    username = "user"
    password = "wpwd"
    table-names = ["postgres.schema.table"]
    base-url = "jdbc:postgresql://url/postgres"
  }
}

transform {
}

sink {
  Doris {
    fenodes = "url"
    username = root
    password = "pw"
    database = "${database_name}"
    table = "${table_name}"
    sink.label-prefix = "some_sink"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    needs_unsupported_type_casting = true
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}