ShiningRush / fastflow

A lightweight, high-performance distributed workflow framework
MIT License
348 stars 80 forks source link

再次咨询,action not found问题 #51

Open chenxushao opened 9 months ago

chenxushao commented 9 months ago

情况是这样的,fastflow部署了一共2台机器。运行的时候,经常报action not found 报这个错误的代码在这里: execturor.go func (e DefExecutor) runAction(taskIns entity.TaskInstance) error { act := ActionMap[taskIns.ActionName] if act == nil { return fmt.Errorf("action not found: %s", taskIns.ActionName) }

再翻看源代码,ActionMap 是定义的一个全局变量。 var ( ActionMap = map[string]run.Action{}

defExc Executor defStore Store defKeeper Keeper defParser Parser defCommander Commander )

Action 的注册,是将 Action 放到这个 Map 中,相当于数据是存储在‹内存›中,并没有持久化。 在程序中,我是这么注册Action 的。

// 注册 action fastflow.RegisterAction(actions)

我看到官方文档上是这样描述: 当你开始运行一个 Dag 后,则会为本次执行生成一个执行记录,它被称为 DagInstance,当它生成以后,会由 Leader 实例将其分发到一个健康的 Worker,再由其解析、执行。

假设fastflow.RegisterAction(actions)在2台机器中的A执行,那么是不有可能由另一台机器去执行,是不是因为这个造成的报action not found呢?

chenxushao commented 9 months ago

另外,部署的代码是同一套。

chenxushao commented 9 months ago

Dag和注册Action我这边均是由前端发起请求,动态注册的,假如是A机器收到请求,那么这个action肯定是在A机器了。 但是真正的执行Action根据文档描述,可能是由另一台worker机器B执行,这是不是出现了action not found的根因呢

chenxushao commented 9 months ago

诚盼回复。

philhuan commented 9 months ago

这里 action 的注册用法是在服务启动的时候注册,不是通过接口注册的。

philhuan commented 9 months ago

要注册得有一个实现了接口的结构体吧,通过接口注册怎么关联这个结构体,新增action 得改代码吧?

chenxushao commented 9 months ago

动态不支持吗,比如新增一个编排任务,这时候会新创建dag和新注册action,应该是比较合理的需求。

chenxushao commented 9 months ago

要注册得有一个实现了接口的结构体吧,通过接口注册怎么关联这个结构体,新增action 得改代码吧?

您好,是有两个固化的结构体,只是每次注册name不一样,因为处理的逻辑不一样。

ShiningRush commented 9 months ago

action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:

  1. 你的注册逻辑可能晚于fastflow的初始化
  2. 注册的actionname不对,或者注册失败
chenxushao commented 9 months ago

action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:

  1. 你的注册逻辑可能晚于fastflow的初始化
  2. 注册的actionname不对,或者注册失败

你好。 fasttflow的初始化,只需要也只能够执行一次对吗?

目前fastflow的初始化,是在go main方法中进行的。 之后,通过controller(使用的gin框架)来触发提交的任务。

在main语文教学中初始化fastflow的代码如下:

func InitFlow() { fastFlowConf := configs.Get().FastFlow connStr := fastFlowConf.ConnStr dataBase := fastFlowConf.Database prefix := fastFlowConf.Prefix

workKeyNum := BuildWorkNum()
logrus.Infof("InitFlow. workKeyNum:%d", workKeyNum)

// 初始化选举组件
Keeper = mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
    Key: "worker-" + util.StringUtils.AsString(workKeyNum),
    // if your mongo does not set user/pwd, you should remove it
    ConnStr:  connStr,
    Database: dataBase,
    Prefix:   prefix,
})

logrus.Infof("init keeper begin. keeper:%v", Keeper)
if err := Keeper.Init(); err != nil {
    errMsg := fmt.Sprintf("init keeper failed: %s", err.Error())
    logrus.Error(errMsg)
    notice.RobotNotice(errMsg)
    return
}

logrus.Infof("init keeper finish. workKey:%s", Keeper.WorkerKey())

isLeader := Keeper.IsLeader()
if isLeader {
    notice.RobotNotice(fmt.Sprintf("keeper leader ip:%s,workKey:%s", runtimes.AppInfo.Ip, Keeper.WorkerKey()))
}

// 初始化存储组件
st := mongoStore.NewStore(&mongoStore.StoreOption{
    // if your mongo does not set user/pwd, you should remove it
    ConnStr:  connStr,
    Database: dataBase,
    Prefix:   prefix,
})

logrus.Infof("init store begin. st:%v", st)
if err := st.Init(); err != nil {
    errMsg := fmt.Sprintf("init store failed: %s", err.Error())
    logrus.Error(errMsg)
    notice.RobotNotice(errMsg)
    return
}
logrus.Info("init store finish")

// 启动fastflow
logrus.Info("start flow begin")
if err := fastflow.Start(&fastflow.InitialOption{
    Keeper:             Keeper,
    Store:              st,
    DagScheduleTimeout: 300 * time.Second,
    ExecutorTimeout:    150 * time.Second,
}); err != nil {
    errMsg := fmt.Sprintf("init fastflow failed: %s", err.Error())
    logrus.Error(errMsg)
    notice.RobotNotice(errMsg)
    return
}
logrus.Info("start flow finish")

}

之后,通过http请求来构建Dag并触发执行。代码形如下:

dagVars, dagVarMap := BuildDagVars(globalVarItems)
tasks, actions := BuildTagElement(rawDagBO.TaskNodes)

// 注册 action
fastflow.RegisterAction(actions)
logrus.Infof("RegisterAction finish.dagExplainId:%d", dagExplainId)

//构建dag模型
dag := initialize.CreateDag(util.StringUtils.AsString(dagId), rawDagBO.Name, rawDagBO.Descr, tasks, dagVars)

ch := make(chan string)
//创建dag并运行
go initialize.CreateDagAndInstance(dag, dagVarMap, ch)

这种使用方式是可行的吗?

chenxushao commented 9 months ago

通过阅读源码, fastflow.RegisterAction(actions) 只会在接收到请求的那台机器的内存中(是一个map)才会有数据。我想的是,之所以报action not found,是否是由于具体的执行被调度到另一台机器上了。 另外,还有一个问题,一个Dag上的N外节点,只会被同一台机器执行吧?因为一个dag节点还存在数据共享和传递。

chenxushao commented 9 months ago

action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:

  1. 你的注册逻辑可能晚于fastflow的初始化
  2. 注册的actionname不对,或者注册失败

第1点有一个疑问: fastflow.RegisterAction 必须在 fastflow.start之前执行是吗?

ShiningRush commented 9 months ago

action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:

  1. 你的注册逻辑可能晚于fastflow的初始化
  2. 注册的actionname不对,或者注册失败

第1点有一个疑问: fastflow.RegisterAction 必须在 fastflow.start之前执行是吗?

最好是,最晚必须在工作流实例被执行前,否则就会在action集合中找不到

ShiningRush commented 9 months ago

通过阅读源码, fastflow.RegisterAction(actions) 只会在接收到请求的那台机器的内存中(是一个map)才会有数据。我想的是,之所以报action not found,是否是由于具体的执行被调度到另一台机器上了。 另外,还有一个问题,一个Dag上的N外节点,只会被同一台机器执行吧?因为一个dag节点还存在数据共享和传递。

你的理解是对的,同一个DagInstance只会被同一个worker执行

chenxushao commented 9 months ago

action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:

  1. 你的注册逻辑可能晚于fastflow的初始化
  2. 注册的actionname不对,或者注册失败

你好。 fasttflow的初始化,只需要也只能够执行一次对吗?

目前fastflow的初始化,是在go main方法中进行的。 之后,通过controller(使用的gin框架)来触发提交的任务。

在main语文教学中初始化fastflow的代码如下:

func InitFlow() { fastFlowConf := configs.Get().FastFlow connStr := fastFlowConf.ConnStr dataBase := fastFlowConf.Database prefix := fastFlowConf.Prefix

workKeyNum := BuildWorkNum()
logrus.Infof("InitFlow. workKeyNum:%d", workKeyNum)

// 初始化选举组件
Keeper = mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
  Key: "worker-" + util.StringUtils.AsString(workKeyNum),
  // if your mongo does not set user/pwd, you should remove it
  ConnStr:  connStr,
  Database: dataBase,
  Prefix:   prefix,
})

logrus.Infof("init keeper begin. keeper:%v", Keeper)
if err := Keeper.Init(); err != nil {
  errMsg := fmt.Sprintf("init keeper failed: %s", err.Error())
  logrus.Error(errMsg)
  notice.RobotNotice(errMsg)
  return
}

logrus.Infof("init keeper finish. workKey:%s", Keeper.WorkerKey())

isLeader := Keeper.IsLeader()
if isLeader {
  notice.RobotNotice(fmt.Sprintf("keeper leader ip:%s,workKey:%s", runtimes.AppInfo.Ip, Keeper.WorkerKey()))
}

// 初始化存储组件
st := mongoStore.NewStore(&mongoStore.StoreOption{
  // if your mongo does not set user/pwd, you should remove it
  ConnStr:  connStr,
  Database: dataBase,
  Prefix:   prefix,
})

logrus.Infof("init store begin. st:%v", st)
if err := st.Init(); err != nil {
  errMsg := fmt.Sprintf("init store failed: %s", err.Error())
  logrus.Error(errMsg)
  notice.RobotNotice(errMsg)
  return
}
logrus.Info("init store finish")

// 启动fastflow
logrus.Info("start flow begin")
if err := fastflow.Start(&fastflow.InitialOption{
  Keeper:             Keeper,
  Store:              st,
  DagScheduleTimeout: 300 * time.Second,
  ExecutorTimeout:    150 * time.Second,
}); err != nil {
  errMsg := fmt.Sprintf("init fastflow failed: %s", err.Error())
  logrus.Error(errMsg)
  notice.RobotNotice(errMsg)
  return
}
logrus.Info("start flow finish")

}

之后,通过http请求来构建Dag并触发执行。代码形如下:

dagVars, dagVarMap := BuildDagVars(globalVarItems)
tasks, actions := BuildTagElement(rawDagBO.TaskNodes)

// 注册 action
fastflow.RegisterAction(actions)
logrus.Infof("RegisterAction finish.dagExplainId:%d", dagExplainId)

//构建dag模型
dag := initialize.CreateDag(util.StringUtils.AsString(dagId), rawDagBO.Name, rawDagBO.Descr, tasks, dagVars)

ch := make(chan string)
//创建dag并运行
go initialize.CreateDagAndInstance(dag, dagVarMap, ch)

这种使用方式是可行的吗?

这种方式可行吗?

chenxushao commented 9 months ago

action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:

  1. 你的注册逻辑可能晚于fastflow的初始化
  2. 注册的actionname不对,或者注册失败

你好。 fasttflow的初始化,只需要也只能够执行一次对吗? 目前fastflow的初始化,是在go main方法中进行的。 之后,通过controller(使用的gin框架)来触发提交的任务。 在main语文教学中初始化fastflow的代码如下: func InitFlow() { fastFlowConf := configs.Get().FastFlow connStr := fastFlowConf.ConnStr dataBase := fastFlowConf.Database prefix := fastFlowConf.Prefix

workKeyNum := BuildWorkNum()
logrus.Infof("InitFlow. workKeyNum:%d", workKeyNum)

// 初始化选举组件
Keeper = mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
    Key: "worker-" + util.StringUtils.AsString(workKeyNum),
    // if your mongo does not set user/pwd, you should remove it
    ConnStr:  connStr,
    Database: dataBase,
    Prefix:   prefix,
})

logrus.Infof("init keeper begin. keeper:%v", Keeper)
if err := Keeper.Init(); err != nil {
    errMsg := fmt.Sprintf("init keeper failed: %s", err.Error())
    logrus.Error(errMsg)
    notice.RobotNotice(errMsg)
    return
}

logrus.Infof("init keeper finish. workKey:%s", Keeper.WorkerKey())

isLeader := Keeper.IsLeader()
if isLeader {
    notice.RobotNotice(fmt.Sprintf("keeper leader ip:%s,workKey:%s", runtimes.AppInfo.Ip, Keeper.WorkerKey()))
}

// 初始化存储组件
st := mongoStore.NewStore(&mongoStore.StoreOption{
    // if your mongo does not set user/pwd, you should remove it
    ConnStr:  connStr,
    Database: dataBase,
    Prefix:   prefix,
})

logrus.Infof("init store begin. st:%v", st)
if err := st.Init(); err != nil {
    errMsg := fmt.Sprintf("init store failed: %s", err.Error())
    logrus.Error(errMsg)
    notice.RobotNotice(errMsg)
    return
}
logrus.Info("init store finish")

// 启动fastflow
logrus.Info("start flow begin")
if err := fastflow.Start(&fastflow.InitialOption{
    Keeper:             Keeper,
    Store:              st,
    DagScheduleTimeout: 300 * time.Second,
    ExecutorTimeout:    150 * time.Second,
}); err != nil {
    errMsg := fmt.Sprintf("init fastflow failed: %s", err.Error())
    logrus.Error(errMsg)
    notice.RobotNotice(errMsg)
    return
}
logrus.Info("start flow finish")

} 之后,通过http请求来构建Dag并触发执行。代码形如下:

dagVars, dagVarMap := BuildDagVars(globalVarItems)
tasks, actions := BuildTagElement(rawDagBO.TaskNodes)

// 注册 action
fastflow.RegisterAction(actions)
logrus.Infof("RegisterAction finish.dagExplainId:%d", dagExplainId)

//构建dag模型
dag := initialize.CreateDag(util.StringUtils.AsString(dagId), rawDagBO.Name, rawDagBO.Descr, tasks, dagVars)

ch := make(chan string)
//创建dag并运行
go initialize.CreateDagAndInstance(dag, dagVarMap, ch)

这种使用方式是可行的吗?

这种方式可行吗?

目前这样,就是遇到 了action not found异常。 我部署了2台机器

ShiningRush commented 9 months ago

我看你Resgiter是在接口里面,你有2台机器,可能在A机器接受的请求,注册Action,但是被分发到了B机器,导致这个错误。 为什么不在初始化的时候注册Action呢

chenxushao commented 9 months ago

因为我们目前的Dag是由业务方构建然后提交的,如果只能在初始化构建,那这个编排任务是否只能在系统启动时才能初始化呢?

chenxushao commented 9 months ago

我们目前是2类固化的Action,一类是查询类,一类是执行动作的变更,我目前是固化了2个Action实现。然后由前端动态构建Dag,构建好后,regionAction(因为action的name每次不一样),之后创建dag并持久化到mongodb,然后执行CreateDagAndInstance。

ShiningRush commented 8 months ago

Dag的执行是可以被任意节点提交的, 但是Action 包含逻辑代码, 所以这部分内容只能在编译时提交

gaohaotian010 commented 7 months ago

Dag的执行是可以被任意节点提交的, 但是Action 包含逻辑代码, 所以这部分内容只能在编译时提交

也就是说,如果像 @chenxushao 这种用法,其实需要预定义所有的“能力”到Action里,然后前端构建DAG的时候只能通过填写“能力”的参数构建DAG的形式,可以这样理解吗?

ShiningRush commented 7 months ago

Dag的执行是可以被任意节点提交的, 但是Action 包含逻辑代码, 所以这部分内容只能在编译时提交

也就是说,如果像 @chenxushao 这种用法,其实需要预定义所有的“能力”到Action里,然后前端构建DAG的时候只能通过填写“能力”的参数构建DAG的形式,可以这样理解吗?

是的,你理解的没问题。 如果需要支持动态插入Action,需要将Action用Plugin来实现,目前没有强需求