ecodeclub / ecron

大规模分布式任务调度系统(学习版)
Apache License 2.0
63 stars 21 forks source link

长时间运行的任务探查接口 #20

Open flycash opened 2 months ago

flycash commented 2 months ago

目前我们支持了 HTTP 接口作为远程任务。

现在我们需要考虑这么一种场景:我的任务需要长时间运行,并且我作为用户,我希望知道我的运行进度。

因此我们改进调度 HTTP 任务的方式,为此我们有一个假定,即用户提供的 HTTP 任务接口,是符合我们预期的。

因此,对于一个 HTTP 任务来说:

一些设计要点:

flycash commented 2 months ago

不需要写文档,你直接写你代码和接口,我大概就能看明白。看不明白的时候,我就会问你的。

junwense commented 2 months ago

我理解是不是这样,提供一个一个查询接口,然后这个接口检查任务运行状态【这个任务可以是http任务】,这个时候我们可以要求任务的流程提供者提供一个接口,返回任务执行的状态,假设是running,successed,fail,unknow,然后根据结果做一些事情,这里用hid做幂等就行。

flycash commented 2 months ago

我理解是不是这样,提供一个一个查询接口,然后这个接口检查任务运行状态【这个任务可以是http任务】,这个时候我们可以要求任务的流程提供者提供一个接口,返回任务执行的状态,假设是running,successed,fail,unknow,然后根据结果做一些事情,这里用hid做幂等就行。

是的。

flycash commented 2 months ago

目前我们可以简化一下设计,也就是认为调度任务的 HTTP 接口,也可以用来查询任务进度。后面我们可以考虑允许用户提供不同的接口,或者集中提供一个接口查询所有的在它这里调度的任务的进度

flycash commented 2 months ago

@Jared-lu

Jared-lu commented 2 months ago

在部分失败和超时响应的处理上,要考虑用户节点是不是挂了?如果挂了,那当前这个任务是直接终止,等待下一次调度呢,还是让另外的节点立刻去执行?

flycash commented 2 months ago

暂时还不需要考虑这个问题。等后面我们统一设计容错机制。暂时只需要一个探活,后续我们可以要求节点上报更多数据,或者通过第三方监控来汇总性能数据,执行容错。

junwense commented 2 months ago

写了个基本的demo,我不太能理解,现在的task,executor,这2个实体具体的职责范围,可能要明确讨论下,

package executor

import (
    "context"
    "errors"
    "github.com/ecodeclub/ecron/internal/storage"
    "github.com/ecodeclub/ecron/internal/task"
    "log/slog"
    "time"
)

// ExecuteTask
// http服务抽象
// 这里需要改造目前的task设计,要抽取出一个类似于taskCfg的存储结构
// 然后这里根据cid和tid,会创建一个新的task,并且生成eid,如果创建失败,则没有eid
// 但是这样设计会有一个问题,即用户创建的是周期执行任务,eid一个会有问题,,所有也可以考虑不提供返回eid
// 提供接口让用户根据此task自己查询eid
func ExecuteTask(cid int64, tid int64, exp map[string]string) int64 {
    panic("")
}

// QueryJobByEid
// http服务抽象
// 根据eid目前任务状态,返回一个json
func QueryJobByEid(eid int64) string {
    panic("")
}

type CommonStateExecutor struct {
    l            *HttpExecutor
    executionDAO storage.ExecutionDAO
    logger       *slog.Logger
}

func (s *CommonStateExecutor) Name() string {
    return "有状态的" + s.l.Name()
}

// Run 考虑把eid传入ctx,发给job的真正执行方
func (s *CommonStateExecutor) Run(ctx context.Context, t task.Task) error {
    // 这里不把eid暴露出去,则只能通过提供接口查询记录表里面的数据获取eid
    s.saveExecutors(t.ID, t.Eid, task.ExecStatusStarted)
    var eid = t.Eid
    var err error = nil
    ch := make(chan struct{}, 1)
    ticker := time.NewTicker(time.Second * 3)
    ctx, cancel := context.WithCancel(ctx)
    go func() {
        go func() {
            err = s.l.Run(ctx, t)
        }()
        for {
            i := 0
            select {
            // 这里是调用http接口获取状态
            case <-ticker.C:
                // 这段代码考虑通过封装,暴露成http接口,可以主动上报错误,但是比较麻烦的是ch这个怎么处理
                state, rates := getTargetState()
                switch state {
                case TargetStateRunning:
                    i = 0
                    s.updateExecutorRate(t.ID, t.Eid, rates)
                case TargetStateFailed:
                    ch <- struct{}{}
                    cancel()
                    return
                case TargetStateSucceed:
                    ch <- struct{}{}
                    cancel()
                    return
                case TargetStateTimeout:
                    i++
                    if i == 3 {
                        ch <- struct{}{}
                        cancel()
                        return
                    }
                default:
                    ch <- struct{}{}
                    cancel()
                    return
                }
            case <-ctx.Done():
                return
            }
        }

    }()

    select {
    case <-ch:
    case <-ctx.Done():
        err = ctx.Err()
        close(ch)
        _ = cancelTarget()
    }
    defer func() {
        switch {
        case errors.Is(err, context.DeadlineExceeded):
            s.updateExecutor(t.ID, eid, task.ExecStatusDeadlineExceeded)
        case errors.Is(err, context.Canceled):
            s.updateExecutor(t.ID, eid, task.ExecStatusCancelled)
        case err == nil:
            s.updateExecutor(t.ID, eid, task.ExecStatusSuccess)
        default:
            s.updateExecutor(t.ID, eid, task.ExecStatusFailed)
        }
    }()

    return err
}

type TargetState uint8

const (
    TargetStateUnknown TargetState = iota
    TargetStateRunning
    TargetStateSucceed
    TargetStateFailed
    TargetStateTimeout
)

// todo 抽取成方法放在task上
func getTargetState() (TargetState, uint8) {
    return TargetStateSucceed, 0
}

// todo 抽取成方法放在task上
func cancelTarget() error {
    panic("")
}

func (s *CommonStateExecutor) GetState(ctx context.Context, tid int64, eid int64) (ExecStatus, error) {
    panic(" 从 executionDAO 里查当前任务执行状态")
}
func (s *CommonStateExecutor) saveExecutors(tid int64, eid int64, status task.ExecStatus) {
    panic("记录执行记录,返回eid")
}
func (s *CommonStateExecutor) updateExecutor(tid int64, eid int64, status task.ExecStatus) {
    panic("更新结果")
}
func (s *CommonStateExecutor) updateExecutorRate(tid int64, eid int64, rate uint8) {
    panic("更新结果")
}
flycash commented 2 months ago

很简单,Scheduler 就是你的老板,你就是 Executor。你的老板只负责安排任务给你,然后你去执行。在任务这里,根据任务的形态引入了不同的实现,比如说 http 任务代表的是任务本身就是一个 http 接口,调度执行就是发一个 http 请求。

Scheduler 只是居中指挥而已。比如说抢占一个任务,挑选合适的执行节点,安排进度追踪这种

junwense commented 2 months ago

很简单,Scheduler 就是你的老板,你就是 Executor。你的老板只负责安排任务给你,然后你去执行。在任务这里,根据任务的形态引入了不同的实现,比如说 http 任务代表的是任务本身就是一个 http 接口,调度执行就是发一个 http 请求。

Scheduler 只是居中指挥而已。比如说抢占一个任务,挑选合适的执行节点,安排进度追踪这种

Scheduler作为调度者,他的职责我觉得是没有异议的 executor作为执行器和task作为任务这里就有比较奇怪的争议: 比如

  1. 按照以下范围设计:用法可能就是有固定几个executor,然后用户要创建不同复杂的task注册 executor作为执行器,只负责http请求,grpc请求,这些事情 task作为任务,任务可以是普通任务,可观测的任务,可以重复执行的任务,固定定时执行的任务,执行完成休息一段时间再执行的任务(也就是现在的task设计) 这样,就会导致任务的形态很多变,还可能引入taskCfg这种抽象

  2. 按照另一个纬度设计:用法可能是只要指定一个简单的task配置,然后写一个复杂的executor,可以组合不同executor executor有很多个,http请求的executor里面也可以包含各种实现,比如可观测的http请求,固定执行http的请求,这种设计就要求把task的调度逻辑封装在executor中,这个和现在设计是存在冲突的, task作为任务,只有配置信息,不负责流程。

最后附下个人先入为主的观点: scheuler作为调度者 taskcfg作为任务的配置: 包含2部分,1.是任务的调度模式、任务的资源需求,2.任务的调用执行信息 executor作为任务具体执行者,这里为什么我纠结,其实很多task的执行流程是一样的,或者说是有共性,比方说http调用,http调用的可观测叠加,grpc调用,grpc的可观测,甚至可以说shell调用,那么就有一个问题,这些应该是通过修饰器组合而成的,这个时候就要明确下哪些是可以做成修饰器,哪些要做出executor的固定环节,目前我觉得调度的流程应该是不能修饰的,调度流程和执行流程应该是分开的2个部分