apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0
7.82k stars 1.76k forks source link

[Bug] [Postgres-CDC] Postgres-CDC Unsupported type: ARRAY<STRING> #7370

Open hswlxc opened 1 month ago

hswlxc commented 1 month ago

Search before asking

What happened

image When using postgress-cdc, it is indicated that the array type is not supported, but the usage documentation indicates that it is supported

image

SeaTunnel Version

2.3.5

SeaTunnel Config

env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second=7000000
  read_limit.rows_per_second=400
}

source {
 Postgres-CDC {
    result_table_name=Table14286033870336
    username="brt"
    password="jiupinbrt6789jr"
    hostname = "172.16.15.117"
    port = 6543
    database-names = ["db_brt_test"]
    schema-names = ["brt_case"]
    table-names = ["db_brt_test.brt_case.tb_base"]
    base-url="jdbc:postgresql://172.16.15.117:6543/db_brt_test"
    exactly_once = true
    table-names-config = [
      {
        table = "db_brt_test.brt_case.tb_base"
        primaryKeys = ["bse_id"]
      }
     ]
 }
}
transform {
}
sink {
 Jdbc {
    source_table_name = "Table14286033870336"
    database=db_brt_test
    generate_sink_sql = true
    table="db_brt_test.brt_case.tb_base"
    chema = "brt_case"
    #tablePrefix = "sink_"
    primary_keys = ["bse_id"]
    password="6789@jiupin"
    url="jdbc:postgresql://172.16.16.230:5432/db_brt_test"
    user=postgres
    #username=postgres
    driver="org.postgresql.Driver"
 }
}

Running Command

bin/seatunnel.sh -c config/v2.streaming.conf -e local

Error Exception

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:202)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a source for identifier 'Postgres-CDC'.
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:100)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSource(MultipleTableJobConfigParser.java:332)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:188)
        at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:88)
        at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:156)
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:149)
        ... 2 more
Caused by: java.lang.UnsupportedOperationException: Unsupported type: ARRAY<STRING>
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.createNotNullConverter(SeaTunnelRowDebeziumDeserializationConverters.java:178)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.createConverter(SeaTunnelRowDebeziumDeserializationConverters.java:113)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.lambda$new$0(SeaTunnelRowDebeziumDeserializationConverters.java:71)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550)
        at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
        at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.<init>(SeaTunnelRowDebeziumDeserializationConverters.java:73)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.createTableRowConverters(SeaTunnelRowDebeziumDeserializeSchema.java:295)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.<init>(SeaTunnelRowDebeziumDeserializeSchema.java:87)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema$Builder.build(SeaTunnelRowDebeziumDeserializeSchema.java:331)
        at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.PostgresIncrementalSource.createDebeziumDeserializationSchema(PostgresIncrementalSource.java:107)
        at org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource.<init>(IncrementalSource.java:113)
        at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.PostgresIncrementalSource.<init>(PostgresIncrementalSource.java:64)
        at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.PostgresIncrementalSourceFactory.lambda$createSource$1(PostgresIncrementalSourceFactory.java:103)
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:112)
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:73)
        ... 7 more

Zeta or Flink or Spark Version

No response

Java or Scala Version

java :11.0.23

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

github-actions[bot] commented 6 days ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.