apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug2.3.4] [postgresqlCDC] When updating the source table, the synchronization task stops with an error #6407

Open 2606090723 opened 5 months ago

2606090723 commented 5 months ago

Search before asking

What happened


When an UPDATE is executed on the source table, the Postgres-CDC synchronization job fails with a java.lang.NullPointerException raised from SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow, and the job then stops. The complete log, including the stack trace, is in the Error Exception section below.
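
One possible reading of the trace, which nothing in this report confirms: the exception is raised while converting the before image of the UPDATE event. PostgreSQL writes the full old row to the WAL only when the table's REPLICA IDENTITY is FULL, so with the default setting an update event can arrive without a usable before image. If that is the cause here, running ALTER TABLE public.t_user REPLICA IDENTITY FULL; on the source table may be worth trying. The sketch below is not SeaTunnel code. BeforeImageGuard, ChangeEvent, toRow, and extractBeforeRow are hypothetical names that only illustrate guarding against a missing before image instead of dereferencing it.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class BeforeImageGuard {

    /** Hypothetical stand-in for a CDC change event; "before" may be null. */
    static final class ChangeEvent {
        final Map<String, Object> before;
        final Map<String, Object> after;

        ChangeEvent(Map<String, Object> before, Map<String, Object> after) {
            this.before = before;
            this.after = after;
        }
    }

    /** Copies the named columns of a row image into an array, in column order. */
    static Object[] toRow(Map<String, Object> image, String[] columns) {
        Object[] row = new Object[columns.length];
        for (int i = 0; i < columns.length; i++) {
            row[i] = image.get(columns[i]);
        }
        return row;
    }

    /** Returns the before row, or null when the event carries no before image. */
    static Object[] extractBeforeRow(ChangeEvent event, String[] columns) {
        if (event.before == null) {
            // Assumption: the caller skips the "update before" record in this case.
            return null;
        }
        return toRow(event.before, columns);
    }

    public static void main(String[] args) {
        String[] columns = {"id", "name", "age"};

        Map<String, Object> after = new LinkedHashMap<>();
        after.put("id", 1);
        after.put("name", "alice");
        after.put("age", 31);

        // An UPDATE event whose before image is absent.
        ChangeEvent update = new ChangeEvent(null, after);

        Object[] before = extractBeforeRow(update, columns);
        if (before == null) {
            System.out.println("no before image, emitting the after row only");
        } else {
            System.out.println("before: " + Arrays.toString(before));
        }
        System.out.println("after: " + Arrays.toString(toRow(update.after, columns)));
    }
}
```

The guard returns null rather than throwing, which leaves the decision to the caller: skip the before record, or fail with a message that names the table and the replica identity setting.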

SeaTunnel Version

SeaTunnel 2.3.4 (Postgres-CDC source)

SeaTunnel Config

env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second=7000000
  read_limit.rows_per_second=400
}

source {
  Postgres-CDC {
    username = "postgres"
    password = "postgres"
    database-names = ["test_db"]
    schema-names = ["public"]
    table-names = ["test_db.public.t_user"]
    base-url = "jdbc:postgresql://192.168.3.46:30028/test_db?loggerLevel=OFF"
  }
}

transform {

}
sink {
  jdbc {
    url = "jdbc:postgresql://192.168.3.46:30028/test_db?loggerLevel=OFF"
    driver = "org.postgresql.Driver"
    user = "postgres"
    password = "postgres"

    generate_sink_sql = true
    # You need to configure both database and table
    database = test_db
    table = "public.t_user_2"
    primary_keys = ["id"]

  }
}

Running Command

root@7366e5930d9f:/opt/apache-seatunnel-2.3.4# ./bin/seatunnel.sh --config ./config/pgsqlcdc.template -e local

Error Exception

2024-02-28 08:45:23,064 INFO  [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SourceTask (1/1)] future complete with state FAILED
2024-02-28 08:45:23,064 ERROR [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Task TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000} Failed in Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], Begin to cancel other tasks in this pipeline.
2024-02-28 08:45:23,065 INFO  [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] turned from state RUNNING to FAILING.
2024-02-28 08:45:23,065 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] state process is start
2024-02-28 08:45:23,066 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] turned from state RUNNING to CANCELING.
2024-02-28 08:45:23,067 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [seatunnel-coordinator-service-2] - Send cancel Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] operator to member [localhost]:5801
2024-02-28 08:45:23,068 INFO  [o.a.s.e.s.TaskExecutionService] [seatunnel-coordinator-service-2] - [localhost]:5801 [seatunnel-347553] [5.1] Task (TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}) need cancel.
2024-02-28 08:45:23,069 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SourceTask (1/1)] state process is start
2024-02-28 08:45:23,069 WARN  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - [localhost]:5801 [seatunnel-347553] [5.1] Interrupted task 20000 - org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask@1be5e807
2024-02-28 08:45:23,070 INFO  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - [localhost]:5801 [seatunnel-347553] [5.1] taskDone, taskId = 20000, taskGroup = TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}
2024-02-28 08:45:23,070 INFO  [o.a.s.e.c.l.ClassLoaderUtil   ] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - recycle classloader org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader@1c65c15f
2024-02-28 08:45:23,072 INFO  [o.a.s.e.c.l.ClassLoaderUtil   ] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - recycle classloader for thread BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}
2024-02-28 08:45:23,073 INFO  [.e.IncrementalSourceEnumerator] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - Closing enumerator...
2024-02-28 08:45:23,073 INFO  [o.a.s.e.s.TaskExecutionService] [hz.main.seaTunnel.task.thread-2] - [localhost]:5801 [seatunnel-347553] [5.1] Task TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1} complete with state CANCELED
2024-02-28 08:45:23,073 INFO  [o.a.s.e.s.CoordinatorService  ] [hz.main.seaTunnel.task.thread-2] - [localhost]:5801 [seatunnel-347553] [5.1] Received task end from execution TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}, state CANCELED
2024-02-28 08:45:23,075 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [hz.main.seaTunnel.task.thread-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] turned from state CANCELING to CANCELED.
2024-02-28 08:45:23,075 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [hz.main.seaTunnel.task.thread-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] state process is stopped
2024-02-28 08:45:23,075 INFO  [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] future complete with state CANCELED
2024-02-28 08:45:23,076 INFO  [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-2] - start clean pending checkpoint cause Pipeline turn to end state.
2024-02-28 08:45:23,079 INFO  [a.s.c.s.c.s.r.SourceReaderBase] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Closing Source Reader.
2024-02-28 08:45:23,080 INFO  [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-2] - Turn checkpoint_state_815141672055734273_1 state from RUNNING to CANCELED
2024-02-28 08:45:23,081 INFO  [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] will end with state FAILED
2024-02-28 08:45:23,081 INFO  [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] turned from state FAILING to FAILED.
2024-02-28 08:45:23,085 WARN  [o.a.s.s.c.z.h.HikariConfig    ] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - idleTimeout has been set but has no effect because the pool is operating as a fixed size pool.
2024-02-28 08:45:23,085 INFO  [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Starting...
2024-02-28 08:45:23,090 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Shutting down split fetcher 0
2024-02-28 08:45:23,102 INFO  [o.a.s.e.s.m.JobMaster         ] [seatunnel-coordinator-service-2] - release the pipeline Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] resource
2024-02-28 08:45:23,102 INFO  [a.s.e.s.s.s.DefaultSlotService] [hz.main.generic-operation.thread-46] - received slot release request, jobID: 815141672055734273, slot: SlotProfile{worker=[localhost]:5801, slotID=7, ownerJobID=815141672055734273, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='af5ae013-144f-4faa-937b-8981475315eb'}
2024-02-28 08:45:23,103 INFO  [a.s.e.s.s.s.DefaultSlotService] [hz.main.generic-operation.thread-47] - received slot release request, jobID: 815141672055734273, slot: SlotProfile{worker=[localhost]:5801, slotID=8, ownerJobID=815141672055734273, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='af5ae013-144f-4faa-937b-8981475315eb'}
2024-02-28 08:45:23,104 INFO  [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] state process is stop
2024-02-28 08:45:23,104 INFO  [o.a.s.e.s.d.p.PhysicalPlan    ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] future complete with state FAILED
2024-02-28 08:45:23,105 INFO  [o.a.s.e.s.d.p.PhysicalPlan    ] [seatunnel-coordinator-service-3] - cancel job Job SeaTunnel_Job (815141672055734273) because makeJobEndWhenPipelineEnded is true
2024-02-28 08:45:23,106 INFO  [o.a.s.e.s.d.p.PhysicalPlan    ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (815141672055734273) turned from state RUNNING to FAILING.
2024-02-28 08:45:23,108 INFO  [o.a.s.e.s.d.p.PhysicalPlan    ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (815141672055734273) turned from state FAILING to FAILED.
2024-02-28 08:45:23,109 INFO  [o.a.s.e.s.d.p.PhysicalPlan    ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (815141672055734273) state process is stop
2024-02-28 08:45:23,137 INFO  [o.a.s.e.c.j.ClientJobProxy    ] [main] - Job (815141672055734273) end with state FAILED
2024-02-28 08:45:23,138 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel-347553] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
2024-02-28 08:45:23,142 INFO  [c.h.i.s.t.TcpServerConnection ] [hz.main.IO.thread-in-1] - [localhost]:5801 [seatunnel-347553] [5.1] Connection[id=1, /127.0.0.1:5801->/127.0.0.1:35765, qualifier=null, endpoint=[127.0.0.1]:35765, remoteUuid=b59847b8-8f89-4ef1-bf02-3ebe18c4f582, alive=false, connectionType=JVM, planeIndex=-1] closed. Reason: Connection closed by the other side
2024-02-28 08:45:23,143 INFO  [.c.i.c.ClientConnectionManager] [main] - hz.client_1 [seatunnel-347553] [5.1] Removed connection to endpoint: [localhost]:5801:6e1d5e93-9d1b-4167-a038-b76888e088fe, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:35765->localhost/127.0.0.1:5801}, remoteAddress=[localhost]:5801, lastReadTime=2024-02-28 08:45:23.129, lastWriteTime=2024-02-28 08:45:19.110, closedTime=2024-02-28 08:45:23.141, connected server version=5.1}
2024-02-28 08:45:23,144 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel-347553] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2024-02-28 08:45:23,147 INFO  [c.h.c.i.ClientEndpointManager ] [hz.main.event-1] - [localhost]:5801 [seatunnel-347553] [5.1] Destroying ClientEndpoint{connection=Connection[id=1, /127.0.0.1:5801->/127.0.0.1:35765, qualifier=null, endpoint=[127.0.0.1]:35765, remoteUuid=b59847b8-8f89-4ef1-bf02-3ebe18c4f582, alive=false, connectionType=JVM, planeIndex=-1], clientUuid=b59847b8-8f89-4ef1-bf02-3ebe18c4f582, clientName=hz.client_1, authenticated=true, clientVersion=5.1, creationTime=1709109719041, latest clientAttributes=lastStatisticsCollectionTime=1709109919108,enterprise=false,clientType=JVM,clientVersion=5.1,clusterConnectionTimestamp=1709109719030,clientAddress=127.0.0.1,clientName=hz.client_1,credentials.principal=null,os.committedVirtualMemorySize=2709667840,os.freePhysicalMemorySize=320233472,os.freeSwapSpaceSize=0,os.maxFileDescriptorCount=1048576,os.openFileDescriptorCount=65,os.processCpuTime=12630000000,os.systemLoadAverage=0.62,os.totalPhysicalMemorySize=8280195072,os.totalSwapSpaceSize=0,runtime.availableProcessors=1,runtime.freeMemory=161719896,runtime.maxMemory=518979584,runtime.totalMemory=260177920,runtime.uptime=203553,runtime.usedMemory=98460344, labels=[]}
2024-02-28 08:45:23,151 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel-347553] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2024-02-28 08:45:23,152 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - Closed SeaTunnel client......
2024-02-28 08:45:23,153 INFO  [c.h.c.LifecycleService        ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] [localhost]:5801 is SHUTTING_DOWN
2024-02-28 08:45:23,158 INFO  [c.h.i.p.i.MigrationManager    ] [hz.main.cached.thread-6] - [localhost]:5801 [seatunnel-347553] [5.1] Shutdown request of Member [localhost]:5801 - 6e1d5e93-9d1b-4167-a038-b76888e088fe this is handled
2024-02-28 08:45:23,164 INFO  [c.h.i.i.Node                  ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] Shutting down connection manager...
2024-02-28 08:45:23,172 INFO  [c.h.i.i.Node                  ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] Shutting down node engine...
2024-02-28 08:45:23,211 INFO  [c.h.i.i.NodeExtension         ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] Destroying node NodeExtension.
2024-02-28 08:45:23,212 INFO  [c.h.i.i.Node                  ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] Hazelcast Shutdown is completed in 56 ms.
2024-02-28 08:45:23,212 INFO  [c.h.c.LifecycleService        ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] [localhost]:5801 is SHUTDOWN
2024-02-28 08:45:23,214 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - Closed HazelcastInstance ......
2024-02-28 08:45:23,216 INFO  [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Start completed.
2024-02-28 08:45:23,218 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - Closed metrics executor service ......
2024-02-28 08:45:23,219 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 

===============================================================================

2024-02-28 08:45:23,220 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Fatal Error, 

2024-02-28 08:45:23,221 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues

2024-02-28 08:45:23,222 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Reason:SeaTunnel job executed failed 

2024-02-28 08:45:23,223 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:199)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.NullPointerException
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow(SeaTunnelRowDebeziumDeserializeSchema.java:222)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:189)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:157)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:132)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:89)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:55)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:108)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:112)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110)
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:116)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:643)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
        ... 2 more

2024-02-28 08:45:23,237 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 
===============================================================================

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:199)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.NullPointerException
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow(SeaTunnelRowDebeziumDeserializeSchema.java:222)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:189)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:157)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:132)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:89)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:55)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:108)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:112)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110)
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:116)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:643)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
        ... 2 more
2024-02-28 08:45:23,260 INFO  [.e.FieldNamedPreparedStatement] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - PrepareStatement sql is:
INSERT INTO "test_db"."public"."t_user_2" ("id", "name", "age", "update_time") VALUES (?, ?, ?, ?) ON CONFLICT ("id") DO UPDATE SET "id"=EXCLUDED."id", "name"=EXCLUDED."name", "age"=EXCLUDED."age", "update_time"=EXCLUDED."update_time"

2024-02-28 08:45:23,265 INFO  [.e.FieldNamedPreparedStatement] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - PrepareStatement sql is:
DELETE FROM "test_db"."public"."t_user_2" WHERE "id" = ?

2024-02-28 08:45:23,267 INFO  [.JdbcMultiTableResourceManager] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - start close connection poolHikariPool-4
2024-02-28 08:45:23,268 INFO  [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Shutdown initiated...
2024-02-28 08:45:23,272 WARN  [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Interrupted during closing
java.lang.InterruptedException: null
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067) ~[?:1.8.0_342]
        at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475) ~[?:1.8.0_342]
        at org.apache.seatunnel.shade.com.zaxxer.hikari.pool.HikariPool.shutdown(HikariPool.java:255) ~[?:?]
        at org.apache.seatunnel.shade.com.zaxxer.hikari.HikariDataSource.close(HikariDataSource.java:351) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.ConnectionPoolManager.close(ConnectionPoolManager.java:66) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcMultiTableResourceManager.close(JdbcMultiTableResourceManager.java:42) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.close(MultiTableSinkWriter.java:251) ~[seatunnel-transforms-v2.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.close(SinkFlowLifeCycle.java:153) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$close$5(SeaTunnelTask.java:330) ~[seatunnel-starter.jar:2.3.4]
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) [?:1.8.0_342]
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) [?:1.8.0_342]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) [?:1.8.0_342]
        at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290) [?:1.8.0_342]
        at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) [?:1.8.0_342]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_342]
        at java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1870) [?:1.8.0_342]
        at java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:2467) [?:1.8.0_342]
        at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:324) [?:1.8.0_342]
        at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405) [?:1.8.0_342]
        at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) [?:1.8.0_342]
        at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159) [?:1.8.0_342]
        at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173) [?:1.8.0_342]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) [?:1.8.0_342]
        at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) [?:1.8.0_342]
        at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650) [?:1.8.0_342]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.close(SeaTunnelTask.java:327) [seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:660) [seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944) [seatunnel-starter.jar:2.3.4]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_342]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_342]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_342]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_342]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342]
2024-02-28 08:45:23,899 INFO  [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Close current fetcher org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher
2024-02-28 08:45:23,900 INFO  [i.d.j.JdbcConnection          ] [pool-64-thread-1] - Connection gracefully closed
2024-02-28 08:45:23,902 INFO  [i.d.j.JdbcConnection          ] [pool-65-thread-1] - Connection gracefully closed
2024-02-28 08:45:23,903 INFO  [i.d.j.JdbcConnection          ] [pool-66-thread-1] - Connection gracefully closed
2024-02-28 08:45:24,304 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited.
2024-02-28 08:45:24,316 INFO  [s.c.s.s.c.ClientExecuteCommand] [Thread-105] - run shutdown hook because get close signal

Zeta or Flink or Spark Version

zeta

Java or Scala Version

No response

Screenshots

No response

github-actions[bot] commented 3 months ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

xuchangqun commented 2 months ago

+1

chess3cake commented 1 month ago

The same problem also occurs when deleting from the source table. PostgreSQL version 15, driver version 42.7.3.

hailin0 commented 1 month ago

Please check your database settings https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Postgre-CDC.md#using-dependency

Here are the steps to enable CDC (Change Data Capture) in PostgreSQL:

Ensure the wal_level is set to logical: modify the postgresql.conf configuration file by adding "wal_level = logical", then restart the PostgreSQL server for the change to take effect. Alternatively, you can use SQL commands to modify the configuration directly:

ALTER SYSTEM SET wal_level TO 'logical';
SELECT pg_reload_conf();

-- Change the REPLICA policy of the specified table to FULL
ALTER TABLE your_table_name REPLICA IDENTITY FULL;

chess3cake commented 1 month ago

I guess the settings are correct.

image

hailin0 commented 1 month ago

check

ALTER TABLE your_table_name REPLICA IDENTITY FULL;
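
Both settings can be confirmed from psql before rerunning the job. The following is a minimal sketch, not taken from the SeaTunnel docs; `your_table_name` is the same placeholder used above and may need to be schema-qualified. With the default replica identity, PostgreSQL logs only the primary key columns of the old row, which would be consistent with the NullPointerException in extractBeforeRow, though this thread does not confirm that as the cause.

```sql
-- Value the server is currently running with; CDC needs 'logical'.
-- wal_level can only be set at server start, so a value changed with
-- ALTER SYSTEM shows up here only after a restart.
SHOW wal_level;

-- Replica identity of the source table:
--   'd' = default (primary key), 'n' = nothing, 'f' = full, 'i' = index
-- After REPLICA IDENTITY FULL this should return 'f'.
SELECT relname, relreplident
FROM pg_class
WHERE oid = 'your_table_name'::regclass;
```

If `relreplident` is still `d`, the ALTER TABLE statement has most likely not been applied to the table the job actually reads.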