ShiningRush / fastflow

A lightweight, high-performance distributed workflow framework
MIT License
348 stars 80 forks source link

请问如何将Task分发到指定节点呢? #59

Closed AHHH32 closed 3 months ago

ShiningRush commented 4 months ago

目前DagInstance都是由master自动分发的,暂时不支持分发到指定节点运行,不过你可以手工在数据修改字段来完成

AHHH32 commented 4 months ago

目前DagInstance都是由master自动分发的,暂时不支持分发到指定节点运行,不过你可以手工在数据修改字段来完成

请问数据修改字段是指的什么

ShiningRush commented 4 months ago

DagInstance实体中有个字段名叫Worker,可以直接修改这个字段为你的 Worker即可

AHHH32 commented 3 months ago

DagInstance实体中有个字段名叫Worker,可以直接修改这个字段为你的 Worker即可

当生成dagInstance后修改worker,有时候并不能指定worker执行任务

    dagInstance, err := dag.Run(entity.TriggerManually, nil)
    if err != nil {
        log.Fatal(err)
    }
    dagInstance.Worker = "worker-2"
    // fmt.Printf("%v", dagInstance)
    if err := mod.GetStore().CreateDagIns(dagInstance); err != nil {
        log.Fatal(err)
    }
AHHH32 commented 3 months ago

根据源码,我找到如何

DagInstance实体中有个字段名叫Worker,可以直接修改这个字段为你的 Worker即可

当生成dagInstance后修改worker,有时候并不能指定worker执行任务

  dagInstance, err := dag.Run(entity.TriggerManually, nil)
  if err != nil {
      log.Fatal(err)
  }
  dagInstance.Worker = "worker-2"
  // fmt.Printf("%v", dagInstance)
  if err := mod.GetStore().CreateDagIns(dagInstance); err != nil {
      log.Fatal(err)
  }
    for i := range dagIns {
        dagIns[i].Status = entity.DagInstanceStatusScheduled
        // dagIns[i].Worker = "worker-2" // nodes[i%len(nodes)]
      }

我将这个分配节点的代码注释掉,然后根据运行dag相关源码修改指定worker,这样能实现能将任务分发到指定worker的目的。我想问一下,这样做会有其他影响吗?

philhuan commented 3 months ago

改worker的时候顺便改一下status
DagInstanceStatusScheduled 这样试试?

AHHH32 commented 3 months ago

改worker的时候顺便改一下status DagInstanceStatusScheduled 这样试试?

那是表示dag状态的,和这个没关系吧

ShiningRush commented 3 months ago

根据源码,我找到如何

DagInstance实体中有个字段名叫Worker,可以直接修改这个字段为你的 Worker即可

当生成dagInstance后修改worker,有时候并不能指定worker执行任务

    dagInstance, err := dag.Run(entity.TriggerManually, nil)
    if err != nil {
        log.Fatal(err)
    }
    dagInstance.Worker = "worker-2"
    // fmt.Printf("%v", dagInstance)
    if err := mod.GetStore().CreateDagIns(dagInstance); err != nil {
        log.Fatal(err)
    }
  for i := range dagIns {
      dagIns[i].Status = entity.DagInstanceStatusScheduled
      // dagIns[i].Worker = "worker-2" // nodes[i%len(nodes)]
      }

我将这个分配节点的代码注释掉,然后根据运行dag相关源码修改指定worker,这样能实现能将任务分发到指定worker的目的。我想问一下,这样做会有其他影响吗?

会导致调度逻辑失效,如果你要定向调度的话,是可以手工指定下DagInstance状态,这样就不会再自动调度了