apache / dolphinscheduler

Apache DolphinScheduler is the modern data orchestration platform. Agile to create high performance workflow with low-code
https://dolphinscheduler.apache.org/
Apache License 2.0

[DSIP-61][Master] Refactor thread pool and state event orchestration in master #16423

Closed ruanwenjun closed 1 month ago

ruanwenjun commented 1 month ago


Motivation

Right now, the thread model and state control in the master are very complicated. This complexity is inherited from the original code and design, and it makes the project hard to maintain.

Concurrent modification of state

Multiple places in the master can modify a workflow instance's state. Right now we have an RPC thread pool, a failover daemon thread, a WorkflowExecuteThreadpool, and a TaskExecuteThreadpool. All of them might modify workflow state or metadata, and none of these modifications are atomic, which means the state of a workflow/task can easily become inconsistent.


e.g. If a task succeeds at the same moment we stop the workflow, the task state might first be set to success and then be overwritten with stop, since the task state update is not atomic.

Furthermore, the API server also modifies the workflow state. When we perform a pause or kill operation, the API server changes the workflow state to READY_PAUSE/READY_STOP and then sends a request to the master over RPC; if the RPC fails, the workflow is left in an unexpected state.

No end-to-end task state consistency

Once the master receives a pause/kill request, it first marks the task as paused/killed in the DB and then sends a pause/kill request to the task executor. In a distributed environment, that request might fail, be rejected, or be lost.

Then the task state will be inconsistent between the master and task executor.


e.g. When we stop the workflow, its tasks will be killed. But if the master fails to send the kill request to the worker, the worker fails to handle it, or the worker fails over, the task will be marked as killed in the DB while it is still running on the worker or remote server.

State control is very complex

There is no state machine in the master; state transitions rely on a lot of if-else logic. This causes many bugs and makes it impossible to write UTs for the state transitions. Often, contributors fix one bug but introduce several new ones.

Adding a new state is practically impossible, since the modification scope would be the whole master.

Failover can crash the master

The master doesn't use paginated queries to find the workflow instances that need failover. In some stress tests, the master ran out of memory (OOM) as soon as it began a master failover or global failover.

The global failover check runs too frequently. Each global failover puts heavy pressure on the DB, since it scans the whole workflow instance table, yet it is only needed when the master first starts up.

Worse, failover runs in the registry client thread, which may block the registry client because failover can take a long time.

This DSIP hopes to refactor the master and fix these problems.

Design Detail

The final architecture of the master is outlined below.


Each running workflow instance is loaded as a WorkflowExecutionRunnable, and each task in a WorkflowExecutionRunnable is represented as a TaskExecutionRunnable. All changes to workflows and tasks are triggered by lifecycle events.

WorkflowEventBus

WorkflowEventBus is an event channel that belongs to a specific workflow instance. Each workflow instance has its own EventBus: every operation that affects the running workflow/task is transformed into a lifecycle event and put into the EventBus in order of arrival.

This makes it easy for the engine to handle events correctly, since all events belonging to a workflow are handled sequentially by one thread.
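To make the ordering guarantee concrete, here is a minimal Java sketch of a per-workflow event bus, assuming a lock-free FIFO queue (`ConcurrentLinkedQueue`) with many producer threads and exactly one consumer thread. The class names `LifecycleEvent`, `SimpleWorkflowEventBus`, and `SequentialEventHandler` are hypothetical and do not mirror the actual DolphinScheduler classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for a lifecycle event; only the event type is kept.
final class LifecycleEvent {
    private final String type;

    LifecycleEvent(String type) { this.type = type; }

    String type() { return type; }
}

// Hypothetical per-workflow bus: any thread may publish, but only the single
// fire-worker thread that owns the bus polls it, so events are consumed in FIFO order.
final class SimpleWorkflowEventBus {
    private final ConcurrentLinkedQueue<LifecycleEvent> queue = new ConcurrentLinkedQueue<>();

    // Called by RPC threads, failover, API-triggered operations, etc.
    void publish(LifecycleEvent event) { queue.offer(event); }

    // Called only by the owning worker thread.
    Optional<LifecycleEvent> poll() { return Optional.ofNullable(queue.poll()); }

    boolean isEmpty() { return queue.isEmpty(); }
}

// Hypothetical handler running on the owning worker thread: it drains events one by one,
// so two events of the same workflow are never handled concurrently.
final class SequentialEventHandler {
    final List<String> handledTypes = new ArrayList<>();

    void drain(SimpleWorkflowEventBus bus) {
        Optional<LifecycleEvent> next = bus.poll();
        while (next.isPresent()) {
            handledTypes.add(next.get().type());
            next = bus.poll();
        }
    }
}
```

Because only the owning worker calls `drain`, a STOP event published while a task-success event is still pending is handled after it instead of racing with it.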

WorkflowEventBusCoordinator

The WorkflowEventBusCoordinator is responsible for managing the EventBuses and assigning a worker thread to each EventBus.

The WorkflowEventBusCoordinator holds a configurable number of workers. After a WorkflowExecutionRunnable is created, it is assigned to an EventBusFireWorker. Each WorkflowEventBus is assigned to exactly one EventBusFireWorker, and one EventBusFireWorker handles multiple WorkflowEventBus instances in DFS.

WorkflowEventBusCoordinator

```java
public class WorkflowEventBusCoordinator {

    public void registerWorkflowEventBus(IWorkflowExecutionRunnable workflowExecutionRunnable) {
        final int workerSlot = calculateWorkflowEventBusFireWorkerSlot(workflowExecutionRunnable);
        final WorkflowEventBusFireWorker workflowEventBusFireWorker = workflowEventBusFireWorkers.getWorker(workerSlot);
        workflowEventBusFireWorker.registerWorkflowEventBus(workflowExecutionRunnable);
    }

    public void unRegisterWorkflowEventBus(IWorkflowExecutionRunnable workflowExecutionRunnable) {
        final int workerSlot = calculateWorkflowEventBusFireWorkerSlot(workflowExecutionRunnable);
        final WorkflowEventBusFireWorker workflowEventBusFireWorker = workflowEventBusFireWorkers.getWorker(workerSlot);
        workflowEventBusFireWorker.unRegisterWorkflowEventBus(workflowExecutionRunnable);
    }
}
```
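The coordinator excerpt does not show how `calculateWorkflowEventBusFireWorkerSlot` picks a worker. The sketch below assumes a simple `workflowInstanceId % workerCount` scheme; `SlotCoordinator` and `FireWorker` are hypothetical names. What matters is that the slot is deterministic, so registering and unregistering the same workflow always reaches the same worker.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical fire worker: tracks the workflow instance ids whose buses it polls.
final class FireWorker {
    final Set<Integer> workflowInstanceIds = new LinkedHashSet<>();

    void register(int workflowInstanceId) { workflowInstanceIds.add(workflowInstanceId); }

    void unregister(int workflowInstanceId) { workflowInstanceIds.remove(workflowInstanceId); }
}

// Hypothetical coordinator with a fixed pool of workers.
final class SlotCoordinator {
    private final List<FireWorker> workers = new ArrayList<>();

    SlotCoordinator(int workerCount) {
        if (workerCount <= 0) {
            throw new IllegalArgumentException("workerCount must be positive");
        }
        for (int i = 0; i < workerCount; i++) {
            workers.add(new FireWorker());
        }
    }

    // Assumption: slot = instance id modulo worker count (floorMod keeps it non-negative).
    // Deterministic, so register and unregister resolve to the same worker.
    int calculateSlot(int workflowInstanceId) {
        return Math.floorMod(workflowInstanceId, workers.size());
    }

    void register(int workflowInstanceId) {
        workers.get(calculateSlot(workflowInstanceId)).register(workflowInstanceId);
    }

    void unregister(int workflowInstanceId) {
        workers.get(calculateSlot(workflowInstanceId)).unregister(workflowInstanceId);
    }

    FireWorker worker(int slot) { return workers.get(slot); }
}
```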

The number of workers in WorkflowEventBusCoordinator is the only configuration you need to care about, and it affects the performance of the master. More workers do not necessarily mean better performance, since they increase OS thread context switching. Most event handling relies on DB connections, the rest are in-memory operations that are very fast, and only a few events rely on RPC. So it is better not to use more than twice the number of connections in your service's database connection pool; run stress tests if you want to find the best configuration.
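The sizing rule of thumb can be expressed as a small check. `FireWorkerSizing.effectiveWorkerCount` is a hypothetical helper, not a DolphinScheduler API or configuration key; it only caps the configured worker count at twice the database connection pool size.

```java
// Hypothetical helper, not a DolphinScheduler API or configuration key.
final class FireWorkerSizing {
    private FireWorkerSizing() {}

    // Caps the configured worker count at 2 x the DB connection pool size.
    // Stress testing should still decide the final value below this cap.
    static int effectiveWorkerCount(int configuredWorkers, int dbConnectionPoolSize) {
        if (configuredWorkers <= 0 || dbConnectionPoolSize <= 0) {
            throw new IllegalArgumentException("worker count and pool size must be positive");
        }
        return Math.min(configuredWorkers, 2 * dbConnectionPoolSize);
    }
}
```

For example, with a pool of 20 connections, a configured value of 100 workers would be capped at 40, while a value of 16 would be kept as-is.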

WorkflowLifecycleEvent

A workflow lifecycle event represents an operation that might happen during the workflow runtime.

WorkflowLifecycleEventType

```java
public enum WorkflowLifecycleEventType implements ILifecycleEventType {

    /** Start the workflow instance. */
    START,
    /** Notify the workflow instance that a task has finished, so the DAG topology logical transaction should be performed. */
    TOPOLOGY_LOGICAL_TRANSACTION_WITH_TASK_FINISH,
    /** Pause the workflow instance. */
    PAUSE,
    /** The workflow instance has been paused. */
    PAUSED,
    /** Stop the workflow instance. */
    STOP,
    /** The workflow instance has been stopped. */
    STOPPED,
    /** The workflow instance has succeeded. */
    SUCCEED,
    /** The workflow instance has failed. */
    FAILED,
    /** Finalize the workflow instance. */
    FINALIZE,
}
```

Lifecycle events do not correspond one-to-one to workflow states: not every lifecycle change causes a state change, and some events only transform the workflow's internal context.

TaskLifecycleEvent

TaskLifecycleEventType

```java
public enum TaskLifecycleEventType implements ILifecycleEventType {

    /** Start the task instance. */
    START,
    /** Dispatch the task instance to the target executor. */
    DISPATCH,
    /** The task instance has been dispatched to the target executor server. */
    DISPATCHED,
    /**
     * // todo: maybe we can remove this event, once the task has been dispatched it should start
     * The task instance is running on the target executor server.
     */
    RUNNING,
    /** Apply the timeout strategy of the task instance. */
    TIMEOUT,
    /** Retry the task instance. */
    RETRY,
    /** Pause the task instance. */
    PAUSE,
    /** The task instance has been paused. */
    PAUSED,
    /** Fail over the task instance. */
    FAILOVER,
    /** Kill the task instance. */
    KILL,
    /** The task instance has been killed. */
    KILLED,
    /** The task instance has succeeded. */
    SUCCEEDED,
    /** The task instance has failed. */
    FAILED,
    ;
}
```

WorkflowExecutionGraph VS WorkflowGraph

WorkflowExecutionGraph represents the real DAG that is expected to run at runtime and records the runtime state of each task chain. You can think of it as a physical graph.

WorkflowGraph represents the original DAG as defined before a workflow is triggered, and it records only the original DAG context. You can think of it as a logical graph.

e.g. If we have a DAG(A, B, C) and run it from B, the WorkflowExecutionGraph will contain only one task.


If we run from A, the WorkflowExecutionGraph has the same DAG as the WorkflowGraph. In addition, the WorkflowExecutionGraph contains runtime data, so to learn the runtime state of a workflow we can query the WorkflowExecutionGraph directly.

IWorkflowExecutionGraph

```java
import java.util.List;
import java.util.Set;

/**
 * The workflow execution graph represents the real DAG at runtime; it might be a sub-DAG of the workflow DAG.
 *
 * @see WorkflowExecutionGraph
 */
public interface IWorkflowExecutionGraph {

    /** Add a new task to the graph. */
    void addNode(TaskExecutionRunnable taskExecutionRunnable);

    /**
     * Add a new edge to the graph.
     * <p>
     * Right now, this method is called after all the tasks have been added to the graph.
     */
    void addEdge(String fromTaskName, Set<String> toTaskName);

    /** Return the start tasks, i.e. the tasks in the workflow execution graph that have no predecessors. */
    List<ITaskExecutionRunnable> getStartNodes();

    /** Get the predecessor tasks of the given task. */
    List<ITaskExecutionRunnable> getPredecessors(String taskName);

    /** Return the successor tasks of the given task. */
    List<ITaskExecutionRunnable> getSuccessors(String taskName);

    /** Return the successor tasks of the given task. */
    List<ITaskExecutionRunnable> getSuccessors(ITaskExecutionRunnable taskExecutionRunnable);

    /** Get the ITaskExecutionRunnable by task name. */
    ITaskExecutionRunnable getTaskExecutionRunnableByName(String taskName);

    /** Get the ITaskExecutionRunnable by task instance id. */
    ITaskExecutionRunnable getTaskExecutionRunnableById(Integer taskInstanceId);

    /** Get the ITaskExecutionRunnable by task code. */
    ITaskExecutionRunnable getTaskExecutionRunnableByTaskCode(Long taskCode);

    /**
     * Get the active TaskExecutionRunnable list.
     * <p>
     * An active TaskExecutionRunnable is a task currently being handled in the workflow execution graph.
     */
    List<ITaskExecutionRunnable> getActiveTaskExecutionRunnable();

    /** Get all the TaskExecutionRunnables in the graph, both active and inactive. */
    List<ITaskExecutionRunnable> getAllTaskExecutionRunnable();

    /**
     * Check whether the given task can be triggered now.
     * <p>
     * The task can be triggered if all predecessors have finished and none of them failed, paused, or was killed.
     */
    boolean isTriggerConditionMet(ITaskExecutionRunnable taskExecutionRunnable);

    /**
     * Mark the TaskExecutionRunnable as active.
     * <p>
     * An active TaskExecutionRunnable is being handled by the workflow.
     * <p>
     * Once we begin to handle a task, we should mark the TaskExecutionRunnable active.
     */
    void markTaskExecutionRunnableActive(ITaskExecutionRunnable taskExecutionRunnable);

    /**
     * Mark the TaskExecutionRunnable as inactive.
     * <p>
     * An inactive TaskExecutionRunnable is not being handled by the workflow.
     * <p>
     * Once we finish handling a task, we should mark the TaskExecutionRunnable inactive.
     */
    void markTaskExecutionRunnableInActive(ITaskExecutionRunnable taskExecutionRunnable);

    /**
     * Mark the TaskExecutionRunnable as skipped.
     * <p>
     * Once the TaskExecutionRunnable is marked as skipped, the task will not be triggered.
     */
    void markTaskSkipped(ITaskExecutionRunnable taskExecutionRunnable);

    /**
     * Mark the task as skipped.
     * <p>
     * Once the task is marked as skipped, it will not be triggered.
     */
    void markTaskSkipped(String taskName);

    /**
     * Mark the TaskExecutionRunnable chain as failed.
     * <p>
     * Once the chain has failed, its successors will not be triggered, and the workflow execution graph might fail.
     */
    void markTaskExecutionRunnableChainFailure(ITaskExecutionRunnable taskExecutionRunnable);

    /**
     * Mark the TaskExecutionRunnable chain as paused.
     * <p>
     * Once the chain is paused, its successors will not be triggered, and the workflow execution graph might be paused.
     */
    void markTaskExecutionRunnableChainPause(ITaskExecutionRunnable taskExecutionRunnable);

    /**
     * Mark the TaskExecutionRunnable chain as killed.
     * <p>
     * Once the chain is killed, its successors will not be triggered, and the workflow execution graph might be stopped.
     */
    void markTaskExecutionRunnableChainKill(ITaskExecutionRunnable taskExecutionRunnable);

    /** Whether all TaskExecutionRunnable chains in the graph have finished. */
    boolean isAllTaskExecutionRunnableChainFinish();

    /** Whether all TaskExecutionRunnable chains in the graph have finished successfully. */
    boolean isAllTaskExecutionRunnableChainSuccess();

    /** Whether any TaskExecutionRunnable chain in the graph has finished with failure. */
    boolean isExistFailureTaskExecutionRunnableChain();

    /** Whether any TaskExecutionRunnable chain in the graph has finished as paused. */
    boolean isExistPauseTaskExecutionRunnableChain();

    /** Whether any TaskExecutionRunnable chain in the graph has finished as killed. */
    boolean isExistKillTaskExecutionRunnableChain();

    /**
     * Check whether the given task is the end of its task chain.
     * <p>
     * If the given task has no successor, it is the end of the task chain.
     */
    boolean isEndOfTaskChain(String taskName);

    /**
     * Check whether the given task is the end of its task chain.
     * <p>
     * If the given task has no successor, it is the end of the task chain.
     */
    boolean isEndOfTaskChain(ITaskExecutionRunnable taskExecutionRunnable);

    /**
     * Whether the given task is skipped.
     * <p>
     * Once a task is marked as skipped, it will not be triggered, but its successors will be.
     */
    boolean isTaskExecutionRunnableSkipped(ITaskExecutionRunnable taskExecutionRunnable);

    /**
     * Whether the given task is forbidden.
     * <p>
     * A forbidden task is passed over, and its successors are triggered.
     */
    boolean isTaskExecutionRunnableForbidden(ITaskExecutionRunnable taskExecutionRunnable);

    /**
     * Whether all predecessor tasks are skipped.
     * <p>
     * Once all predecessors are marked as skipped, the task is marked as skipped too, and its successors are triggered.
     */
    boolean isAllPredecessorsSkipped(ITaskExecutionRunnable taskExecutionRunnable);

    /** Whether all successor tasks are condition tasks. */
    boolean isAllSuccessorsAreConditionTask(ITaskExecutionRunnable taskExecutionRunnable);
}
```
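The core graph queries can be illustrated with a stripped-down, string-keyed graph. This is a hypothetical sketch, not the real `WorkflowExecutionGraph`: `MiniExecutionGraph` and `ChainState` are illustrative names, and the skipped/forbidden handling of the interface is deliberately omitted, so here a task is triggerable only when every predecessor has succeeded.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified chain states for illustration only.
enum ChainState { PENDING, SUCCESS, FAILURE, PAUSE, KILL }

// Hypothetical minimal execution graph keyed by task name.
final class MiniExecutionGraph {
    private final Map<String, Set<String>> successors = new LinkedHashMap<>();
    private final Map<String, Set<String>> predecessors = new LinkedHashMap<>();
    private final Map<String, ChainState> states = new HashMap<>();

    void addNode(String taskName) {
        successors.putIfAbsent(taskName, new LinkedHashSet<>());
        predecessors.putIfAbsent(taskName, new LinkedHashSet<>());
        states.putIfAbsent(taskName, ChainState.PENDING);
    }

    // As in the interface, edges are added after all nodes exist.
    void addEdge(String fromTaskName, Set<String> toTaskNames) {
        for (String to : toTaskNames) {
            successors.get(fromTaskName).add(to);
            predecessors.get(to).add(fromTaskName);
        }
    }

    void markState(String taskName, ChainState state) { states.put(taskName, state); }

    // Start nodes are the tasks whose predecessor set is empty.
    List<String> getStartNodes() {
        List<String> startNodes = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : predecessors.entrySet()) {
            if (entry.getValue().isEmpty()) {
                startNodes.add(entry.getKey());
            }
        }
        return startNodes;
    }

    // Triggerable only if every predecessor finished successfully
    // (a pending, failed, paused, or killed predecessor blocks the task).
    boolean isTriggerConditionMet(String taskName) {
        for (String predecessor : predecessors.get(taskName)) {
            if (states.get(predecessor) != ChainState.SUCCESS) {
                return false;
            }
        }
        return true;
    }

    // A task with no successors ends its chain.
    boolean isEndOfTaskChain(String taskName) { return successors.get(taskName).isEmpty(); }
}
```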

StateMachine

State machines control workflow and task state changes.

WorkflowStateMachine

The WorkflowStateMachine controls state transitions in a workflow. Each workflow state implements IWorkflowStateAction, which provides the handler for each WorkflowLifecycleEvent.

IWorkflowStateAction

```java
/**
 * Represents the action to be taken when a workflow in a certain state receives a target event.
 * <p>
 * Each {@link WorkflowExecutionStatus} should have a corresponding {@link IWorkflowStateAction} implementation.
 *
 * @see WorkflowRunningStateAction
 * @see WorkflowReadyPauseStateAction
 * @see WorkflowPausedStateAction
 * @see WorkflowReadyStopStateAction
 * @see WorkflowStoppedStateAction
 * @see WorkflowSerialWaitStateAction
 * @see WorkflowFailedStateAction
 * @see WorkflowSuccessStateAction
 * @see WorkflowFailoverStateAction
 * @see WorkflowWaitToRunStateAction
 */
public interface IWorkflowStateAction {

    /** Perform the necessary actions when the workflow in a certain state receives a {@link WorkflowStartLifecycleEvent}. */
    void startEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                          final WorkflowStartLifecycleEvent workflowStartEvent);

    /** Perform the necessary actions when the workflow in a certain state receives a {@link WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent}. */
    void topologyLogicalTransitionEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                                              final WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent workflowTopologyLogicalTransitionWithTaskFinishEvent);

    /** Perform the necessary actions when the workflow in a certain state receives a {@link WorkflowPauseLifecycleEvent}. */
    void pauseEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                          final WorkflowPauseLifecycleEvent workflowPauseEvent);

    /** Perform the necessary actions when the workflow in a certain state receives a {@link WorkflowPausedLifecycleEvent}. */
    void pausedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                           final WorkflowPausedLifecycleEvent workflowPausedEvent);

    /** Perform the necessary actions when the workflow in a certain state receives a {@link WorkflowStopLifecycleEvent}. */
    void stopEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                         final WorkflowStopLifecycleEvent workflowStopEvent);

    /** Perform the necessary actions when the workflow in a certain state receives a {@link WorkflowStoppedLifecycleEvent}. */
    void stoppedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                            final WorkflowStoppedLifecycleEvent workflowStoppedEvent);

    /** Perform the necessary actions when the workflow in a certain state receives a {@link WorkflowSucceedLifecycleEvent}. */
    void succeedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                            final WorkflowSucceedLifecycleEvent workflowSucceedEvent);

    /** Perform the necessary actions when the workflow in a certain state receives a {@link WorkflowFailedLifecycleEvent}. */
    void failedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                           final WorkflowFailedLifecycleEvent workflowFailedEvent);

    /** Perform the necessary actions when the workflow in a certain state receives a {@link WorkflowFinalizeLifecycleEvent}. */
    void finalizeEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                             final WorkflowFinalizeLifecycleEvent workflowFinalizeEvent);

    /** Get the {@link WorkflowExecutionStatus} that this action matches. */
    WorkflowExecutionStatus matchState();
}
```
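A minimal sketch of how a state machine can dispatch an event to the action registered for the current state, assuming each action simply returns the next status. `WfStatus`, `WfStateAction`, `WfRunningAction`, `WfReadyPauseAction`, and `WfStateMachine` are hypothetical simplifications of `WorkflowExecutionStatus`, `IWorkflowStateAction`, and the state machine; the real actions perform side effects rather than returning a status.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical subset of workflow statuses, for illustration only.
enum WfStatus { RUNNING_EXECUTION, READY_PAUSE, PAUSE }

// Hypothetical simplification of IWorkflowStateAction: each handler returns the next status.
interface WfStateAction {
    WfStatus matchState();

    WfStatus pauseEventAction();

    WfStatus pausedEventAction();
}

final class WfRunningAction implements WfStateAction {
    public WfStatus matchState() { return WfStatus.RUNNING_EXECUTION; }

    // A pause request moves a running workflow to READY_PAUSE; its tasks still have to pause.
    public WfStatus pauseEventAction() { return WfStatus.READY_PAUSE; }

    // PAUSED is only legal after a pause has been requested.
    public WfStatus pausedEventAction() {
        throw new IllegalStateException("PAUSED event is illegal in RUNNING_EXECUTION");
    }
}

final class WfReadyPauseAction implements WfStateAction {
    public WfStatus matchState() { return WfStatus.READY_PAUSE; }

    // A repeated pause request does not change the state.
    public WfStatus pauseEventAction() { return WfStatus.READY_PAUSE; }

    public WfStatus pausedEventAction() { return WfStatus.PAUSE; }
}

final class WfStateMachine {
    private final Map<WfStatus, WfStateAction> actions = new EnumMap<>(WfStatus.class);

    // Each action registers itself under the status it matches.
    WfStateMachine(WfStateAction... stateActions) {
        for (WfStateAction action : stateActions) {
            actions.put(action.matchState(), action);
        }
    }

    // Look up the action for the current status; unknown states fail fast.
    WfStateAction actionFor(WfStatus current) {
        WfStateAction action = actions.get(current);
        if (action == null) {
            throw new IllegalStateException("No state action registered for " + current);
        }
        return action;
    }
}
```

Rejecting an event in a state that should never receive it (here, PAUSED while still RUNNING_EXECUTION) turns a silent inconsistency into a visible error, and each action can be unit tested in isolation, which the if-else approach described in the motivation made impossible.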

TaskStateMachine

The TaskStateMachine controls state transitions in a task. Each task state implements ITaskStateAction, which provides the handler for each TaskLifecycleEvent.

ITaskStateAction

```java
/**
 * Represents the action to be taken when a task in a certain state receives a target event.
 * <p>
 * Each {@link TaskExecutionStatus} should have a corresponding {@link ITaskStateAction} implementation.
 *
 * @see TaskSubmittedStateAction
 * @see TaskDelayExecutionStateAction
 * @see TaskDispatchStateAction
 * @see TaskRunningStateAction
 * @see TaskPauseStateAction
 * @see TaskKillStateAction
 * @see TaskFailureStateAction
 * @see TaskSuccessStateAction
 * @see TaskForceSuccessStateAction
 * @see TaskFailoverStateAction
 */
public interface ITaskStateAction {

    /**
     * Perform the necessary actions when the task in a certain state receives a {@link TaskStartLifecycleEvent}.
     * <p>
     * This method is called when you want to start a task.
     */
    void startEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                          final ITaskExecutionRunnable taskExecutionRunnable,
                          final TaskStartLifecycleEvent taskStartEvent);

    /**
     * Perform the necessary actions when the task in a certain state receives a {@link TaskRunningLifecycleEvent}.
     * <p>
     * This method is called when the master receives a task running event from the executor.
     */
    void startedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                            final ITaskExecutionRunnable taskExecutionRunnable,
                            final TaskRunningLifecycleEvent taskRunningEvent);

    /**
     * Perform the necessary actions when the task in a certain state receives a {@link TaskRetryLifecycleEvent}.
     * <p>
     * This method is called when the task needs to be retried.
     */
    void retryEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                          final ITaskExecutionRunnable taskExecutionRunnable,
                          final TaskRetryLifecycleEvent taskRetryEvent);

    /**
     * Perform the necessary actions when the task in a certain state receives a {@link TaskDispatchLifecycleEvent}.
     * <p>
     * This method is called when you want to dispatch a task.
     */
    void dispatchEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                             final ITaskExecutionRunnable taskExecutionRunnable,
                             final TaskDispatchLifecycleEvent taskDispatchEvent);

    /**
     * Perform the necessary actions when the task in a certain state receives a {@link TaskDispatchedLifecycleEvent}.
     * <p>
     * This method is called when the task has been dispatched to the executor.
     */
    void dispatchedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                               final ITaskExecutionRunnable taskExecutionRunnable,
                               final TaskDispatchedLifecycleEvent taskDispatchedEvent);

    /**
     * Perform the necessary actions when the task in a certain state receives a {@link TaskPauseLifecycleEvent}.
     * <p>
     * This method is called when you want to pause a task.
     */
    void pauseEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                          final ITaskExecutionRunnable taskExecutionRunnable,
                          final TaskPauseLifecycleEvent taskPauseEvent);

    /**
     * Perform the necessary actions when the task in a certain state receives a {@link TaskPausedLifecycleEvent}.
     * <p>
     * This method is called when the task has been paused.
     */
    void pausedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                           final ITaskExecutionRunnable taskExecutionRunnable,
                           final TaskPausedLifecycleEvent taskPausedEvent);

    /**
     * Perform the necessary actions when the task in a certain state receives a {@link TaskKillLifecycleEvent}.
     * <p>
     * This method is called when you want to kill a task.
     */
    void killEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                         final ITaskExecutionRunnable taskExecutionRunnable,
                         final TaskKillLifecycleEvent taskKillEvent);

    /**
     * Perform the necessary actions when the task in a certain state receives a {@link TaskKilledLifecycleEvent}.
     * <p>
     * This method is called when the task has been killed.
     */
    void killedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                           final ITaskExecutionRunnable taskExecutionRunnable,
                           final TaskKilledLifecycleEvent taskKilledEvent);

    /**
     * Perform the necessary actions when the task in a certain state receives a {@link TaskFailedLifecycleEvent}.
     * <p>
     * This method is called when the task has failed.
     */
    void failedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                           final ITaskExecutionRunnable taskExecutionRunnable,
                           final TaskFailedLifecycleEvent taskFailedEvent);

    /**
     * Perform the necessary actions when the task in a certain state receives a {@link TaskSuccessLifecycleEvent}.
     * <p>
     * This method is called when the task has succeeded.
     */
    void succeedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                            final ITaskExecutionRunnable taskExecutionRunnable,
                            final TaskSuccessLifecycleEvent taskSuccessEvent);

    /**
     * Perform the necessary actions when the task in a certain state receives a {@link TaskFailoverLifecycleEvent}.
     * <p>
     * This method is called when the task needs to be failed over.
     */
    void failoverEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                             final ITaskExecutionRunnable taskExecutionRunnable,
                             final TaskFailoverLifecycleEvent taskFailoverEvent);

    /** Get the {@link TaskExecutionStatus} that this action matches. */
    TaskExecutionStatus matchState();
}
```
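The separate KILL and KILLED events suggest one way to address the end-to-end consistency problem from the motivation: the KILL handler only sends a request to the executor, and the task status changes only when the executor confirms. The sketch below assumes that reading; `TaskRunningActionSketch`, `FakeExecutorClient`, `SketchTask`, and `SketchTaskStatus` are hypothetical and are not the actual `TaskRunningStateAction`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical subset of task statuses.
enum SketchTaskStatus { RUNNING_EXECUTION, KILL }

// Hypothetical executor client that only records the kill requests it sends.
final class FakeExecutorClient {
    final List<Integer> killRequests = new ArrayList<>();

    void sendKill(int taskInstanceId) { killRequests.add(taskInstanceId); }
}

// Hypothetical in-memory task.
final class SketchTask {
    final int taskInstanceId;
    SketchTaskStatus status = SketchTaskStatus.RUNNING_EXECUTION;

    SketchTask(int taskInstanceId) { this.taskInstanceId = taskInstanceId; }
}

// Running-state action: a KILL event only requests the kill; the status is changed
// to KILL when the executor confirms with a KILLED event, so the master never
// records "killed" for a task that is still running remotely.
final class TaskRunningActionSketch {
    private final FakeExecutorClient executorClient;

    TaskRunningActionSketch(FakeExecutorClient executorClient) { this.executorClient = executorClient; }

    void killEventAction(SketchTask task) {
        executorClient.sendKill(task.taskInstanceId); // status intentionally unchanged
    }

    void killedEventAction(SketchTask task) {
        task.status = SketchTaskStatus.KILL;
    }
}
```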

Failover

Failover is triggered through a SystemEventBus.


There are three kinds of failover events in DS.

GlobalMasterFailover

The GlobalMasterFailover is used to fail over historical workflow instances in the system. When a master server starts up, it publishes a GlobalMasterFailoverEvent; the GlobalMasterFailover then scans the unfinished workflow instances in the DB and checks whether each of them should be failed over, so it can take a long time.
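The OOM problem described in the motivation comes from loading every unfinished workflow instance at once. A minimal keyset-pagination sketch is shown below, assuming a hypothetical DAO method that returns unfinished workflow instance ids greater than a cursor, in ascending order; neither `UnfinishedWorkflowDao` nor `PagedFailoverScanner` is a DolphinScheduler API.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical DAO: up to `limit` unfinished workflow instance ids greater than `afterId`, ascending.
interface UnfinishedWorkflowDao {
    List<Integer> queryUnfinishedIdsAfter(int afterId, int limit);
}

final class PagedFailoverScanner {
    private PagedFailoverScanner() {}

    // Keyset pagination: only one page of ids is held in memory at a time.
    // Returns the number of workflow instances handed to failoverOne.
    static int scan(UnfinishedWorkflowDao dao, int pageSize, Consumer<Integer> failoverOne) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        int lastId = Integer.MIN_VALUE;
        int total = 0;
        while (true) {
            List<Integer> page = dao.queryUnfinishedIdsAfter(lastId, pageSize);
            for (int workflowInstanceId : page) {
                failoverOne.accept(workflowInstanceId);
            }
            total += page.size();
            // A short (or empty) page means there is nothing left to scan.
            if (page.size() < pageSize) {
                return total;
            }
            lastId = page.get(page.size() - 1);
        }
    }
}
```

Using the last seen id as the cursor, rather than an OFFSET, keeps each query cheap even on a large workflow instance table.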

MasterFailover

The MasterFailover is used to fail over the workflows under a crashed master. When a master crashes, the other active masters receive the master-crashed event and publish a MasterFailoverEvent. If all masters crash, failover relies on the GlobalMasterFailover.

WorkerFailover

The WorkerFailover is used to fail over the tasks under a crashed worker. When a worker crashes, all active masters receive the worker-crashed event and publish a WorkerFailoverEvent. Each master only fails over the tasks that it holds and that have been dispatched to the crashed worker.
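The worker-failover selection rule can be sketched as a filter over the tasks this master holds in memory. `HeldTask` and `WorkerFailoverFilter` are hypothetical, and identifying the executor by a `host:port` string is an assumption made for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical view of a task held by this master; executorHost is null if not yet dispatched.
final class HeldTask {
    final int taskInstanceId;
    final String executorHost;

    HeldTask(int taskInstanceId, String executorHost) {
        this.taskInstanceId = taskInstanceId;
        this.executorHost = executorHost;
    }
}

final class WorkerFailoverFilter {
    private WorkerFailoverFilter() {}

    // Select only the tasks held by this master that were dispatched to the crashed worker;
    // tasks on other workers, or not yet dispatched, are left alone.
    static List<Integer> tasksToFailover(List<HeldTask> heldByThisMaster, String crashedWorkerHost) {
        return heldByThisMaster.stream()
                .filter(task -> crashedWorkerHost.equals(task.executorHost))
                .map(task -> task.taskInstanceId)
                .collect(Collectors.toList());
    }
}
```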

Compatibility, Deprecation, and Migration Plan

Compatible with the latest version, but some interfaces that are not used by the UI will be deprecated.

Test Plan

Test by UT, E2E, manual.

Master Integration Test

Right now, there is no master integration test; this DSIP will add master integration test (IT) support.


hkun0120 commented 1 month ago

well done!