apache / dolphinscheduler

Apache DolphinScheduler is the modern data orchestration platform. Agile to create high performance workflow with low-code
https://dolphinscheduler.apache.org/
Apache License 2.0

[Improvement] Propose for master refactor: less thread consumption, less time consumption #4304

Closed · lenboo closed this issue 3 years ago

lenboo commented 3 years ago

Background:

refer: https://github.com/apache/incubator-dolphinscheduler/issues/4083

At present, the master has the following problems:

- There is a lot of polling, which causes unnecessary time cost.

- A distributed lock is taken when fetching commands, which creates a concurrency bottleneck.

- Too many threads (nProcessInstance*nTaskInstances) are used, which wastes system resources.

- Polling the database creates a query-pressure bottleneck when the data volume is large.

Solution:

1. Command processing

The API sends commands to the scheduler through Netty; if sending fails, it retries up to three times.

- Add a communication process between the API, the master, and the scheduler.

- The API receives an execute-workflow command and sends a command message to the scheduler (synchronously). If sending fails, it retries up to three times and propagates the failure message upward after the third failure.

- The API receives a pause/stop command and sends a pause/stop message to the master (synchronously). If sending fails, it retries up to three times and propagates the failure message upward after the third failure (see the sketch below).
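A minimal sketch of this send-with-retry path, assuming a hypothetical `SchedulerClient` wrapper around the Netty channel (the names below are illustrative, not the actual DolphinScheduler remote API):

```java
// Hedged sketch: SchedulerClient and Command are illustrative names,
// not the real DolphinScheduler remote module.
public class CommandSender {

    private static final int MAX_RETRIES = 3;

    private final SchedulerClient client;

    public CommandSender(SchedulerClient client) {
        this.client = client;
    }

    /**
     * Synchronously send a command to the scheduler, retrying up to three
     * times; throws after the last failure so the caller can propagate
     * the failure message upward.
     */
    public void sendWithRetry(Command command) {
        Exception last = null;
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            try {
                client.send(command); // blocking round trip over Netty
                return;               // success: stop retrying
            } catch (Exception e) {
                last = e;             // remember the failure and retry
            }
        }
        throw new IllegalStateException(
                "send failed after " + MAX_RETRIES + " attempts", last);
    }

    interface Command { }
    interface SchedulerClient { void send(Command command) throws Exception; }
}
```

"Synchronous" here simply means the caller blocks in `sendWithRetry` until a send succeeds or the third attempt fails.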

2. Add scheduler function

In HA mode, the scheduler processes the received command message queue and dispatches commands to masters according to a policy (priority, load, ...).

- After receiving a command message, save the command to the DB, cache it in the local cache queue, and reply to the message sender.

- The HA scheduler is implemented inside the master service, and the instances elect an active node.

- By watching the ZK path, standby nodes detect that the active node is down and begin competing to become active (see the sketch after this list).

- After a standby wins the election and becomes active, it scans the command table and then starts processing command messages.

- Command cache queue processing: according to the (priority, load, ...) policy, send each command to a master, and record that master as the command's host.

- Timing function (one of two options; the alternative is the master timing function in section 3):

  - The active scheduler is triggered periodically to generate command messages and cache them locally and in the DB. The problem: triggers that fire during an active election would be lost.
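A minimal sketch of the active/standby election, assuming Apache Curator (which DolphinScheduler already uses for ZooKeeper access); the ZK path and the `rescanCommandTable` / `processCommandQueue` hooks are hypothetical:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class SchedulerLeaderElection {

    public static void main(String[] args) throws Exception {
        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();

        // Every scheduler instance joins the election on the same ZK path.
        // When the active node dies, its ephemeral node disappears and one
        // of the standbys is granted leadership.
        LeaderSelector selector = new LeaderSelector(
                zk, "/dolphinscheduler/scheduler-leader",
                new LeaderSelectorListenerAdapter() {
                    @Override
                    public void takeLeadership(CuratorFramework client) throws Exception {
                        // Now active: re-scan the command table for commands the
                        // previous active node never processed, then consume the
                        // command queue. Leadership is held while this blocks.
                        rescanCommandTable();
                        processCommandQueue();
                    }
                });
        selector.autoRequeue(); // rejoin the election if leadership is lost
        selector.start();
        Thread.currentThread().join(); // keep the process alive
    }

    static void rescanCommandTable() { /* scan the command table for unprocessed rows */ }

    static void processCommandQueue() throws InterruptedException {
        Thread.currentThread().join(); // placeholder: dispatch commands while active
    }
}
```

`autoRequeue()` makes a demoted node rejoin the election as a standby instead of dropping out of the cluster.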

3. Add master status polling (sub-workflow and dependent-task refactoring)

After the master finishes executing a workflow/task, it sends the result to other masters (checking first whether this is needed); dependent tasks and sub-workflows need this function.

- (Not doing this scheme for now) The worker/master sends task and workflow execution results to the master; after the master receives a task execution result:

  - Run the next task according to the task-dependency relationships.

  - Send the task result to the specified master according to the dependency.

- A polling thread is added inside the master, responsible for polling the status of the tasks/workflows (dependencies and sub-workflows) that this master's workflows are waiting on (see the sketch after this list).

- Master timing function:

  - The master triggers periodically, generates a command message, and sends it (asynchronously) to the active scheduler. If the active scheduler saves the command to the DB successfully but hangs before replying in time, the master resends, and two timed commands are generated.
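A minimal sketch of that in-master polling thread; `isFinished` stands in for the real state lookup, and the 5-second interval is an assumption, not part of the proposal:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DependencyStatePoller {

    // instance id -> callback to run once the dependency/sub-workflow finishes
    private final Map<Integer, Runnable> watched = new ConcurrentHashMap<>();
    private final ScheduledExecutorService poller =
            Executors.newSingleThreadScheduledExecutor();

    public void start() {
        // One polling thread for the whole master, instead of one thread
        // per waiting task instance.
        poller.scheduleWithFixedDelay(this::checkAll, 5, 5, TimeUnit.SECONDS);
    }

    /** Register a dependency or sub-workflow the local DAG is waiting on. */
    public void watch(int instanceId, Runnable onFinished) {
        watched.put(instanceId, onFinished);
    }

    private void checkAll() {
        watched.forEach((instanceId, onFinished) -> {
            if (isFinished(instanceId)) {     // one state query per watched instance
                watched.remove(instanceId);
                onFinished.run();             // wake the waiting workflow
            }
        });
    }

    private boolean isFinished(int instanceId) {
        return false; // placeholder: query the task/workflow instance state
    }
}
```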

4. Fault-tolerance process changes

- Master down:

  - Find the workflow instances the downed master was responsible for. For each workflow, generate a fault-tolerance command and send it to the active scheduler for processing (1. save the command to the DB; 2. set host = null).

  - Find the task instances the downed master was responsible for and check, for each, whether the task's worker is alive and task start time > worker start time (i.e. the worker hung up first, the master later); change the status of tasks that fail this check to "fault tolerant" (see the sketch after this list).

  - Find the unfinished commands and send them to the scheduler for reprocessing.

    - If sending fails, keep retrying.

  - When a master submits a task and finds it is already in progress, it must tell the worker which master the task now belongs to.

    - If a DAG has not received that message, it is not responsible for processing the result.

- Worker down:

  - Original scheme: the master that grabs the distributed lock changes the status of all tasks executed on that worker to "fault tolerant".

  - New scheme: when a worker hangs up, each master handles fault tolerance only for its own tasks.
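A minimal sketch of the worker-alive check above; `TaskInstance` and `WorkerInfo` are illustrative stand-ins for the real entities:

```java
import java.time.Instant;

public class TaskFaultToleranceChecker {

    /**
     * Condition from the proposal: a task survived only if its worker is
     * alive AND task start time > worker start time, i.e. the current
     * worker incarnation was already up when the task started. If the
     * worker died, or restarted after the task began (worker hung up
     * first, this check runs later), the task is marked "fault tolerant"
     * for re-run.
     */
    public boolean needsFaultTolerance(TaskInstance task, WorkerInfo worker) {
        boolean survived = worker != null
                && worker.isAlive()
                && task.getStartTime().isAfter(worker.getStartTime());
        return !survived;
    }

    interface TaskInstance { Instant getStartTime(); }
    interface WorkerInfo { boolean isAlive(); Instant getStartTime(); }
}
```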

5. Refactor the communication function

Separate communication from business logic. The communication layer provides two modes:

- Synchronous: the sending thread sends the message (blocking) -> the receiver receives and processes the message (saves to the DB, writes data, etc.) -> the receiver returns a message to the sender -> the sender unblocks.

- Asynchronous: the sending thread sends the message and caches it -> the receiver receives and processes the message -> after processing, the receiver replies to the sender with a command -> on receiving the command, the sender removes the cached message (see the sketch below).
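A minimal sketch of the asynchronous mode: the sender caches each outgoing message until the receiver's reply command arrives, then drops it. `Transport` and `Message` are illustrative names, not the real remote module:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class AsyncMessenger {

    private final AtomicLong idGen = new AtomicLong();
    private final Map<Long, Message> pending = new ConcurrentHashMap<>();
    private final Transport transport;

    public AsyncMessenger(Transport transport) {
        this.transport = transport;
    }

    /** Send without blocking: cache the message, then hand it to the transport. */
    public long send(Message msg) {
        long id = idGen.incrementAndGet();
        pending.put(id, msg);        // cached until acknowledged
        transport.write(id, msg);    // non-blocking network write
        return id;
    }

    /** Called when the receiver's reply command arrives: drop the cached copy. */
    public void onAck(long id) {
        pending.remove(id);
    }

    /** Messages still cached here were never acknowledged and can be resent. */
    public Iterable<Message> unacknowledged() {
        return pending.values();
    }

    interface Message { }
    interface Transport { void write(long id, Message msg); }
}
```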

6. Master execution process modification

The master is responsible for all workflow parsing, fault tolerance, status monitoring, and task dispatch.

- The master receives a command from the scheduler.

- [PI list processing thread] Generates the process-instance (PI) processing class, caches the process instance, and submits it to the PI thread pool for processing. When a state change occurs, the corresponding handling is performed (sending the next task, modifying the workflow state, etc.).

- [PI thread pool] PI processing: parse parameters, parse the DAG, find the start tasks, persist the start task instances to the DB, send the start task data to the queue, and the thread ends.

- The master's task status queue receives task/workflow statuses from workers/the API/other masters and enqueues them; the queue is drained by a state-processing thread pool. Task states of the same workflow may only be processed by one thread.

- The master's task-sending thread takes task instances from the queue and sends them to the corresponding worker. This needs a global queue and a plug-in mechanism.

- Master timeout-monitoring time wheel: all tasks/workflows that need timeout monitoring join the time wheel. When a timeout fires, the timeout policy is triggered (1. send alarm information; 2. modify the task status to failure), as sketched below.
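A minimal sketch of the timeout time wheel, built on Netty's `HashedWheelTimer` (Netty is already a DolphinScheduler dependency); `alert` and `markTaskFailed` stand in for the two policies named above:

```java
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import java.util.concurrent.TimeUnit;

public class TimeoutMonitor {

    // One tick per second; a coarse wheel stays cheap even with many entries.
    private final HashedWheelTimer wheel = new HashedWheelTimer(1, TimeUnit.SECONDS);

    /** Register a task; fires the timeout policy if it is still running when time is up. */
    public Timeout watch(int taskInstanceId, long timeoutSeconds) {
        return wheel.newTimeout(timeout -> {
            if (isStillRunning(taskInstanceId)) {
                alert(taskInstanceId);          // policy 1: send alarm information
                markTaskFailed(taskInstanceId); // policy 2: modify task status to failure
            }
        }, timeoutSeconds, TimeUnit.SECONDS);
    }

    private boolean isStillRunning(int taskInstanceId) { return true; /* query state */ }
    private void alert(int taskInstanceId) { /* send alarm */ }
    private void markTaskFailed(int taskInstanceId) { /* update DB status */ }
}
```

The caller would cancel the returned `Timeout` when a task finishes normally, so only overdue tasks ever fire.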


wen-hemin commented 3 years ago

Good job.

davidzollo commented 3 years ago

we will discuss the architecture refactoring draft at 7 o'clock tonight (Beijing time). Everyone is welcome to join the meeting. To join the meeting on Google Meet, click this link: meet.google.com/ttu-pzft-gxy

CalvinKirs commented 3 years ago

full of expectation


lenboo commented 3 years ago

[image attachment]

meet-alfie commented 3 years ago

Even if the Scheduler uses HA mode, it still breaks distributed decentralization. It will come under all the pressure the master was under before, and it is likely to introduce migration, data-consistency, and maintenance concerns, etc. It could be an optional plug-in; even if it is not introduced, it won't block the entire process. Prioritize the most common and simplest scenarios over the more complex ones.

lenboo commented 3 years ago

> Even if the Scheduler uses HA mode, it still breaks distributed decentralization. It will come under all the pressure the master was under before, and it is likely to introduce migration, data-consistency, and maintenance concerns, etc. It could be an optional plug-in; even if it is not introduced, it won't block the entire process. Prioritize the most common and simplest scenarios over the more complex ones.

  1. The master is also distributed and decentralized.
  2. The scheduler takes over command distribution, so the master no longer needs a distributed lock, which relieves pressure on the master.