ShiningRush / fastflow

A lightweight, high-performance distributed workflow framework
MIT License
348 stars 80 forks source link

创建task有非常大的延迟 #45

Closed HtcOrange closed 10 months ago

HtcOrange commented 10 months ago

`if err := mod.GetStore().CreateDagIns(dagInstance); err != nil { return nil, err }

dagIns, err := mod.GetStore().GetDagInstance(dagInstance.ID)
if err != nil {
    return nil, err
}

dagIns.ShareData.Set("dagInstanceID", dagInstance.ID)
if err := mod.GetStore().UpdateDagIns(dagIns); err != nil {
    return nil, err
}
dagIns.ShareData.Set("jobID", strconv.Itoa(jobID))
if err := mod.GetStore().UpdateDagIns(dagIns); err != nil {
    return nil, err
}`

在执行上述create的操作后,我期待在后面通过一个for循环同步tasks的状态,理论上我最终通过listTaskInstance获取到的task数量应该和dag中声明的task数量相同,但这里存在非常大的延迟,for循环等待时间可能要数十秒或一致卡死,显示我目前db中不存在和当前dagInstance相关联的task,当我查看db时也是如此,dagInstance存在,而tasks却不存在,我想知道这是什么问题呢?有没有相关的排查思路? for { time.Sleep(500 * time.Millisecond) tasks, err = mod.GetStore().ListTaskInstance(&mod.ListTaskInstanceInput{ DagInsID: dagInstance.ID, }) if err != nil { fmt.Printf("total_task_length: %d, taskcount: %d\n, err: %s", taskCount, len(tasks), err.Error()) continue } else if len(tasks) != taskCount { fmt.Printf("total_task_length: %d, taskcount: %d\n", taskCount, len(tasks)) continue } break }

而且在运行过程中出现过非常诡异的现象,dagInstance理论上会从init状态变为scheduled,但是流程中在没有通过外部调用的情况下,dagInstance又从scheduled变回了init状态,请问这是为什么呢?通过mongo连续两次查询相同的dagInstance状态可以观察此现象: image

ShiningRush commented 10 months ago

hi, 这个问题突然关闭了,是找到原因了吗

HtcOrange commented 10 months ago

hi, 这个问题突然关闭了,是找到原因了吗 是的,由于项目配置不当启动了n个孤儿进程导致的问题,非框架引入