apache / dolphinscheduler

Apache DolphinScheduler is the modern data orchestration platform. Agile to create high performance workflow with low-code
https://dolphinscheduler.apache.org/
Apache License 2.0
12.95k stars 4.65k forks source link

[Bug] [dolphinschedule-master] Cannot find worker group xxx #16738

Closed niyanchun closed 1 month ago

niyanchun commented 1 month ago

Search before asking

What happened

our dolphinscheduler task sometimes failed with error below:

[WI-0][TI-0] - [ERROR] 2024-10-23 18:00:15.480 +0800 o.a.d.s.m.r.BaseTaskDispatcher:[58] - Dispatch task: 看板推送任务实例同步 failed, worker group not found.
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException: Cannot find worker group: Can not find worker group 数仓
    at org.apache.dolphinscheduler.server.master.dispatch.host.LowerWeightHostManager.getWorkerHostWeights(LowerWeightHostManager.java:157)
    at org.apache.dolphinscheduler.server.master.dispatch.host.LowerWeightHostManager.select(LowerWeightHostManager.java:74)
    at org.apache.dolphinscheduler.server.master.runner.dispatcher.WorkerTaskDispatcher.getTaskInstanceDispatchHost(WorkerTaskDispatcher.java:78)
    at org.apache.dolphinscheduler.server.master.runner.BaseTaskDispatcher.dispatchTask(BaseTaskDispatcher.java:55)
    at org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueueLooper.run(GlobalTaskDispatchWaitingQueueLooper.java:80)

and the work group definitely exists, 800+ task use this worker group, only some task failed randomly. It also report "Cannot find worker group: Can not find worker group default" sometimes. These error will always succeed with a retry/retrun. I looked into the source code, I wonder if the workerHostWeightsMap will change while running? If yes, how? Thank you.

    private Set<HostWeight> getWorkerHostWeights(String workerGroup) throws WorkerGroupNotFoundException {
        workerGroupReadLock.lock();
        try {
            Set<HostWeight> hostWeights = workerHostWeightsMap.get(workerGroup);
            if (hostWeights == null) {
                throw new WorkerGroupNotFoundException("Can not find worker group " + workerGroup);
            }
            return hostWeights;
        } finally {
            workerGroupReadLock.unlock();
        }
    }

What you expected to happen

work group should always been found

How to reproduce

cannot reproduce for specified task, rerun will succeed, but always happens everyday for random tasks

Anything else

No response

Version

3.2.x

Are you willing to submit PR?

Code of Conduct

github-actions[bot] commented 1 month ago

Search before asking

What happened

our dolphinscheduler task sometimes failed with error below:

[WI-0][TI-0] - [ERROR] 2024-10-23 18:00:15.480 +0800 o.a.d.s.m.r.BaseTaskDispatcher:[58] - Dispatch task: 看板推送任务实例同步 failed, worker group not found.
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException: Cannot find worker group: Can not find worker group 数仓
    at org.apache.dolphinscheduler.server.master.dispatch.host.LowerWeightHostManager.getWorkerHostWeights(LowerWeightHostManager.java:157)
    at org.apache.dolphinscheduler.server.master.dispatch.host.LowerWeightHostManager.select(LowerWeightHostManager.java:74)
    at org.apache.dolphinscheduler.server.master.runner.dispatcher.WorkerTaskDispatcher.getTaskInstanceDispatchHost(WorkerTaskDispatcher.java:78)
    at org.apache.dolphinscheduler.server.master.runner.BaseTaskDispatcher.dispatchTask(BaseTaskDispatcher.java:55)
    at org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueueLooper.run(GlobalTaskDispatchWaitingQueueLooper.java:80)

and the work group definitely exists, 800+ task use this worker group, only some task failed randomly. It also report "Cannot find worker group: Can not find worker group default" sometimes. These error will always succeed with a retry/retrun. I looked into the source code, I wonder if the workerHostWeightsMap will change while running? If yes, how? Thank you.

    private Set<HostWeight> getWorkerHostWeights(String workerGroup) throws WorkerGroupNotFoundException {
        workerGroupReadLock.lock();
        try {
            Set<HostWeight> hostWeights = workerHostWeightsMap.get(workerGroup);
            if (hostWeights == null) {
                throw new WorkerGroupNotFoundException("Can not find worker group " + workerGroup);
            }
            return hostWeights;
        } finally {
            workerGroupReadLock.unlock();
        }
    }

What you expected to happen

work group should always been found

How to reproduce

cannot reproduce for specified task, rerun will succeed, but always happens everyday for random tasks

Anything else

No response

Version

3.2.x

Are you willing to submit PR?

Code of Conduct

SbloodyS commented 1 month ago

You can check whether the workers under the worker group survive normally.

niyanchun commented 1 month ago

You can check whether the workers under the worker group survive normally.

Yes, we have checked, the worker is in good status, at least no errors in logs. We have 1800+ tasks, two worker nodes, two worker group ( not include default worker group), each worker group use the same two worker nodes, if the worker node is abnormal, many tasks will failed. I wonder if the worker breakdowns (such as network issue or others..) and restore soon (in seconds), is there any error logs? Up to now, I didn't find others errors when the task failed.

niyanchun commented 1 month ago

I may find the reason: in master logs:

 1632 [WI-0][TI-0] - [WARN] 2024-10-28 11:00:02.949 +0800 o.a.d.s.m.d.h.LowerWeightHostManager:[139] - Worker 10.10.3.9:1234 in workerGroup default is Busy, heartbeat is WorkerHeartBeat(workerHostWeight=100, threadPoolUsa      ge=11)
 1633 [WI-0][TI-0] - [WARN] 2024-10-28 11:00:02.949 +0800 o.a.d.s.m.d.h.LowerWeightHostManager:[139] - Worker 10.10.3.15:1234 in workerGroup default is Busy, heartbeat is WorkerHeartBeat(workerHostWeight=100, threadPoolUs      age=7)
 1634 [WI-0][TI-0] - [WARN] 2024-10-28 11:00:02.949 +0800 o.a.d.s.m.d.h.LowerWeightHostManager:[139] - Worker 10.10.3.9:1234 in workerGroup 采集 is Busy, heartbeat is WorkerHeartBeat(workerHostWeight=100, threadPoolUsage=      11)
 1635 [WI-0][TI-0] - [WARN] 2024-10-28 11:00:02.949 +0800 o.a.d.s.m.d.h.LowerWeightHostManager:[139] - Worker 10.10.3.15:1234 in workerGroup 采集 is Busy, heartbeat is WorkerHeartBeat(workerHostWeight=100, threadPoolUsage      =7)
 1636 [WI-0][TI-0] - [WARN] 2024-10-28 11:00:02.949 +0800 o.a.d.s.m.d.h.LowerWeightHostManager:[139] - Worker 10.10.3.9:1234 in workerGroup 数仓 is Busy, heartbeat is WorkerHeartBeat(workerHostWeight=100, threadPoolUsage=      11)
 1637 [WI-0][TI-0] - [WARN] 2024-10-28 11:00:02.949 +0800 o.a.d.s.m.d.h.LowerWeightHostManager:[139] - Worker 10.10.3.15:1234 in workerGroup 数仓 is Busy, heartbeat is WorkerHeartBeat(workerHostWeight=100, threadPoolUsage      =7)

If worker status is busy, it will cause workerHostWeightsMap empty or without the workgroup:

        /**
         * Sync worker resource.
         *
         * @param workerGroupNodes  worker group nodes, key is worker group, value is worker group nodes.
         * @param workerNodeInfoMap worker node info map, key is worker node, value is worker info.
         */
        private void syncWorkerResources(final Map<String, Set<String>> workerGroupNodes,
                                         final Map<String, WorkerHeartBeat> workerNodeInfoMap) {
            try {
                Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
                for (Map.Entry<String, Set<String>> entry : workerGroupNodes.entrySet()) {
                    String workerGroup = entry.getKey();
                    Set<String> nodes = entry.getValue();
                    Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
                    for (String node : nodes) {
                        WorkerHeartBeat heartbeat = workerNodeInfoMap.getOrDefault(node, null);
                        Optional<HostWeight> hostWeightOpt = getHostWeight(node, workerGroup, heartbeat);
                        hostWeightOpt.ifPresent(hostWeights::add);
                    }
                    if (!hostWeights.isEmpty()) {
                        workerHostWeights.put(workerGroup, hostWeights);
                    }
                }
                syncWorkerHostWeight(workerHostWeights);
            } catch (Throwable ex) {
                log.error("Sync worker resource error", ex);
            }
        }

        private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights) {
            workerGroupWriteLock.lock();
            try {
                workerHostWeightsMap.clear();
                workerHostWeightsMap.putAll(workerHostWeights);
            } finally {
                workerGroupWriteLock.unlock();
            }
        }
    }

    public Optional<HostWeight> getHostWeight(String workerAddress, String workerGroup, WorkerHeartBeat heartBeat) {
        if (heartBeat == null) {
            log.warn("Worker {} in WorkerGroup {} have not received the heartbeat", workerAddress, workerGroup);
            return Optional.empty();
        }
        if (ServerStatus.BUSY == heartBeat.getServerStatus()) {
            log.warn("Worker {} in workerGroup {} is Busy, heartbeat is {}", workerAddress, workerGroup, heartBeat);
            return Optional.empty();
        }
        return Optional.of(
                new HostWeight(
                        HostWorker.of(workerAddress, heartBeat.getWorkerHostWeight(), workerGroup),
                        heartBeat.getCpuUsage(),
                        heartBeat.getMemoryUsage(),
                        heartBeat.getDiskUsage(),
                        heartBeat.getThreadPoolUsage(),
                        heartBeat.getStartupTime()));
    }

I think this can be optimized, when the server is busy, it does not means work group not exist, it only indicate the server is busy.

SbloodyS commented 1 month ago

Which version you are using? @niyanchun

niyanchun commented 1 month ago

Which version you are using? @niyanchun

@SbloodyS 3.2.2. We also find cpu overload in worker logs:

dolphinscheduler-worker.2024-10-28_12.0.log:[WI-0][TI-0] - [INFO] 2024-10-28 12:00:40.308 +0800 o.a.d.m.m.BaseServerLoadProtection:[43] - OverLoad: the system cpu usage: 0.9396984924623116 is over then the maxSystemCpuUsagePercentageThresholds 0.9
dolphinscheduler-worker.2024-10-28_12.0.log:[WI-0][TI-0] - [INFO] 2024-10-28 12:00:41.320 +0800 o.a.d.m.m.BaseServerLoadProtection:[43] - OverLoad: the system cpu usage: 0.9603960396039605 is over then the maxSystemCpuUsagePercentageThresholds 0.9
dolphinscheduler-worker.2024-10-28_12.0.log:[WI-0][TI-0] - [INFO] 2024-10-28 12:00:42.320 +0800 o.a.d.m.m.BaseServerLoadProtection:[43] - OverLoad: the system cpu usage: 0.9849246231155779 is over then the maxSystemCpuUsagePercentageThresholds 0.9
dolphinscheduler-worker.2024-10-28_12.0.log:[WI-0][TI-0] - [INFO] 2024-10-28 12:00:43.328 +0800 o.a.d.m.m.BaseServerLoadProtection:[43] - OverLoad: the system cpu usage: 0.9900744416873449 is over then the maxSystemCpuUsagePercentageThresholds 0.9

Up to now, the failed likely caused by worker resource lack, but in fact the server load is low ( avg cpu is 20%), so we decide to modify some worker configs, and observe for some while:

Thanks.