apache / dolphinscheduler

Apache DolphinScheduler is the modern data orchestration platform. Agile to create high performance workflow with low-code
https://dolphinscheduler.apache.org/
Apache License 2.0

[DSIP-65] SubWorkflow logic task support failover/repeat running/pause/kill/recover #16480

Closed: ruanwenjun closed this issue 3 weeks ago.

ruanwenjun commented 1 month ago

Motivation

Right now, the sub workflow task lacks a sound implementation of failover, repeat running, pause, kill, and recover, so we should redesign it.

Design Detail

The sub workflow task represents a workflow nested inside the parent workflow.

When the task runs, it creates a sub workflow instance and tracks its state.

Once the parent workflow is triggered, paused, stopped, or recovered, the same operation should be applied to its sub workflow instance.
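A minimal sketch of this propagation, assuming a hypothetical in-memory tree of workflow instances (WorkflowInstanceNode and ControlOperation are illustrative names, not DolphinScheduler classes). Applying an operation to an instance forwards it to every sub workflow instance; since a sub workflow may itself contain SubWorkflow tasks, the sketch assumes the operation cascades to nested instances as well.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical control operations that can be applied to a parent workflow instance.
enum ControlOperation { TRIGGER, PAUSE, STOP, RECOVER }

// Hypothetical in-memory model: a workflow instance plus the sub workflow
// instances triggered by its SubWorkflow tasks.
final class WorkflowInstanceNode {
    final int id;
    final List<WorkflowInstanceNode> subWorkflows = new ArrayList<>();
    final List<ControlOperation> applied = new ArrayList<>();

    WorkflowInstanceNode(int id) {
        this.id = id;
    }

    WorkflowInstanceNode addSubWorkflow(WorkflowInstanceNode child) {
        subWorkflows.add(child);
        return this;
    }

    // Records the operation on this instance, then forwards it to each sub workflow
    // instance, so nested sub workflows receive it too (depth-first, pre-order).
    void apply(ControlOperation operation, List<Integer> visited) {
        applied.add(operation);
        visited.add(id);
        for (WorkflowInstanceNode child : subWorkflows) {
            child.apply(operation, visited);
        }
    }
}
```

In the proposal itself the forwarding is not a direct tree walk: it goes through the SubWorkflow logic task's own pause()/kill() handling, which sends the request to the sub workflow instance it triggered.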

When the sub workflow task runs, it triggers a sub workflow instance. We use SubWorkflowLogicTaskRuntimeContext to store the sub workflow instance id, so whenever the task runs, it loads the SubWorkflowLogicTaskRuntimeContext and uses it to calculate the task status.

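public class SubWorkflowLogicTaskRuntimeContext {

    // Id of the sub workflow instance triggered by this task; kept so that pause/kill/recover
    // can act on the same instance instead of creating a new one
    private Integer subWorkflowInstanceId;

    public Integer getSubWorkflowInstanceId() { return subWorkflowInstanceId; }

    public void setSubWorkflowInstanceId(Integer subWorkflowInstanceId) { this.subWorkflowInstanceId = subWorkflowInstanceId; }
}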
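The status calculation can be pictured as a lookup of the sub workflow instance recorded in the runtime context, followed by a mapping to the task status. This is a sketch under assumptions: SubWorkflowState, LogicTaskStatus, RuntimeContext, and SubWorkflowStatusCalculator are hypothetical stand-ins (not DolphinScheduler's real enums or classes), and a Map plays the role of the workflow instance table.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical states of a sub workflow instance (not DolphinScheduler's actual enum).
enum SubWorkflowState { RUNNING, PAUSED, STOPPED, SUCCEEDED, FAILED }

// Hypothetical statuses of the SubWorkflow logic task.
enum LogicTaskStatus { RUNNING, PAUSE, KILL, SUCCESS, FAILURE }

// Minimal stand-in for SubWorkflowLogicTaskRuntimeContext.
final class RuntimeContext {
    final Integer subWorkflowInstanceId;

    RuntimeContext(Integer subWorkflowInstanceId) {
        this.subWorkflowInstanceId = subWorkflowInstanceId;
    }
}

final class SubWorkflowStatusCalculator {
    // Stand-in for the workflow instance table, keyed by instance id.
    private final Map<Integer, SubWorkflowState> instanceStates;

    SubWorkflowStatusCalculator(Map<Integer, SubWorkflowState> instanceStates) {
        this.instanceStates = instanceStates;
    }

    // Derives the task status from the sub workflow instance recorded in the context.
    LogicTaskStatus calculate(RuntimeContext context) {
        if (context == null || context.subWorkflowInstanceId == null) {
            throw new IllegalStateException("sub workflow instance has not been triggered yet");
        }
        SubWorkflowState state = instanceStates.get(context.subWorkflowInstanceId);
        if (state == null) {
            throw new IllegalStateException("sub workflow instance not found: " + context.subWorkflowInstanceId);
        }
        switch (state) {
            case PAUSED:
                return LogicTaskStatus.PAUSE;
            case STOPPED:
                return LogicTaskStatus.KILL;
            case SUCCEEDED:
                return LogicTaskStatus.SUCCESS;
            case FAILED:
                return LogicTaskStatus.FAILURE;
            default:
                // Non-final states keep the task running until the next status check
                return LogicTaskStatus.RUNNING;
        }
    }
}
```

Keeping only the instance id in the context means the task status is always derived from the current state of the sub workflow instance rather than cached in the task.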

When the sub workflow task runs, it triggers the sub workflow according to the command type of the parent workflow instance.

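private SubWorkflowLogicTaskRuntimeContext initializeSubWorkflowInstance() {
    // todo: doFailover if the runtime context is not null and task is generated by failover

    // No sub workflow instance has been triggered for this task yet: start a new one
    if (subWorkflowLogicTaskRuntimeContext == null) {
        return triggerNewSubWorkflow();
    }

    // Otherwise, decide by the command type of the parent workflow instance
    switch (workflowExecutionRunnable.getWorkflowInstance().getCommandType()) {
        case RECOVER_SUSPENDED_PROCESS:
            // Parent resumed after a pause: resume the suspended tasks of the sub workflow
            return recoverFromSuspendTasks();
        case START_FAILURE_TASK_PROCESS:
            // Parent rerun from failure: rerun the failed tasks of the sub workflow
            return recoverFromFailedTasks();
        default:
            // Any other command (for example, a repeat run) starts a new sub workflow instance
            return triggerNewSubWorkflow();
    }
}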
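The decision made by initializeSubWorkflowInstance() can be isolated into a small, testable function. This is a hedged sketch: CommandType here is a local stand-in that only mirrors the command names used above, InitAction and SubWorkflowInitPlanner are hypothetical, and the generatedByFailover flag with its FAILOVER_EXISTING outcome is only one assumed reading of the todo, not part of the proposal.

```java
// Local stand-in mirroring the command names used in the switch above.
enum CommandType { START_PROCESS, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS }

// Hypothetical outcomes: what the task should do with its sub workflow instance.
enum InitAction { TRIGGER_NEW, FAILOVER_EXISTING, RECOVER_SUSPENDED, RECOVER_FAILED }

final class SubWorkflowInitPlanner {

    // hasRuntimeContext: a sub workflow instance id was already recorded for this task.
    // generatedByFailover: hypothetical flag marking a task re-created by master failover.
    static InitAction plan(boolean hasRuntimeContext, boolean generatedByFailover, CommandType commandType) {
        if (!hasRuntimeContext) {
            // Nothing was triggered before, so always start a fresh sub workflow instance
            return InitAction.TRIGGER_NEW;
        }
        if (generatedByFailover) {
            // Assumed handling of the todo: re-attach to the existing instance
            return InitAction.FAILOVER_EXISTING;
        }
        switch (commandType) {
            case RECOVER_SUSPENDED_PROCESS:
                return InitAction.RECOVER_SUSPENDED;
            case START_FAILURE_TASK_PROCESS:
                return InitAction.RECOVER_FAILED;
            default:
                return InitAction.TRIGGER_NEW;
        }
    }
}
```

Separating the decision from the side effects (triggering or recovering instances) lets each command type be covered by a unit test without a running master.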

When the task is paused or killed, it pauses or stops the sub workflow instance. Once the sub workflow instance has been paused or stopped, the task obtains the resulting status through its regular state tracking.

@Override
public void pause() throws MasterTaskExecuteException {
    // Nothing to pause if no sub workflow instance has been triggered yet
    if (subWorkflowLogicTaskRuntimeContext == null) {
        log.info("subWorkflowLogicTaskRuntimeContext is null, cannot pause");
        return;
    }
    final Integer subWorkflowInstanceId = subWorkflowLogicTaskRuntimeContext.getSubWorkflowInstanceId();
    // Request the pause; the task status is updated later by state tracking
    final WorkflowInstancePauseResponse pauseResponse = applicationContext
            .getBean(SubWorkflowControlClient.class)
            .pauseWorkflowInstance(new WorkflowInstancePauseRequest(subWorkflowInstanceId));
    if (pauseResponse.isSuccess()) {
        log.info("Pause sub workflowInstance: id={} success", subWorkflowInstanceId);
    } else {
        log.warn("Pause sub workflowInstance: id={} failed with response: {}", subWorkflowInstanceId,
                pauseResponse);
    }
}

@Override
public void kill() throws MasterTaskExecuteException {
    // Nothing to kill if no sub workflow instance has been triggered yet
    if (subWorkflowLogicTaskRuntimeContext == null) {
        log.info("subWorkflowLogicTaskRuntimeContext is null, cannot kill");
        return;
    }
    final Integer subWorkflowInstanceId = subWorkflowLogicTaskRuntimeContext.getSubWorkflowInstanceId();
    // Killing the task stops the sub workflow instance
    final WorkflowInstanceStopResponse stopResponse = applicationContext
            .getBean(SubWorkflowControlClient.class)
            .stopWorkflowInstance(new WorkflowInstanceStopRequest(subWorkflowInstanceId));
    if (stopResponse.isSuccess()) {
        log.info("Kill sub workflowInstance: id={} success", subWorkflowInstanceId);
    } else {
        log.warn("Kill sub workflowInstance: id={} failed with response: {}", subWorkflowInstanceId, stopResponse);
    }
}
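pause() and kill() share the same shape: a null guard, one control request, and success/failure logging. A minimal sketch of factoring that into one helper, assuming hypothetical ControlResponse and SubWorkflowControlHelper types (stand-ins for the real response classes and logger) and passing the request as a function of the instance id:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

// Hypothetical response type standing in for WorkflowInstancePauseResponse / WorkflowInstanceStopResponse.
final class ControlResponse {
    final boolean success;
    final String message;

    ControlResponse(boolean success, String message) {
        this.success = success;
        this.message = message;
    }
}

final class SubWorkflowControlHelper {
    // Collected log lines; a real task would write to its logger instead.
    final List<String> logs = new ArrayList<>();

    // Shared body of pause() and kill(): skip when no sub workflow was triggered,
    // otherwise send the request and log the outcome with the instance id.
    boolean control(String action, Integer subWorkflowInstanceId, IntFunction<ControlResponse> sender) {
        if (subWorkflowInstanceId == null) {
            logs.add("No sub workflow instance was triggered, cannot " + action);
            return false;
        }
        ControlResponse response = sender.apply(subWorkflowInstanceId);
        if (response.success) {
            logs.add(action + " sub workflowInstance: id=" + subWorkflowInstanceId + " success");
        } else {
            logs.add(action + " sub workflowInstance: id=" + subWorkflowInstanceId + " failed: " + response.message);
        }
        return response.success;
    }
}
```

With such a helper, pause() would pass a function that sends the pause request and kill() one that sends the stop request, so the guard and logging live in one place.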

Compatibility, Deprecation, and Migration Plan

Keeps compatibility with the old version.

Test Plan

Tested by the Master integration tests (Master IT).
