apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Zeta] When I use the savepoint method to restart the task, a NullPointerException is thrown. #6898

Status: Open. weipengfei-sj opened this issue 3 months ago

weipengfei-sj commented 3 months ago

Syncing messages from a Kafka cluster. Setup: 3-node cluster mode, SeaTunnel 2.3.5, job submitted with the SeaTunnel engine (Zeta).

Configuration file:

env {
    parallelism = 5
    job.mode = "STREAMING"
    job.name = "kafka_to_console"
}

source {
    Kafka {
        format = text
        field_delimiter = "#"
        #topic = "trans_dmp_99360000008_00_997010200090122"
        topic = "ainput"
        bootstrap.servers = "bdp02k1:9095,bdp02k2:9095,bdp02k3:9095"
        kafka.config = {
            #client.id = client_1
            max.poll.records = 500
            auto.offset.reset = "earliest"
            enable.auto.commit = "false"
        }
    }
}

sink {
    kafka {
        topic = "test-result"
        bootstrap.servers = "bdp02r1:9092,bdp02r2:9092,bdp02r3:9092"
        format = text
        kafka.request.timeout.ms = 60000
        semantics = EXACTLY_ONCE
        kafka.config = {
            acks = "all"
            request.timeout.ms = 60000
            buffer.memory = 33554432
        }
    }
}

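Command used to submit the job:
./bin/seatunnel.sh -c ./config/test-kafka.yaml

Stop the job with a savepoint:
./bin/seatunnel.sh -s jobid

Restore the job from the savepoint:
./bin/seatunnel.sh -c ./config/test-kafka.yaml -r jobid

The client then reports the following NullPointerException: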

2024-05-24 14:56:01,286 INFO [c.h.c.LifecycleService ] [main] - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2024-05-24 14:56:01,288 INFO [c.h.c.LifecycleService ] [main] - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2024-05-24 14:56:01,289 INFO [s.c.s.s.c.ClientExecuteCommand] [main] - Closed SeaTunnel client......
2024-05-24 14:56:01,289 INFO [s.c.s.s.c.ClientExecuteCommand] [main] - Closed metrics executor service ......
2024-05-24 14:56:01,289 ERROR [o.a.s.c.s.SeaTunnel ] [main] -
2024-05-24 14:56:01,289 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Fatal Error,
2024-05-24 14:56:01,289 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues
2024-05-24 14:56:01,289 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Reason:SeaTunnel job executed failed
2024-05-24 14:56:01,290 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:202)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: CheckpointCoordinator inside have error.
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:274)
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:270)
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:376)
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:183)
    at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.run(CheckpointErrorReportOperation.java:48)
    at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
    at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: java.lang.NullPointerException
    ... 11 more
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:194)
    ... 2 more
2024-05-24 14:56:01,290 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:202)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: CheckpointCoordinator inside have error.
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:274)
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:270)
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:376)
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:183)
    at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.run(CheckpointErrorReportOperation.java:48)
    at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
    at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: java.lang.NullPointerException
    ... 11 more
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:194)
    ... 2 more
2024-05-24 14:56:01,292 INFO [s.c.s.s.c.ClientExecuteCommand] [ForkJoinPool.commonPool-worker-18] - run shutdown hook because get close signal

Checking the logs on the cluster nodes, I found where the NullPointerException is thrown. It comes from a map lookup (ConcurrentHashMap.get):

ERROR [.s.e.s.c.CheckpointCoordinator] [hz.main.generic-operation.thread-21] - report error from task
org.apache.seatunnel.common.utils.SeaTunnelException: java.lang.NullPointerException
    at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
    at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.getTaskMemberAddressByIndex(SourceSplitEnumeratorTask.java:260)
    at org.apache.seatunnel.engine.server.task.context.SeaTunnelSplitEnumeratorContext.assignSplit(SeaTunnelSplitEnumeratorContext.java:85)
    at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator.lambda$assignSplit$11(KafkaSourceSplitEnumerator.java:305)
    at java.util.HashMap.forEach(HashMap.java:1289)
    at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator.assignSplit(KafkaSourceSplitEnumerator.java:303)
    at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator.registerReader(KafkaSourceSplitEnumerator.java:224)
    at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.receivedReader(SourceSplitEnumeratorTask.java:222)
    at org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation.lambda$null$0(SourceRegisterOperation.java:74)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:376) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:183) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.run(CheckpointErrorReportOperation.java:48) ~[seatunnel-starter.jar:2.3.5]
    at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.5]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.5]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.5]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:471) ~[seatunnel-starter.jar:2.3.5]
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:197) ~[seatunnel-starter.jar:2.3.5]
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:137) ~[seatunnel-starter.jar:2.3.5]
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) ~[seatunnel-starter.jar:2.3.5]
    at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) ~[seatunnel-starter.jar:2.3.5]

Tracing through the source code:

Analysis point 1: In SourceSplitEnumeratorTask, receivedReader puts one entry per call (one taskID at a time) into the map where the NullPointerException later occurs. Following enumerator.registerReader down the call chain, the underlying logic then reads from that same map:

public void receivedReader(TaskLocation readerId, Address memberAddr)
        throws InterruptedException, ExecutionException {
    log.info("received reader register, readerID: " + readerId);

    SourceSplitEnumerator<SplitT, Serializable> enumerator = getEnumerator();
    // Adds an entry to the map; each call adds exactly one value
    this.addTaskMemberMapping(readerId, memberAddr);
    // Reads from the map further down the call chain (see analysis point 2)
    enumerator.registerReader(readerId.getTaskIndex());
    int taskSize = taskMemberMapping.size();
    if (maxReaderSize == taskSize) {
        readerRegisterComplete = true;
        log.debug(String.format("reader register complete, current task size %d", taskSize));
    } else {
        log.debug(
                String.format(
                        "current task size %d, need size %d to complete register",
                        taskSize, maxReaderSize));
    }
}
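To make the ordering problem concrete, here is a small single-threaded model of the registration flow above. It is a sketch under stated assumptions, not SeaTunnel code: it assumes, as the analysis suggests, that registering a reader fills both taskIndexToTaskLocationMapping and taskMemberMapping, and it replaces TaskLocation and Address with strings. Each call to receivedReader registers one reader and then, like assignSplit, looks up the address of every subtask index below the parallelism, recording which lookups throw.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, single-threaded model of the registration order described in the issue.
// Strings stand in for TaskLocation and Address; this is not SeaTunnel code.
class RegistrationRaceModel {
    private final Map<Integer, String> taskIndexToTaskLocationMapping = new ConcurrentHashMap<>();
    private final Map<String, String> taskMemberMapping = new ConcurrentHashMap<>();
    private final int parallelism;

    RegistrationRaceModel(int parallelism) {
        this.parallelism = parallelism;
    }

    // Same shape as getTaskMemberAddressByIndex quoted in the issue.
    String getTaskMemberAddressByIndex(int taskIndex) {
        return taskMemberMapping.get(taskIndexToTaskLocationMapping.get(taskIndex));
    }

    // Registers one reader (assumption: both maps are filled here), then, like
    // assignSplit, looks up every index below the parallelism.
    // Returns the indices whose lookup threw a NullPointerException.
    List<Integer> receivedReader(int taskIndex) {
        String location = "location-" + taskIndex;
        taskIndexToTaskLocationMapping.put(taskIndex, location);
        taskMemberMapping.put(location, "member-" + taskIndex);

        List<Integer> failed = new ArrayList<>();
        for (int id = 0; id < parallelism; id++) {
            try {
                getTaskMemberAddressByIndex(id);
            } catch (NullPointerException e) {
                failed.add(id);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        RegistrationRaceModel model = new RegistrationRaceModel(5); // parallelism = 5, as in the config
        System.out.println(model.receivedReader(0)); // [1, 2, 3, 4]
        System.out.println(model.receivedReader(1)); // [2, 3, 4]
    }
}
```

With parallelism 5, the first registration fails for indices 1 to 4 and the second for 2 to 4, which matches the observation that the lookup runs before the remaining readers have been added to the map.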

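Analysis point 2: In KafkaSourceSplitEnumerator.assignSplit, readySplit gets an entry for every taskID from 0 up to the current parallelism (0, 1, 2, ...), and the loop over readySplit looks up each of those taskIDs in the map. If the put for that reader has not happened yet, the lookup fails with the NullPointerException:

private synchronized void assignSplit() {
    Map<Integer, List> readySplit = new HashMap<>(Common.COLLECTION_SIZE);
    for (int taskID = 0; taskID < context.currentParallelism(); taskID++) {
        readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
    }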

    pendingSplit.forEach(
            (key, value) -> {
                if (!assignedSplit.containsKey(key)) {
                    readySplit.get(getSplitOwner(key, context.currentParallelism())).add(value);
                }
            });

    readySplit.forEach(
            (id, split) -> {
                // The call made by the next line is shown in analysis point 3
                context.assignSplit(id, readySplit.get(id));
                if (discoveryIntervalMillis <= 0) {
                    context.signalNoMoreSplits(id);
                }
            });

    assignedSplit.putAll(pendingSplit);
    pendingSplit.clear();
}

The assignSplit method called by the logic above (SeaTunnelSplitEnumeratorContext):

Analysis point 3:

public void assignSplit(int subtaskIndex, List splits) {
    if (registeredReaders().isEmpty()) {
        log.warn("No reader is obtained, skip this assign!");
        return;
    }

    List<byte[]> splitBytes =
            splits.stream()
                    .map(split -> sneaky(() -> task.getSplitSerializer().serialize(split)))
                    .collect(Collectors.toList());
    task.getExecutionContext()
            .sendToMember(
                    new AssignSplitOperation<>(
                            task.getTaskMemberLocationByIndex(subtaskIndex), splitBytes),
                    task.getTaskMemberAddressByIndex(subtaskIndex))
            .join();
}
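The guard at the top of this method only skips the assignment when no reader at all has registered. Once reader 0 is in, the method proceeds for every subtask index, including ones whose reader is still missing. The sketch below contrasts that check with a per-index membership check; the class and its plain Set<Integer> are hypothetical stand-ins for the context's registeredReaders(), not SeaTunnel's API.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the enumerator context: a plain set of registered
// subtask indices replaces registeredReaders(). Not SeaTunnel's API.
class AssignGuardSketch {
    private final Set<Integer> registeredReaders = new HashSet<>();

    void register(int subtaskIndex) {
        registeredReaders.add(subtaskIndex);
    }

    // The check shown in the issue: proceed as soon as any reader has registered.
    boolean proceedsWithEmptinessCheck(int subtaskIndex) {
        return !registeredReaders.isEmpty();
    }

    // A narrower check: proceed only if this particular subtask's reader has registered.
    boolean proceedsWithMembershipCheck(int subtaskIndex) {
        return registeredReaders.contains(subtaskIndex);
    }

    public static void main(String[] args) {
        AssignGuardSketch context = new AssignGuardSketch();
        context.register(0);
        // Subtask 3 has not registered, yet the emptiness check lets the lookup run.
        System.out.println(context.proceedsWithEmptinessCheck(3));  // true
        System.out.println(context.proceedsWithMembershipCheck(3)); // false
        System.out.println(context.proceedsWithMembershipCheck(0)); // true
    }
}
```

A membership guard on its own would not be a complete fix: in the KafkaSourceSplitEnumerator.assignSplit code quoted in the issue, everything in pendingSplit is moved into assignedSplit after the loop, so a subtask skipped this way would lose its splits. Skipped splits would have to stay pending until that reader registers.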

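In this method, getTaskMemberAddressByIndex reads from the map in SourceSplitEnumeratorTask, and this is where the NullPointerException is thrown. Because assignSplit loops over every entry in readySplit, some of the ids passed in belong to readers that have not been added to the map yet. The lookup is nested: for such an index the inner taskIndexToTaskLocationMapping.get returns null, and the outer taskMemberMapping.get (a ConcurrentHashMap, per the stack trace) throws on that null key.

public Address getTaskMemberAddressByIndex(int taskIndex) {
    return taskMemberMapping.get(taskIndexToTaskLocationMapping.get(taskIndex));
}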
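The JDK behavior the stack trace points to can be checked with plain collections: HashMap.get(null) returns null, while ConcurrentHashMap.get(null) throws a NullPointerException. The sketch also shows a hypothetical null-safe variant of the nested lookup; lookupByIndex and the string-typed maps are illustrative stand-ins, not SeaTunnel code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class NullKeyLookupDemo {
    // Reports whether map.get(null) throws instead of returning null.
    static boolean throwsOnNullKey(Map<String, String> map) {
        try {
            map.get(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Hypothetical null-safe version of the nested lookup: resolve the location
    // first and return null rather than passing a null key to the outer map.
    static String lookupByIndex(
            Map<Integer, String> indexToLocation, Map<String, String> locationToMember, int taskIndex) {
        String location = indexToLocation.get(taskIndex);
        return location == null ? null : locationToMember.get(location);
    }

    public static void main(String[] args) {
        System.out.println(throwsOnNullKey(new HashMap<>()));           // false
        System.out.println(throwsOnNullKey(new ConcurrentHashMap<>())); // true

        Map<Integer, String> indexToLocation = new ConcurrentHashMap<>();
        Map<String, String> locationToMember = new ConcurrentHashMap<>();
        indexToLocation.put(0, "location-0");
        locationToMember.put("location-0", "member-0");
        System.out.println(lookupByIndex(indexToLocation, locationToMember, 0)); // member-0
        System.out.println(lookupByIndex(indexToLocation, locationToMember, 1)); // null
    }
}
```

Returning null only moves the failure to the caller, which would then hand a null address to sendToMember, so this is a diagnostic aid rather than a fix for the ordering problem.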

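Based on this analysis: if registering a reader did not immediately trigger assignSplit, and assignSplit were only called after all readers had registered, the NullPointerException would be avoided.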
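The proposed fix can be sketched as an enumerator whose registerReader only records the reader and assigns pending splits once every expected subtask index has registered. This is a simplified, hypothetical model (splits are strings keyed by owner, "sending" is recorded in a map, and there is no discovery loop), not a patch against KafkaSourceSplitEnumerator. SourceSplitEnumeratorTask already tracks readerRegisterComplete in receivedReader, which might serve as the signal, although the issue does not show whether the enumerator can observe it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of "defer assignSplit until all readers have registered".
// Splits are plain strings keyed by owning subtask index; nothing is sent over a network.
class DeferredAssignmentSketch {
    private final int parallelism;
    private final Set<Integer> registeredReaders = new HashSet<>();
    private final Map<Integer, List<String>> pendingByOwner = new HashMap<>();
    // Records what would have been sent to each subtask.
    final Map<Integer, List<String>> sent = new HashMap<>();

    DeferredAssignmentSketch(int parallelism, Map<Integer, List<String>> pending) {
        this.parallelism = parallelism;
        this.pendingByOwner.putAll(pending);
    }

    void registerReader(int subtaskIndex) {
        registeredReaders.add(subtaskIndex); // a Set ignores duplicate registrations
        // Assign only once every index below the parallelism is known, so no
        // address lookup can hit a reader that has not registered.
        if (registeredReaders.size() == parallelism) {
            assignPendingSplits();
        }
    }

    private void assignPendingSplits() {
        pendingByOwner.forEach(
                (owner, splits) -> sent.computeIfAbsent(owner, k -> new ArrayList<>()).addAll(splits));
        pendingByOwner.clear();
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> pending = new HashMap<>();
        pending.put(0, new ArrayList<>(Arrays.asList("ainput-0")));
        pending.put(1, new ArrayList<>(Arrays.asList("ainput-1")));

        DeferredAssignmentSketch enumerator = new DeferredAssignmentSketch(2, pending);
        enumerator.registerReader(0);
        System.out.println(enumerator.sent.isEmpty()); // true: reader 1 has not registered yet
        enumerator.registerReader(1);
        System.out.println(enumerator.sent.get(1)); // [ainput-1]
    }
}
```

The trade-off is that no split is assigned until the last reader registers; if one reader never arrives, the job stalls, so a real change would probably need a timeout or a per-reader check alongside it.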

github-actions[bot] commented 1 month ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.