apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Postgres-CDC] java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Struct.get(String)" because "struct" is null #7905

Closed cobolbaby closed 1 day ago

cobolbaby commented 1 month ago

Search before asking

What happened

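The streaming job fails while the Postgres-CDC source deserializes a change event, and Flink then restarts the task. The exception is thrown in SeaTunnelRowDebeziumDeserializationConverters.convert, called from SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow:

java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Struct.get(String)" because "struct" is null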

SeaTunnel Version

2.3.8

SeaTunnel Config

env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 60000
  read_limit.bytes_per_second=50000000
  read_limit.rows_per_second=10000
}

source {
  Postgres-CDC {
    # Source database JDBC URL
    base-url = "jdbc:postgresql://***/bdc"
    username = "***"
    password = "***"
    database-names = ["bdc"]
    schema-names = ["dw"]
    table-names = ["bdc.dw.fact_cpu_sn", "bdc.dw.dim_cpu_dn"]
    result_table_name = "SQT_PG_BDC_CDC_dw_cpu"
  }
}

transform {

}

sink {
  jdbc {
    # https://seatunnel.apache.org/docs/2.3.8/connector-v2/sink/Jdbc
    source_table_name = "SQT_PG_BDC_CDC_dw_cpu"
    url = "jdbc:postgresql://***/bdc_test"
    driver = "org.postgresql.Driver"
    user = "***"
    password = "***"

    # You need to configure both database and table
    database = "bdc_test"
    table = "dw_sqt.${table_name}"
    primary_keys = ["${primary_key}"]
    schema_save_mode = "IGNORE"
    generate_sink_sql = "true"
  }
}
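The error below fails in extractBeforeRow, i.e. while converting the "before" image of a change event. With Debezium's PostgreSQL connector, the "before" field of an UPDATE event is usually null unless the table uses REPLICA IDENTITY FULL (under the default replica identity, old values are only sent for key columns). Whether that is the cause here is an assumption; this thread does not confirm it. The current setting can be checked in pg_class.relreplident ('d' = default, 'f' = full). The helper below is a hypothetical sketch (ReplicaIdentityStatements and alterStatements are illustrative names, not part of SeaTunnel) that derives the corresponding ALTER TABLE statements from the table-names values in the config above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of SeaTunnel: builds PostgreSQL statements that set
// REPLICA IDENTITY FULL for every "database.schema.table" entry of the Postgres-CDC
// `table-names` option, so UPDATE and DELETE events carry a complete "before" image.
public class ReplicaIdentityStatements {

    static List<String> alterStatements(List<String> tableNames) {
        List<String> statements = new ArrayList<>();
        for (String fullName : tableNames) {
            // Expect exactly three dot-separated parts: database, schema, table.
            String[] parts = fullName.split("\\.");
            if (parts.length != 3) {
                throw new IllegalArgumentException(
                        "Expected database.schema.table but got: " + fullName);
            }
            // ALTER TABLE runs while connected to that database, so only schema.table is used.
            statements.add("ALTER TABLE " + parts[1] + "." + parts[2] + " REPLICA IDENTITY FULL;");
        }
        return statements;
    }

    public static void main(String[] args) {
        // The two tables captured in the issue's config.
        List<String> tables = List.of("bdc.dw.fact_cpu_sn", "bdc.dw.dim_cpu_dn");
        for (String sql : alterStatements(tables)) {
            System.out.println(sql);
        }
    }
}
```

Running main prints "ALTER TABLE dw.fact_cpu_sn REPLICA IDENTITY FULL;" and "ALTER TABLE dw.dim_cpu_dn REPLICA IDENTITY FULL;". Note that FULL makes PostgreSQL write the entire old row to the WAL for every UPDATE and DELETE, so WAL volume grows on busy tables.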

Running Command

./bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/pgcdc2pg_bdc_dw_cpu.conf

Error Exception

==> flink--standalonesession-0-0489b20f21c4.log <==
2024-10-24 18:24:50,728 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Postgres-CDC-Source -> MultiTableSink-Sink: Writer (1/1) (8c6dbb40f39dfb76d50b9c9bdc1362e4_cbc357ccb763df2852fee8c4fc7d55f2_0_5) switched from RUNNING to FAILED on 172.17.232.66:40409-cf2488 @ 0489b20f21c4 (dataPort=41559).
java.lang.NullPointerException: null
    at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow(SeaTunnelRowDebeziumDeserializeSchema.java:223) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:188) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:198) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:150) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:101) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:61) ~[?:?]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:110) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) ~[?:?]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:114) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) ~[?:?]
    at org.apache.seatunnel.translation.flink.source.FlinkSourceReader.pollNext(FlinkSourceReader.java:80) ~[?:?]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.18.1.jar:1.18.1]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
2024-10-24 18:24:50,729 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager [] - Clearing resource requirements of job 515026a838b438877c64bef449244e53
2024-10-24 18:24:50,729 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 0 (#5) of source Source: Postgres-CDC-Source.
2024-10-24 18:24:50,729 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - 1 tasks will be restarted to recover the failed task 8c6dbb40f39dfb76d50b9c9bdc1362e4_cbc357ccb763df2852fee8c4fc7d55f2_0_5.
2024-10-24 18:24:50,729 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job SeaTunnel (515026a838b438877c64bef449244e53) switched from state RUNNING to RESTARTING.

==> flink--taskexecutor-0-0489b20f21c4.log <==
2024-10-24 18:24:50,726 INFO  io.debezium.jdbc.JdbcConnection                              [] - Connection gracefully closed
2024-10-24 18:24:50,726 INFO  org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher [] - Split fetcher 0 exited.
2024-10-24 18:24:50,726 INFO  org.apache.seatunnel.api.event.LoggingEventHandler           [] - log event: ReaderCloseEvent(createdTime=1729794290726, jobId=515026a838b438877c64bef449244e53, eventType=LIFECYCLE_READER_CLOSE)
2024-10-24 18:24:50,726 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Postgres-CDC-Source -> MultiTableSink-Sink: Writer (1/1)#5 (8c6dbb40f39dfb76d50b9c9bdc1362e4_cbc357ccb763df2852fee8c4fc7d55f2_0_5) switched from RUNNING to FAILED with failure cause:
java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Struct.get(String)" because "struct" is null
    at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
    at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow(SeaTunnelRowDebeziumDeserializeSchema.java:223) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
    at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:188) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
    at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:198) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:150) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:101) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:61) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:110) ~[blob_p-9113e096f42818c442db749bb2f798c1c2e41d04-20bab250076a2147589ecdb43456d7d8:2.3.8]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:114) ~[blob_p-9113e096f42818c442db749bb2f798c1c2e41d04-20bab250076a2147589ecdb43456d7d8:2.3.8]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
    at org.apache.seatunnel.translation.flink.source.FlinkSourceReader.pollNext(FlinkSourceReader.java:80) ~[blob_p-9113e096f42818c442db749bb2f798c1c2e41d04-20bab250076a2147589ecdb43456d7d8:2.3.8]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) [flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) [flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.18.1.jar:1.18.1]
    at java.lang.Thread.run(Unknown Source) [?:?]
2024-10-24 18:24:50,726 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: Postgres-CDC-Source -> MultiTableSink-Sink: Writer (1/1)#5 (8c6dbb40f39dfb76d50b9c9bdc1362e4_cbc357ccb763df2852fee8c4fc7d55f2_0_5).
2024-10-24 18:24:50,727 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: Postgres-CDC-Source -> MultiTableSink-Sink: Writer (1/1)#5 8c6dbb40f39dfb76d50b9c9bdc1362e4_cbc357ccb763df2852fee8c4fc7d55f2_0_5.
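Both traces show convert being called from extractBeforeRow with a null Struct, meaning the event reached the deserializer without a "before" image. The sketch below is hypothetical and is not SeaTunnel's implementation: it models a Debezium-style envelope with plain maps (ChangeEvent and toRowKinds are illustrative names) and shows a null-guard that emits only UPDATE_AFTER when the old row is missing, and fails with an explicit message for a DELETE without one, instead of a NullPointerException.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not SeaTunnel's implementation: a Debezium-style change event
// reduced to its "op" code and its "before"/"after" images as plain maps.
public class NullSafeBeforeRow {

    static final class ChangeEvent {
        final String op;                  // Debezium op code: "c", "r", "u" or "d"
        final Map<String, Object> before; // may be null, e.g. UPDATE without REPLICA IDENTITY FULL
        final Map<String, Object> after;  // null for DELETE events

        ChangeEvent(String op, Map<String, Object> before, Map<String, Object> after) {
            this.op = op;
            this.before = before;
            this.after = after;
        }
    }

    // Maps one event to the row kinds it would emit, checking the before image
    // instead of dereferencing it unconditionally.
    static List<String> toRowKinds(ChangeEvent event) {
        List<String> kinds = new ArrayList<>();
        switch (event.op) {
            case "c":
            case "r":
                kinds.add("INSERT");
                break;
            case "u":
                // Emit UPDATE_BEFORE only when the source actually sent the old row.
                if (event.before != null) {
                    kinds.add("UPDATE_BEFORE");
                }
                kinds.add("UPDATE_AFTER");
                break;
            case "d":
                // A DELETE cannot be mapped without any old values, so fail with a clear message.
                if (event.before == null) {
                    throw new IllegalStateException(
                            "DELETE event has no before image; check the table's REPLICA IDENTITY");
                }
                kinds.add("DELETE");
                break;
            default:
                throw new IllegalArgumentException("Unsupported op: " + event.op);
        }
        return kinds;
    }

    public static void main(String[] args) {
        Map<String, Object> after = new HashMap<>();
        after.put("id", 1);
        // Assumed scenario: an UPDATE event whose before image is null.
        ChangeEvent update = new ChangeEvent("u", null, after);
        System.out.println(toRowKinds(update)); // prints [UPDATE_AFTER]
    }
}
```

Whether silently dropping UPDATE_BEFORE is acceptable depends on the downstream sink; this sketch treats it as a design assumption, not as the fix adopted by the project.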

Zeta or Flink or Spark Version

Flink: 1.18.1

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit a PR?

Code of Conduct

github-actions[bot] commented 1 week ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.

github-actions[bot] commented 1 day ago

This issue has been closed because it has not received a response for too long. You could reopen it if you encounter similar problems in the future.

cobolbaby commented 18 hours ago

This has not been resolved... Please keep this issue open.