xuxueli / xxl-job

A distributed task scheduling framework.(分布式任务调度平台XXL-JOB)
http://www.xuxueli.com/xxl-job/
GNU General Public License v3.0
27.47k stars 10.86k forks source link

xxl-job go 语言接入指南 #2423

Closed mousycoder closed 2 years ago

mousycoder commented 3 years ago

介绍 XXL-JOB任务调度平台分为2个部分,Scheduler和Executor。具体的实现Scheduler对应是xxl-job-admin,同时xxl-job-admin还配有web UI,可以配置管理任务。

Scheduler和Executor之间通过HTTP API交互,因此Executor可以通过各种语言实现。

图片

以上图为例,scheduleThread将任务通过Executor的/run api推送给Executor

{

"jobId": 3,

"executorHandler": "task.test",

"executorParams": "x=100",

"executorBlockStrategy": "SERIAL_EXECUTION",

"executorTimeout": 0,

"logId": 17,

"logDateTime": 1606100913829,

"glueType": "BEAN",

"glueSource": "",

"glueUpdatetime": 1606099566000,

"broadcastIndex": 0,

"broadcastTotal": 1

}

Executor会根据executorHandler找到对应的handler,执行完之后,又会调用xxl-job-admin的/xxl-job-admin/api/callback回报任务的执行结果。从上面的描述我们可以知道,xxl-job-admin和excutor都必须暴露出api服务(都是HTTP接口)。

Scheduler可以有多个。它们之间通过MySQL进行同步。

主要的调度逻辑在JobScheduleHelper中

在每一轮执行调度逻辑之前, Scheduler必须先获得行锁

while (!scheduleThreadToStop) {

...

// 加行锁

try {

preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );

preparedStatement.execute();

...

} catch (Exception e) { ... } finally {

...

// 注意:锁必须正常释放

conn.commit();

...

}

由于xxl_job_lock 表中只有一条记录,所以这个逻辑与请求表锁类似,开销是比较大的。

其实这里还可以利用分治法的思想,让不同的任务对应到不同的行锁。来提高整体的并发度。依我推测, xxl-job 设计时考虑就是调度任务的数量不会太多。因此性能不是它的最主要关注点。

xxl-job内部没有使用Zookeeper这种数据库,因此在高可用性上与Quartz相比还是会稍微弱一些。好在它依赖少,搭建、学习的成本就会非常低。

对MySQL而言,如果xxl-job-admin在持有行锁的期间发生异常退出,与MySQL的连接断开。一段时间之后,MySQL会自动主动释放这个行锁。因此并不会出现死锁的问题。

接入指南 xxl-job-go-sdk xxl-job go 客户端

支持 1.执行器注册

2.耗时任务取消

3.任务注册,像写http.Handler一样方便

4.任务panic处理

5.阻塞策略处理

6.任务完成支持返回执行备注

7.任务超时取消 (单位:秒,0为不限制)

8.失败重试次数(在参数param中,目前由任务自行处理)

9.可自定义日志

10.自定义日志查看handler

11.支持外部路由(可与gin集成)

How to get go get github.com/mousycoder/xxl-job-go-sdk

Example package main

import (

"fmt"

xxl “github.com/mousycoder/xxl-job-go-sdk"

“github.com/xxl-job-go-sdk/example/task"

"log"

)

func main() {

exec := xxl.NewExecutor(

xxl.ServerAddr("http://127.0.0.1/xxl-job-admin"),

xxl.AccessToken(""), //请求令牌(默认为空)

xxl.ExecutorIp("127.0.0.1"), //可自动获取

xxl.ExecutorPort("9999"), //默认9999(非必填)

xxl.RegistryKey("golang-jobs"), //执行器名称

xxl.SetLogger(&logger{}), //自定义日志

)

exec.Init()

//设置日志查看handler

exec.LogHandler(func(req xxl.LogReq) xxl.LogRes {

return &xxl.LogRes{Code: 200, Msg: "", Content: xxl.LogResContent{

FromLineNum: req.FromLineNum,

ToLineNum: 2,

LogContent: "这个是自定义日志handler",

IsEnd: true,

}}

})

//注册任务handler

exec.RegTask("task.test", task.Test)

exec.RegTask("task.test2", task.Test2)

exec.RegTask("task.panic", task.Panic)

log.Fatal(exec.Run())

}

//xxl.Logger接口实现

type logger struct{}

func (l *logger) Info(format string, a ...interface{}) {

fmt.Println(fmt.Sprintf("自定义日志 - "+format, a...))

}

func (l *logger) Error(format string, a ...interface{}) {

log.Println(fmt.Sprintf("自定义日志 - "+format, a...))

}

xxl-job-admin配置 添加执行器 执行器管理->新增执行器,执行器列表如下:

AppName 名称 注册方式 OnLine 机器地址 操作

golang-jobs golang执行器 自动注册 查看 ( 1 )

查看->注册节点

http://127.0.0.1:9999

添加任务 任务管理->新增(注意,使用BEAN模式,JobHandler与RegTask名称一致)

1 测试panic BEAN:task.panic 0 ? admin STOP

2 测试耗时任务 BEAN:task.test2 * ? admin STOP

3 测试golang BEAN:task.test * ? admin STOP

SDK 源码地址 https://github.com/mousycoder/xxl-job-go-sdk

SDK 源码解析 1.初始化执行器信息(令牌,执行器名称,IP,端口,调度中心地址)

exec := xxl.NewExecutor(

xxl.ServerAddr("http://xxl-job.test.com:18088/xxl-job-admin"),

xxl.AccessToken(“xxxxx"), //请求令牌(默认为空)

//xxl.ExecutorIp("127.0.0.1"), //可自动获取

xxl.ExecutorPort("9999"), //默认9999(非必填)

xxl.RegistryKey("golang-jobs"), //执行器名称

xxl.SetLogger(&logger{}), //自定义日志

)

2.调用执行器注册接口(/api/registry)注册执行器到调度中心(20秒心跳防止过期)

func (e *executor) registry() {

t := time.NewTimer(time.Second * 0) //初始立即执行

defer t.Stop()

req := &Registry{

RegistryGroup: "EXECUTOR",

RegistryKey: e.opts.RegistryKey,

RegistryValue: "http://" + e.address,

}

param, err := json.Marshal(req)

if err != nil {

log.Fatal("执行器注册信息解析失败:" + err.Error())

}

for {

<-t.C

t.Reset(time.Second * time.Duration(20)) //20秒心跳防止过期

func() {

result, err := e.post("/api/registry", string(param))

if err != nil {

e.log.Error("执行器注册失败1:" + err.Error())

return

}

defer result.Body.Close()

body, err := ioutil.ReadAll(result.Body)

if err != nil {

e.log.Error("执行器注册失败2:" + err.Error())

return

}

res := &res{}

_ = json.Unmarshal(body, &res)

if res.Code != 200 {

e.log.Error("执行器注册失败3:" + string(body))

return

}

e.log.Info("执行器注册成功:" + string(body))

}()

}

}

3.设置日志查看 handler,用于任务调度中心远程查看执行器日志

exec.LogHandler(func(req xxl.LogReq) xxl.LogRes {

return &xxl.LogRes{Code: 200, Msg: "", Content: xxl.LogResContent{

FromLineNum: req.FromLineNum,

ToLineNum: 2,

LogContent: "这个是自定义日志handler",

IsEnd: true,

}}

})

4.注册任务 handler

exec.RegTask("task.test", task.Test)

exec.RegTask("task.test2", task.Test2)

exec.RegTask("task.panic", task.Panic)

加锁,放到内存中

func (t taskList) Set(key string, val Task) {

t.mu.Lock()

t.data[key] = val

t.mu.Unlock()

}

5.创建路由规则

// 创建路由器

mux := http.NewServeMux()

// 设置路由规则

mux.HandleFunc("/run", e.runTask)

mux.HandleFunc("/kill", e.killTask)

mux.HandleFunc("/log", e.taskLog)

runTask

流程:

1.解析调用中心发送的请求参数

2.加入到 runlist

3.根据executorHandler的key值找到对应的任务

4.运行任务

5.回调调度中心

6.从 runlist 中删除

7.返回给调度中心 OK

killTask

流程:

1.解析调用中心发送的请求参数

2.判断是否在runlist里

3.中止任务

4.从 runlist 中删除任务

5.返回给调度中心 OK

taskLog

流程:

1.解析调用中心发送的请求参数

2.找到对应任务的 logHandler

3.返回给调度中心 log 信息

6.创建服务器,监听端口

server := &http.Server{

Addr: e.address,

WriteTimeout: time.Second * 3,

Handler: mux,

}

// 监听端口并提供服务

e.log.Info("Starting server at " + e.address)

go server.ListenAndServe()

7.如果服务器意外退出,则移除注册信息

t := time.NewTimer(time.Second * 0) //初始立即执行

defer t.Stop()

req := &Registry{

RegistryGroup: "EXECUTOR",

RegistryKey: e.opts.RegistryKey,

RegistryValue: "http://" + e.address,

}

param, err := json.Marshal(req)

if err != nil {

e.log.Error("执行器摘除失败:" + err.Error())

}

res, err := e.post("/api/registryRemove", string(param))

if err != nil {

e.log.Error("执行器摘除失败:" + err.Error())

}

body, err := ioutil.ReadAll(res.Body)

e.log.Info("执行器摘除成功:" + string(body))

_ = res.Body.Close()

调度中心/执行器 RESTful API XXL-JOB 目标是一种跨平台、跨语言的任务调度规范和协议。

针对Java应用,可以直接通过官方提供的调度中心与执行器,方便快速的接入和使用调度中心。

针对非Java应用,可借助 XXL-JOB 的标准 RESTful API 方便的实现多语言支持。

调度中心 RESTful API:

说明:调度中心提供给执行器使用的API;不局限于官方执行器使用,第三方可使用该API来实现执行器;

API列表:执行器注册、任务结果回调等;

执行器 RESTful API :

说明:执行器提供给调度中心使用的API;官方执行器默认已实现,第三方执行器需要实现并对接提供给调度中心;

API列表:任务触发、任务终止、任务日志查询……等;

此处 RESTful API 主要用于非Java语言定制个性化执行器使用,实现跨语言。除此之外,如果有需要通过API操作调度中心,可以个性化扩展 “调度中心 RESTful API” 并使用。

调度中心 RESTful API a、任务回调 说明:执行器执行完任务后,回调任务结果时使用


地址格式:{调度中心跟地址}/callback

Header:

XXL-JOB-ACCESS-TOKEN : {请求令牌}

请求数据格式如下,放置在 RequestBody 中,JSON格式:

[{

"logId":1, // 本次调度日志ID

"logDateTim":0, // 本次调度日志时间

"executeResult":{

"code": 200, // 200 表示任务执行正常,500表示失败

"msg": null

}

}]

响应数据格式:

{

"code": 200, // 200 表示正常、其他失败

"msg": null // 错误提示消息

}

b、执行器注册 说明:执行器注册时使用,调度中心会实时感知注册成功的执行器并发起任务调度


地址格式:{调度中心跟地址}/registry

Header:

XXL-JOB-ACCESS-TOKEN : {请求令牌}

请求数据格式如下,放置在 RequestBody 中,JSON格式:

{

"registryGroup":"EXECUTOR", // 固定值

"registryKey":"xxl-job-executor-example", // 执行器AppName

"registryValue":"http://127.0.0.1:9999/" // 执行器地址,内置服务地址

}

响应数据格式:

{

"code": 200, // 200 表示正常、其他失败

"msg": null // 错误提示消息

}

c、执行器注册摘除 说明:执行器注册摘除时使用,注册摘除后的执行器不参与任务调度与执行


地址格式:{调度中心跟地址}/registryRemove

Header:

XXL-JOB-ACCESS-TOKEN : {请求令牌}

请求数据格式如下,放置在 RequestBody 中,JSON格式:

{

"registryGroup":"EXECUTOR", // 固定值

"registryKey":"xxl-job-executor-example", // 执行器AppName

"registryValue":"http://127.0.0.1:9999/" // 执行器地址,内置服务跟地址

}

响应数据格式:

{

"code": 200, // 200 表示正常、其他失败

"msg": null // 错误提示消息

}

执行器 RESTful API a、心跳检测 说明:调度中心检测执行器是否在线时使用


地址格式:{执行器内嵌服务跟地址}/beat

Header:

XXL-JOB-ACCESS-TOKEN : {请求令牌}

请求数据格式如下,放置在 RequestBody 中,JSON格式:

响应数据格式:

{

"code": 200, // 200 表示正常、其他失败

"msg": null // 错误提示消息

}

b、忙碌检测 说明:调度中心检测指定执行器上指定任务是否忙碌(运行中)时使用


地址格式:{执行器内嵌服务跟地址}/idleBeat

Header:

XXL-JOB-ACCESS-TOKEN : {请求令牌}

请求数据格式如下,放置在 RequestBody 中,JSON格式:

{

"jobId":1 // 任务ID

}

响应数据格式:

{

"code": 200, // 200 表示正常、其他失败

"msg": null // 错误提示消息

}

c、触发任务 说明:触发任务执行


地址格式:{执行器内嵌服务跟地址}/run

Header:

XXL-JOB-ACCESS-TOKEN : {请求令牌}

请求数据格式如下,放置在 RequestBody 中,JSON格式:

{

"jobId":1, // 任务ID

"executorHandler":"demoJobHandler", // 任务标识

"executorParams":"demoJobHandler", // 任务参数

"executorBlockStrategy":"COVER_EARLY", // 任务阻塞策略,可选值参考 com.xxl.job.core.enums.ExecutorBlockStrategyEnum

"executorTimeout":0, // 任务超时时间,单位秒,大于零时生效

"logId":1, // 本次调度日志ID

"logDateTime":1586629003729, // 本次调度日志时间

"glueType":"BEAN", // 任务模式,可选值参考 com.xxl.job.core.glue.GlueTypeEnum

"glueSource":"xxx", // GLUE脚本代码

"glueUpdatetime":1586629003727, // GLUE脚本更新时间,用于判定脚本是否变更以及是否需要刷新

"broadcastIndex":0, // 分片参数:当前分片

"broadcastTotal":0 // 分片参数:总分片

}

响应数据格式:

{

"code": 200, // 200 表示正常、其他失败

"msg": null // 错误提示消息

}

f、终止任务 说明:终止任务


地址格式:{执行器内嵌服务跟地址}/kill

Header:

XXL-JOB-ACCESS-TOKEN : {请求令牌}

请求数据格式如下,放置在 RequestBody 中,JSON格式:

{

"jobId":1 // 任务ID

}

响应数据格式:

{

"code": 200, // 200 表示正常、其他失败

"msg": null // 错误提示消息

}

d、查看执行日志 说明:终止任务,滚动方式加载


地址格式:{执行器内嵌服务跟地址}/log

Header:

XXL-JOB-ACCESS-TOKEN : {请求令牌}

请求数据格式如下,放置在 RequestBody 中,JSON格式:

{

"logDateTim":0, // 本次调度日志时间

"logId":0, // 本次调度日志ID

"fromLineNum":0 // 日志开始行号,滚动加载日志

}

响应数据格式:

{

"code":200, // 200 表示正常、其他失败

"msg": null // 错误提示消息

"content":{

"fromLineNum":0, // 本次请求,日志开始行数

"toLineNum":100, // 本次请求,日志结束行号

"logContent":"xxx", // 本次请求日志内容

"isEnd":true // 日志是否全部加载完

}

}

最后的彩蛋:

XXL-JOB-PLUS: XXL-JOB的加强版

https://github.com/mousycoder/xxl-job-plus

公众号: image

xuxueli commented 2 years ago

感谢关注,分支版本可以合并至:https://github.com/xuxueli/xxl-job/issues/154

shcw commented 2 years ago

这个可以支持2.0版本嘛 求助!! @mousycoder @xuxueli