apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

kafka savepoint then start has error #7145

Closed: ccczhouxin closed this issue 2 weeks ago

ccczhouxin commented 2 weeks ago

What happened

Kafka to Hive: I stopped the task with a savepoint, and when I start it again from that savepoint it fails with a NullPointerException.

SeaTunnel Version

2.3.5

SeaTunnel Config

env  { 
  job.mode = "STREAMING" 
  parallelism = "4" 
  job.retry.times = "0" 
} 
source  { 
  Kafka { 
    topic = "topic_time" 
    bootstrap.servers = "localhost:9092" 
    pattern = "false" 
    consumer.group = "12321321321" 
    commit_on_checkpoint = "false" 
    kafka.config  { 
      enable.auto.commit = "false" 
      security.protocol = "SASL_PLAINTEXT" 
      sasl.kerberos.service.name = "kafka" 
      sasl.mechanism = "GSSAPI" 
      java.security.krb5.conf = "/kafka/krb5.conf" 
      sasl.jaas.config = """    com.sun.security.auth.module.Krb5LoginModule required   serviceName="kafka"   useKeyTab=true   storeKey=true   useTicketCache=false   keyTab="/kafka/kafka.service.keytab"   principal="kafka/node1@HADOOP.COM"; """ 
    } 
    schema = { 
      fields = { 
        id = "bigint" 
        c2 = "date" 
        c3 = "timestamp" 
      } 
    } 
    format = "json" 
    result_table_name = "t_migr_time_json" 
    semantics = "EXACTLY_ONCE" 
    start_mode = "latest" 
    json_field  { 
      fields = { 
        id = "$.t_migr_auth_time[*].id" 
        c2 = "$.t_migr_auth_time[*].c2" 
        c3 = "$.t_migr_auth_time[*].c3" 
      } 
    } 
  } 
} 
sink  { 
  JDBC { 
    database = "d2" 
    table = "u2.t_migr_time" 
    url = "jdbc:postgresql://localhost:15400/d2" 
    user = "u2" 
    password = "123456" 
    driver = "org.postgresql.Driver" 
    generate_sink_sql = "true" 
    schema_save_mode = "ERROR_WHEN_SCHEMA_NOT_EXIST" 
    data_save_mode = "APPEND_DATA" 
    enable_upsert = "false" 
  } 
}

Running Command

./bin/seatunnel.sh --config /DATA1/zx/kafkaScript/test --restore 862961756702507009
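
For reference, the stop-with-savepoint step that precedes this restore also goes through the Zeta client; a sketch of the full sequence, using the job id and config path from this issue:

# stop the running job and take a savepoint
./bin/seatunnel.sh -s 862961756702507009
# later, restart the job from that savepoint
./bin/seatunnel.sh --config /DATA1/zx/kafkaScript/test --restore 862961756702507009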

Error Exception

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:209)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: CheckpointCoordinator inside have error.
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:279)
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:275)
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:385)
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:184)
    at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.run(CheckpointErrorReportOperation.java:48)
    at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
    at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: java.lang.NullPointerException
    ... 11 more

    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:201)
    ... 2 more
2024-07-09 15:48:04,044 INFO  [s.c.s.s.c.ClientExecuteCommand] [ForkJoinPool.commonPool-worker-2] - run shutdown hook because get close signal

Zeta or Flink or Spark Version

No response

Java or Scala Version

1.8

Screenshots

No response

emmanuelsosa commented 2 weeks ago

What are you using for checkpoint storage? If you are using local storage, ensure that you restore the job from the same node where it was paused or where the savepoint was created.

Additionally, I'd make sure your checkpoint storage is correctly saving the savepoints in the backend.

ccczhouxin commented 2 weeks ago

What are you using for checkpoint storage? If you are using local storage, ensure that you restore the job from the same node where it was paused or where the savepoint was created.

Additionally, I'd make sure your checkpoint storage is correctly saving the savepoints in the backend.

this is my config:

seatunnel:
  engine:
    classloader-cache-mode: false
    history-job-expire-minutes: 1440
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 30
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 60000
      storage:
        type: hdfs
        max-retained: 1
        plugin-config:
          namespace: /opt/share-files/seatunnel/checkpoint_snapshot
          storage.type: hdfs
          fs.defaultFS: file:///opt # Ensure that the directory has written permission
          disable.cache: true
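
One thing worth checking: with storage.type: hdfs but fs.defaultFS: file:///opt, the snapshots land on the local filesystem of whichever node runs the job, which is exactly the node-local case described in the comment above. A rough sketch of the same storage section pointed at a shared HDFS instead (the namenode host and port are placeholders, not taken from this issue):

    checkpoint:
      storage:
        type: hdfs
        max-retained: 1
        plugin-config:
          namespace: /opt/share-files/seatunnel/checkpoint_snapshot
          storage.type: hdfs
          fs.defaultFS: hdfs://namenode:8020

With a shared backend like this, restoring the job no longer depends on which cluster node picks it up.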

emmanuelsosa commented 2 weeks ago

Do you see checkpoint files in the /opt directory? Ensure that the directory has write permission for SeaTunnel.
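
A quick way to check is to list the configured namespace on the node that ran the job; the paths below come from the config earlier in this thread, the job id from the restore command, and the per-job sub-directory layout is an assumption:

# list the checkpoint storage namespace from the seatunnel.yaml above
ls -l /opt/share-files/seatunnel/checkpoint_snapshot
# snapshots for this job are expected under a directory named after the job id (assumed layout)
ls -l /opt/share-files/seatunnel/checkpoint_snapshot/862961756702507009
# confirm the user running SeaTunnel can write here
touch /opt/share-files/seatunnel/checkpoint_snapshot/.write_test && rm /opt/share-files/seatunnel/checkpoint_snapshot/.write_test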

ccczhouxin commented 2 weeks ago

Do you see checkpoint files in the /opt directory? Ensure that the directory has write permission for SeaTunnel.

OK, thanks.