apache / dolphinscheduler

Apache DolphinScheduler is the modern data orchestration platform. Agile to create high performance workflow with low-code
https://dolphinscheduler.apache.org/
Apache License 2.0
12.73k stars 4.58k forks source link

[Bug] [Switch] The Switch component cannot jump to the corresponding branch normally #16471

Open MuNan777 opened 1 month ago

MuNan777 commented 1 month ago

Search before asking

What happened

  1. Configure the switch component according to the official website. When executing, the condition is passed, but an error is reported, saying that nextBranch is not found.
  2. If none of the conditions are met, it is normal to execute the default branch

The following is the Log


[INFO] 2024-08-15 15:47:32.182 +0800 - Load task instance plugin [INFO] 2024-08-15 15:47:32.184 +0800 - *** [INFO] 2024-08-15 15:47:32.199 +0800 - Send task status RUNNING_EXECUTION to master 172.26.0.5:5678 [INFO] 2024-08-15 15:47:32.201 +0800 - Success initialize task parameters: { "localParams" : [ ], "varPool" : [ ], "switchResult" : { "dependTaskList" : [ { "condition" : "${value}==\"A\"", "nextNode" : 117018842817920 }, { "condition" : "${value}==\"B\"", "nextNode" : 117018871028096 } ], "nextNode" : 117018842817920 }, "nextBranch" : null } [INFO] 2024-08-15 15:47:32.202 +0800 - Initialized task plugin instance: SWITCH successfully [INFO] 2024-08-15 15:47:32.204 +0800 - Initialize taskVarPool: [{"prop":"value","direct":"OUT","type":"VARCHAR","value":"B"}] successfully [INFO] 2024-08-15 15:47:32.205 +0800 -


[INFO] 2024-08-15 15:47:32.207 +0800 - * Execute task instance ***** [INFO] 2024-08-15 15:47:32.209 +0800 - *** [INFO] 2024-08-15 15:47:32.211 +0800 - Begin to execute switch item: SwitchResultVo(condition=${value}=="A", nextNode=117018842817920) [INFO] 2024-08-15 15:47:32.214 +0800 - paramName:value,paramValue:"B" [INFO] 2024-08-15 15:47:32.215 +0800 - Format condition sentence::"B"=="A" successfully [INFO] 2024-08-15 15:47:32.272 +0800 - Execute condition sentence: "B"=="A" successfully: false [INFO] 2024-08-15 15:47:32.273 +0800 - Begin to execute switch item: SwitchResultVo(condition=${value}=="B", nextNode=117018871028096) [INFO] 2024-08-15 15:47:32.275 +0800 - paramName:value,paramValue:"B" [INFO] 2024-08-15 15:47:32.277 +0800 - Format condition sentence::"B"=="B" successfully [INFO] 2024-08-15 15:47:32.317 +0800 - Execute condition sentence: "B"=="B" successfully: true [ERROR] 2024-08-15 15:47:32.318 +0800 - Task execute failed, due to meet an exception java.lang.IllegalArgumentException: The branch is empty, please check the switch task configuration at org.apache.dolphinscheduler.server.master.runner.task.switchtask.SwitchLogicTask.checkIfBranchExist(SwitchLogicTask.java:130) at org.apache.dolphinscheduler.server.master.runner.task.switchtask.SwitchLogicTask.handle(SwitchLogicTask.java:78) at org.apache.dolphinscheduler.server.master.runner.execute.SyncMasterTaskExecutor.executeTask(SyncMasterTaskExecutor.java:44) at org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor.run(MasterTaskExecutor.java:110) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) [INFO] 2024-08-15 15:47:32.319 +0800 -


[INFO] 2024-08-15 15:47:32.320 +0800 - * Finalize task instance **** [INFO] 2024-08-15 15:47:32.321 +0800 - *** [ERROR] 2024-08-15 15:47:32.322 +0800 - Get a exception when execute the task, will try to cancel the task java.lang.IllegalArgumentException: The branch is empty, please check the switch task configuration at org.apache.dolphinscheduler.server.master.runner.task.switchtask.SwitchLogicTask.checkIfBranchExist(SwitchLogicTask.java:130) at org.apache.dolphinscheduler.server.master.runner.task.switchtask.SwitchLogicTask.handle(SwitchLogicTask.java:78) at org.apache.dolphinscheduler.server.master.runner.execute.SyncMasterTaskExecutor.executeTask(SyncMasterTaskExecutor.java:44) at org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor.run(MasterTaskExecutor.java:110) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) [WARN] 2024-08-15 15:47:32.324 +0800 - The Sync task does not support kill operation [INFO] 2024-08-15 15:47:32.338 +0800 - Send task status: FAILURE to master: 172.26.0.5:5678 successfully [INFO] 2024-08-15 15:47:32.339 +0800 - Get a exception when execute the task, sent the task execute result to master, the current task execute result is TaskExecutionStatus{code=6, desc='failure'} [INFO] 2024-08-15 15:47:32.341 +0800 - Get a exception when execute the task, removed the TaskExecutionContext [INFO] 2024-08-15 15:47:32.343 +0800 - FINALIZE_SESSION

What you expected to happen

I read the SwitchLogicTask.java file and saw that the calculateSwitchBranch() method only assigned a value to its own internal nextBranch, without giving it taskParameters, which resulted in an error when executing checkIfBranchExist(taskParameters.getNextBranch());

private void calculateSwitchBranch() {
      List<SwitchResultVo> switchResultVos = taskParameters.getSwitchResult().getDependTaskList();
      Map<String, Property> globalParams = taskExecutionContext.getPrepareParamsMap();
      Map<String, Property> varParams = JSONUtils
              .toList(taskInstance.getVarPool(), Property.class)
              .stream()
              .collect(Collectors.toMap(Property::getProp, Property -> Property));

      Long nextBranch = null;
      for (SwitchResultVo switchResultVo : switchResultVos) {
          log.info("Begin to execute switch item: {} ", switchResultVo);
          try {
              String content = SwitchTaskUtils.generateContentWithTaskParams(switchResultVo.getCondition(),
                      globalParams, varParams);
              log.info("Format condition sentence::{} successfully", content);
              boolean conditionResult = SwitchTaskUtils.evaluate(content);
              log.info("Execute condition sentence: {} successfully: {}", content, conditionResult);
              if (conditionResult) {
                  // If matched, break the loop
                  nextBranch = switchResultVo.getNextNode();
                  break;
              }
          } catch (Exception e) {
              log.info("Execute switch item: {} failed", switchResultVo, e);
          }
      }

      if (nextBranch == null) {
          log.info("All switch item is not satisfied");
          moveToDefaultBranch();
      }
  }

How to reproduce

Create a process, select the Switch component, add two branches, add conditions, select the branch, and the default branch

Anything else

No response

Version

3.2.x

Are you willing to submit PR?

Code of Conduct

SbloodyS commented 1 month ago

This will be fixed in #16423

github-actions[bot] commented 1 week ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.