ecodeclub / ecron

大规模分布式任务调度系统(学习版)
Apache License 2.0
63 stars 21 forks source link

基于MySQL 的抢占式存储引擎实现 任务1:基本实现 #10

Closed flycash closed 1 year ago

flycash commented 1 year ago

设计

在整体架构上,Storage 抽象负责任务的抢占与释放、存储任务信息、存储任务执行信息。其中存储任务信息和存储任务信息属于比较简单的部分,而任务的抢占与释放则是一个比较复杂的话题。我们在这里会深入讨论抢占与释放的问题。

任务抢占与释放

基于 MySQL 的实现中所谓的抢占,本质上就是排他性地将任务从一个状态改到另外一个状态。所以这基本上可以归结为一个 CAS 操作:

UPDATE task set status = 'preempted' WHERE id = ? AND status = 'free'

这一个 SQL 的意思就是如果一个任务还没有任何人抢占到,那么我们就抢占掉它。这里就会有一个问题,我们从哪里知道一个任务没有被抢占呢? 因此该实现会有一个定时轮询的过程,查找数据库中的未被抢占的任务,找到之后遍历这些任务,挨个尝试去抢占。如果抢占成功,那么说明该实现所在的节点可以进一步调度这个任务。

因此整体上是一个 SELECT - UPDATE 流程。

这里可以考虑的另外一个方案就是 SELECT FOR UPDATE 语句,而不必使用 CAS 操作。

那么,如果当前实例,要关机了,或者因为负载的问题,不再准备继续调度,那么它需要将状态修改回去未被抢占的状态。也就是:

UPDATE task set status = 'free' where id = ?

但是这种释放有一个问题,即一切正常的情况下,我们可以预期一个节点抢占了任务之后最终会释放掉它。但是我们要考虑到,一个节点在抢占了之后,它可能中途突然崩溃了。在这种崩溃的场景下,任务在数据库中的状态还是被抢占了,那么此时别的存活的节点也没办法继续抢占这个任务。 为了解决这个问题,我们需要引入一种续约机制。

任务续约

所谓租约是指,一个节点抢占了一个任务之后,它并不是永久的占有了这个任务直到释放,而只是占有了一段时间。如果到期没有续约,那么别的节点就可以继续抢占这个任务。

因此对于抢占到的任务来说,节点需要不断为它续约。一种比较简单的方案就是不断更新数据库中 Task 的数据。这里我们显式引入一个字段叫做 epoch,借鉴自分布式协调算法中的概念:

update task set epoch = epoch +1, update_time = now() WHERE id = ?

事实上,在这里我们其实并不太需要 epoch 这个字段,单纯用 update_time 都可以。但是我们可以利用 epoch 来大概判断 Task 被抢占了多久

那么任务抢占的逻辑就变成了:

其中有几个参数应该是可以被控制的,即租约长短,以及 update_time 落后多久了才会被认为可以再次被抢占。例如说,如果 update_time < now() -lease 就可以被抢占,那么偶发性的数据库更新失败引起续约失败,也会导致任务被别的节点抢占,这可能不是我们想要的

负载均衡

这种抢占式的可能在两种情况下引起负载不均衡,一种就是节点采用了类似灰度发布之类的措施,那么先发布的节点就会把所有的可抢占的任务都抢占掉;另外一种就是长期运行之下,会有极低概率导致小部分节点抢占了大部分任务。

所以我们需要考虑在节点之间进行负载均衡的问题。

要求

在本任务里面,你们不需要考虑负载均衡的问题,只需要能够实现一个 MySQL Storage 引擎,该引擎能够抢占到任务,并且续约。

出于简化的目的,续约本身你也可以简单做成一个任务开一个 goroutine 续约。

后续我们会优化。

hookokoko commented 1 year ago

思路清晰很多,我按照这几个点改进一个先

flycash commented 1 year ago

@hookokoko 我拆分了一下任务,你可以先做一个简单的版本。毕竟在一个合并请求里面搞完全部特性还是过于困难了。

Jun10ng commented 1 year ago
如果一个任务的状态是已经被抢占了,但是 update_time < now() - lease,其中 lease 是租约,这意味着节点续约失败了。这种情况下我们认为别的节点可以抢占这个任务了

我觉得下面这种逻辑会比较清晰:

hookokoko commented 1 year ago
  • 单独一个协程负责扫描续约失败的task,并将其状态从“执行”修改为“抢占”

其实,这里task执行失败之后,会产生执行失败的信号传给storage,无需要再启动协程修改

flycash commented 1 year ago

我觉得下面这种逻辑会比较清晰:

  • 将task的生命周期分为"抢占“ "执行“ 两个大状态
  • 单独一个协程负责扫描续约失败的task,并将其状态从“执行”修改为“抢占”
  • 另外一个定时轮询的协程只需要针对”抢占“状态做扫描即可。 (这个轮询协程的优先级比较高,所以逻辑不宜复杂

其实就是两件事:

整个都可以用一个 goroutine。或者说如果我们考虑到节点上正在调度的任务数量很多,那么可以是第一件事一个 goroutine,第二件事可以开启多个 goroutine。那么根据我的描述,我认为使用一个优先级队列 + 一个 goroutine 会更好。

”执行状态“ 是有歧义的。我们考虑重复任务,比如说半小时执行一次,那么我们这里并不是说我执行一次,然后就放弃,然后等快要到时间了再去抢占。这也就是我避免引入执行这个概念的原因。换成“调度中”可能会好一点。

基本状态机就是:未抢占 -> 抢占。这是 status 这种字段的取值。但是考虑到我们节点抢占之后宕机,那么数据库里面, status = ”抢占“,实际上可能抢占它的节点已经挂了。所以逻辑意义上的抢占,是结合了状态字段 + 续约来确定的。

Jun10ng commented 1 year ago
  • 单独一个协程负责扫描续约失败的task,并将其状态从“执行”修改为“抢占”

其实,这里task执行失败之后,会产生执行失败的信号传给storage,无需要再启动协程修改

我是指执行器crash后无法续约的情况。

Jun10ng commented 1 year ago

"对自己正在调度的任务,续约" "第二件事可以开启多个 goroutine"

续约这个事件是从调度中心触发的吗? 我理解应该是 执行器 --> 调度中心,这样的话调度中心只需要心跳接口给执行器就OK了,然后再定时轮询数据库内没及时续约的task,回退状态?

flycash commented 1 year ago

续约这个事情依旧是调度节点完成的,而不是执行节点,因为续约本质上是调度策略的一种实现中的一个细节。而执行节点只需要无脑执行任务就可以了。

flycash commented 1 year ago

所以我们这里讨论的实际上是我调度节点抢占了任务之后,我调度节点本身崩溃了。Storage 这个东西是给调度节点用的。当然从逻辑上,调度节点和执行节点可以是同一个节点,但是我们也可以有专门的调度节点,然后通过 HTTP 或者 gRPC 来调度远程的执行节点。

flycash commented 1 year ago

我想到一个比较简单的设计,就是如果我们不考不同任务续约间隔不同的话,其实我们可以一个 Goroutine 打天下。就是说调度节点启动一个 goroutine,每隔一段时间试图抢占更多的任务,同时把自己已经抢占了的任务,全部续约一遍。

hookokoko commented 1 year ago

有点疑惑scheduler的任务抢占,任务抢占不是在storage做的吗? scheduler是在storage抢占任务之后会收到storage发过来的已抢占信号,然后创建一个执行任务并发送到延迟队列等待到时间执行,整个过程没有涉及到抢占吧

flycash commented 1 year ago

有点疑惑scheduler的任务抢占,任务抢占不是在storage做的吗? scheduler是在storage抢占任务之后会收到storage发过来的已抢占信号,然后创建一个执行任务并发送到延迟队列等待到时间执行,整个过程没有涉及到抢占吧

是的,我说的是调度节点=。=不是 scheduler。Storage 和 Scheduler 都是调度节点上的一部分