ecodeclub / ecron

大规模分布式任务调度系统(学习版)
Apache License 2.0
63 stars 21 forks source link

Preempter 抽象 #21

Open flycash opened 2 months ago

flycash commented 2 months ago

这算是一个前瞻抽象。

也就是我们单独把抢占和释放这个过程抽象出来。大体上的接口设计是:


type Preempter struct {
     Preempt(ctx context.Context) (Task, cancelFunc, error)
}

这种设计类似于 Context 的设计,要求返回一个 cancelFunc,它的效果是:

这种优势是非常简单,并且将抢占、续约和释放的逻辑内聚到了一起。但是这种形态有一个缺陷,即调用者没有办法知道是否续约失败了,缺乏一个机制通知调用者续约失败了。

另外一种可行的设计是:

type Preempter interface {
    Preempt(ctx context.Context) (Task, error)
    AutoRefresh(ctx, tid) (chan error, error)
    Release(ctx, tid) error
}

那么对于调用者来说,用起来的感觉就是:

 t, err := p.Preempt(ctx)

// 要开一个 Goroutine
errChan, err := p.AutoRefresh()
refreshErr := <- errChan // 续约失败

// 搞完了。
p.Release()

从实践上来说,大部分用户不知道怎么处理续约失败的场景(比如说我们 Scheduler 通知用户说续约失败,他们都不能理解这个概念)。所以第一种设计,并不是不可接受。

flycash commented 2 months ago

在完成了这个抽象之后,提供一个基于 Redis 的实现。结合已有的代码,也就是有:

flycash commented 2 months ago

@Jared-lu

Jared-lu commented 2 months ago

OK,对于第一种设计,我的理解第一种设计像是课程上的实现方案,抽象出一个Preempter

flycash commented 2 months ago

是的,就是让 Scheduler 来管理续约的事情,不够内聚

junwense commented 2 months ago

我的思路可能是这样,对于核心设计我觉得可能可以采用Preempter接口,然后我们可以提供一个Preempter的使用方法,用于给用户一个实现的例子

type Preempter interface {
    Preempt(ctx context.Context) (task.Task, error)
    // AutoRefresh 返回值 chan用于获取错误,第二个chan用于退出,一般是在go func里调用
    AutoRefresh(ctx context.Context, tid int64) (<-chan error, chan<- struct{}, error)
    Release(ctx context.Context, tid int64) error
}

type PreempterExample struct {
    Preempter
}

type PreempterUsage interface {
    DoPreempt(ctx context.Context, f func(ctx context.Context, t task.Task) error) error
}

func (p *PreempterExample) DoPreempt(ctx context.Context, f func(ctx context.Context, t task.Task) error) error {

    if ctx.Err() != nil {
        return ctx.Err()
    }

    myTask, err := p.Preempter.Preempt(ctx)
    if err != nil {
        return err
    }

    var out = make(chan error, 1)
    var end = make(chan error, 1)
    var endRefresh = make(chan error, 1)

    defer func() {
        close(endRefresh)
    }()
    go func() {
        ch, cancel, err := p.Preempter.AutoRefresh(ctx, myTask.ID)
        defer func() { close(cancel) }()
        if err != nil {
            out <- err
            return
        }
        select {
        case <-ch:
            var e = <-ch
            out <- e
        case <-ctx.Done():
        case <-endRefresh:
        }
    }()
    ctx1, cancelFunc := context.WithCancel(ctx)
    defer cancelFunc()
    go func() {
        err = f(ctx1, myTask)
        end <- err
        defer func() {
            close(end)
        }()
    }()

    var needRelease = true
    select {
    // 正常退出
    case <-end:
    // 续约出现问题
    case <-out:
        needRelease = false
    // 超时控制
    case <-ctx.Done():
        return ctx.Err()
    }

    defer func() {
        if needRelease {
            // 超时的情况,考虑是否要重新生成一个ctx
            _ = p.Preempter.Release(ctx, myTask.ID)
            // log err
        }
    }()

    return err
}
flycash commented 2 months ago

这个东西不是给用户用的,它是给 Scheduler 用的。可以认为我是把现在耦合进去 Scheduler 的逻辑抽取出来,让 Scheduler 做更加抽象的事情,比如说选择合适节点,执行容错,安排进度追踪这种。

flycash commented 2 months ago

var out = make(chan error, 1) var end = make(chan error, 1) var endRefresh = make(chan error, 1)

这种设计过于复杂了。信号多了发来发去就容易出现并发问题。

junwense commented 2 months ago

这个东西不是给用户用的,它是给 Scheduler 用的。可以认为我是把现在耦合进去 Scheduler 的逻辑抽取出来,让 Scheduler 做更加抽象的事情,比如说选择合适节点,执行容错,安排进度追踪这种。

有点类似于k8s的kube-scheduler么,我能理解这个功能,但是对等模式有点难搞啊, 合适的节点:tag标记节点,然后再把节点负载资源排序选择,可以是提供一个选择节点的算法抽象(这里隐含里一个概念,就是其实我一直觉得我们项目少了一个抽象,即任务的创建管理者,通过这个对象,可以分配任务,比如分配任务,实际就是给某个任务打上标记,让对应的机器执行,机器可以是多个,这里通过storge查询就能控制) 执行容错:这个应该是要给任务启动协程监控,进度追踪也可以通过这个方案,但是要求任务要有状态和可观测机制

junwense commented 2 months ago

var out = make(chan error, 1) var end = make(chan error, 1) var endRefresh = make(chan error, 1)

这种设计过于复杂了。信号多了发来发去就容易出现并发问题。

     // 这个用于获取续约的错误
    var out = make(chan error, 1)
    // 这个当前流程退出
    var end = make(chan error, 1)
    // 这个用于退出续约流程
    var endRefresh = make(chan error, 1)

具体流程代码可能可以再优化下,但是我暂时没想到, end 这个信号可以用ctx1的cancel方法来处理么,理论上任务结束了,ctx1可以cancel了

flycash commented 1 month ago

不要提前考虑什么筛选节点之类的问题。先把当下的做完