apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

Recovery job failed #7472

Open dibrother opened 3 weeks ago

dibrother commented 3 weeks ago

Description

To simulate a power outage, I killed all SeaTunnel processes. Afterwards I was unable to recover the job from its checkpoint. When the cluster nodes are restarted, the previously running tasks are not restored automatically.

1. Deploy SeaTunnel Engine in Separated Cluster Mode, with all nodes on the same server

seatunnel:
  engine:
    history-job-expire-minutes: 1440
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 60000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          namespace: /app/seatunnel/checkpoint_snapshot
          storage.type: hdfs
          fs.defaultFS: file:///app/ # Ensure that the directory has write permission
map:
  engine*:
    map-store:
      enabled: true
      initial-mode: EAGER
      factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
      properties:
        type: hdfs
        namespace: /app/seatunnel/imap
        clusterName: seatunnel-cluster
        storage.type: hdfs
        fs.defaultFS: file:///app/
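
The configuration above points both the checkpoint storage and the IMap store at local directories and notes that they must be writable. A minimal sketch of how that could be checked before starting the cluster follows. It is not part of SeaTunnel, the helper names `nearest_existing` and `is_writable` are hypothetical, and it assumes that with `fs.defaultFS: file:///app/` the `namespace` values are plain absolute paths on the local filesystem.

```python
import os


def nearest_existing(path):
    """Walk up from path until an existing directory entry is found."""
    path = os.path.abspath(path)
    while not os.path.exists(path):
        parent = os.path.dirname(path)
        if parent == path:  # reached the filesystem root
            break
        path = parent
    return path


def is_writable(path):
    """True if path (or its nearest existing ancestor) is a writable directory."""
    target = nearest_existing(path)
    return os.path.isdir(target) and os.access(target, os.W_OK | os.X_OK)


if __name__ == "__main__":
    # Namespaces copied from the configuration in this report.
    for namespace in ("/app/seatunnel/checkpoint_snapshot", "/app/seatunnel/imap"):
        status = "writable" if is_writable(namespace) else "NOT writable"
        print(f"{namespace}: {status}")
```

Run it as the same OS user that starts the SeaTunnel processes, since the result depends on that user's permissions.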

2. Starting the SeaTunnel Engine Master Node
3. Starting the SeaTunnel Engine Worker Node
4. Submitting Jobs

./bin/seatunnel.sh --config config/mysql-to-mysql.conf -n test01

5. Full data extraction completes and synchronization runs normally

6. Kill all seatunnel processes
    ps -ef|grep seatunnel
    kill -9 xxx xxx xxx xxx

Recovery:

1. Starting the SeaTunnel Engine Master Node
2. Starting the SeaTunnel Engine Worker Node
3. Resuming Jobs

# View job_id
ls /app/seatunnel/checkpoint_snapshot/
879243812122132482

# Resuming Jobs
./bin/seatunnel.sh -r 879243812122132482 -c config/mysql-to-mysql.conf 
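
The two manual steps above (listing the checkpoint directory to find the job id, then passing it to `-r`) can be sketched as a small script. This is only an illustration: the helper names `list_job_ids` and `build_resume_command` are hypothetical, and it assumes, as the `ls` output above suggests, that the checkpoint namespace holds one numerically named subdirectory per job. The `-r` and `-c` flags are the ones used in the command above.

```python
import os


def list_job_ids(checkpoint_dir):
    """Return the numerically named subdirectories of checkpoint_dir, sorted."""
    return sorted(
        name
        for name in os.listdir(checkpoint_dir)
        if name.isdigit() and os.path.isdir(os.path.join(checkpoint_dir, name))
    )


def build_resume_command(job_id, config_path):
    """Build the resume command line as an argument list (nothing is executed)."""
    return ["./bin/seatunnel.sh", "-r", job_id, "-c", config_path]


if __name__ == "__main__":
    checkpoint_dir = "/app/seatunnel/checkpoint_snapshot"  # namespace from the config
    if os.path.isdir(checkpoint_dir):
        for job_id in list_job_ids(checkpoint_dir):
            print(" ".join(build_resume_command(job_id, "config/mysql-to-mysql.conf")))
    else:
        print(f"{checkpoint_dir} does not exist on this machine")
```

The script only prints the commands, so each job can be reviewed before it is resumed by hand.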

Errors reported:

2024-08-23 14:14:11,767 INFO  [p.MultipleTableJobConfigParser] [main] - start generating all transforms.
2024-08-23 14:14:11,767 INFO  [p.MultipleTableJobConfigParser] [main] - start generating all sinks.
2024-08-23 14:14:11,781 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Load SeaTunnelSink Plugin from /root/seatunnel/apache-seatunnel-2.3.7/connectors
2024-08-23 14:14:11,868 INFO  [o.a.s.a.t.f.FactoryUtil       ] [main] - Create sink 'jdbc' with upstream input catalog-table[database: sbtest, schema: null, table: sbtest1]
2024-08-23 14:14:12,525 INFO  [o.a.s.e.c.j.ClientJobProxy    ] [main] - Start submit job, job id: 879243812122132482, with plugin jar []
2024-08-23 14:14:16,883 INFO  [o.a.s.e.c.j.ClientJobProxy    ] [main] - Submit job finished, job id: 879243812122132482, job name: SeaTunnel_Job
2024-08-23 14:14:17,674 WARN  [o.a.s.e.c.j.JobMetricsRunner  ] [job-metrics-runner-879243812122132482] - Failed to get job metrics summary, it maybe first-run
2024-08-23 14:14:36,700 INFO  [o.a.s.e.c.j.ClientJobProxy    ] [main] - Job (879243812122132482) end with state FAILED
2024-08-23 14:14:36,703 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
2024-08-23 14:14:36,718 INFO  [.c.i.c.ClientConnectionManager] [main] - hz.client_1 [seatunnel] [5.1] Removed connection to endpoint: [localhost]:5802:9312e0d1-46f4-4093-8fa2-125ecd53a0d8, connection: ClientConnection{alive=false, connectionId=2, channel=NioChannel{/127.0.0.1:34341->localhost/127.0.0.1:5802}, remoteAddress=[localhost]:5802, lastReadTime=2024-08-23 14:14:35.055, lastWriteTime=2024-08-23 14:14:35.053, closedTime=2024-08-23 14:14:36.713, connected server version=5.1}
2024-08-23 14:14:36,720 INFO  [.c.i.c.ClientConnectionManager] [main] - hz.client_1 [seatunnel] [5.1] Removed connection to endpoint: [localhost]:5801:c9a9fb0a-5d6b-4f2a-a8d1-cf59831dc43b, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:49359->localhost/127.0.0.1:5801}, remoteAddress=[localhost]:5801, lastReadTime=2024-08-23 14:14:36.668, lastWriteTime=2024-08-23 14:14:30.060, closedTime=2024-08-23 14:14:36.719, connected server version=5.1}
2024-08-23 14:14:36,720 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2024-08-23 14:14:36,733 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2024-08-23 14:14:36,734 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - Closed SeaTunnel client......
2024-08-23 14:14:36,735 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - Closed metrics executor service ......
2024-08-23 14:14:36,735 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 

===============================================================================

2024-08-23 14:14:36,736 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Fatal Error, 

2024-08-23 14:14:36,736 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues

2024-08-23 14:14:36,737 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Reason:SeaTunnel job executed failed 

2024-08-23 14:14:36,741 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:211)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.ArrayIndexOutOfBoundsException: Index 4 out of bounds for length 4
        at org.apache.seatunnel.api.table.type.SeaTunnelRowType.getFieldType(SeaTunnelRowType.java:73)
        at org.apache.seatunnel.api.table.type.SeaTunnelRow.getBytesSize(SeaTunnelRow.java:105)
        at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.collect(SeaTunnelSourceCollector.java:125)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter$OutputCollector.collect(IncrementalSourceRecordEmitter.java:206)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:191)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:198)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:150)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:101)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:61)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:110)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:156)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:717)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1039)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:203)
        ... 2 more

2024-08-23 14:14:36,744 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 
===============================================================================

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:211)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.ArrayIndexOutOfBoundsException: Index 4 out of bounds for length 4
        at org.apache.seatunnel.api.table.type.SeaTunnelRowType.getFieldType(SeaTunnelRowType.java:73)
        at org.apache.seatunnel.api.table.type.SeaTunnelRow.getBytesSize(SeaTunnelRow.java:105)
        at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.collect(SeaTunnelSourceCollector.java:125)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter$OutputCollector.collect(IncrementalSourceRecordEmitter.java:206)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:191)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:198)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:150)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:101)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:61)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:110)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:156)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:717)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1039)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:203)
        ... 2 more
2024-08-23 14:14:36,755 INFO  [s.c.s.s.c.ClientExecuteCommand] [Thread-6] - run shutdown hook because get close signal

Usage Scenario

No response

Related issues

No response

dailai commented 3 weeks ago

Please use version 2.3.7, which fixes this problem via the PR.

dibrother commented 3 weeks ago

The current version being used is 2.3.7.

dailai commented 3 weeks ago

Maybe you're using the wrong snapshot. Has the table structure changed before and after the snapshot you are using?

dibrother commented 3 weeks ago

The source table structure was changed (a field was deleted). Will this affect recovery? Changes to the upstream schema are outside our control.
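
One possible reading of the `Index 4 out of bounds for length 4` error, offered as an assumption rather than a confirmed diagnosis: after the restart the row type is built from the current 4-column table, while change events replayed from the checkpointed position still carry the deleted column and so have 5 values. A size calculation that looks up the field type by row position then runs past the end of the type array. The sketch below uses hypothetical stand-ins (`bytes_size`, `check_arity`, made-up column values), not SeaTunnel's actual classes.

```python
# Hypothetical stand-ins for illustration; not SeaTunnel's real implementation.
FIXED_SIZES = {"int": 4, "bigint": 8}


def bytes_size(row_fields, field_types):
    """Sum per-field sizes, looking up each field's type by its position."""
    total = 0
    for index, value in enumerate(row_fields):
        field_type = field_types[index]  # IndexError if the row is wider than the schema
        if field_type == "string":
            total += len(str(value).encode("utf-8"))
        else:
            total += FIXED_SIZES[field_type]
    return total


def check_arity(row_fields, field_types):
    """Fail early with a readable message when row and schema widths differ."""
    if len(row_fields) != len(field_types):
        raise ValueError(
            f"row has {len(row_fields)} fields but schema declares {len(field_types)}"
        )


schema_after_drop = ["bigint", "int", "string", "string"]  # 4 columns remain
new_row = [1, 42, "abc", "pad"]                            # matches the current schema
old_row = [1, 42, "abc", "pad", "dropped-column-value"]    # event from before the drop

if __name__ == "__main__":
    print(bytes_size(new_row, schema_after_drop))
    try:
        bytes_size(old_row, schema_after_drop)
    except IndexError as exc:
        print("old row fails:", exc)
```

Calling `check_arity` first would turn the opaque index error into a message that names the mismatch, which is the kind of guard a fix could add.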

dailai commented 3 weeks ago

We will fix this problem in this PR: https://github.com/apache/seatunnel/pull/7463

dibrother commented 3 weeks ago

Thank you very much, looking forward to future versions.