fieldryand / goflow

Simple but powerful DAG scheduler and dashboard
MIT License
393 stars 31 forks

Pass uuid in context on the root node #54

Closed: zengzhengrong closed this issue 9 months ago

zengzhengrong commented 9 months ago

I saw your commit "Use uuid as job id", which adds the uuid to the job run. I would prefer to pass the uuid as a default context value in the job flow.

fieldryand commented 9 months ago

Thanks for the suggestion. You mean the uuid could be passed as an argument (maybe in a Config struct) to goflow.Execute(job string)? I will consider this; it could be an improvement on the current draft.

zengzhengrong commented 9 months ago

@fieldryand https://github.com/fieldryand/goflow/pull/55 It seems you use a shared map as the context in PipeOperator, so I added the uuid as a default key/value pair:

func (j *Job) run(store gokv.Store, e *Execution) error {

    log.Printf("starting job: name=%v, ID=%v", j.Name, e.ID)

    res := make(pipe)
    // default key/value: expose the execution ID to every task
    res["job_id"] = e.ID
    writes := make(chan writeOp)

    for {
        for _, task := range j.Tasks {

            // Start the independent tasks
            v := j.loadTaskState(task.Name)
            if v == none && !j.Dag.isDownstream(task.Name) {
                j.storeTaskState(task.Name, running)
                if task.PipeOperator != nil {
                    go task.runWithPipe(res, writes)
                } else {
                    go task.run(res, writes)
                }
            }

            // Start the tasks that need to be re-tried
            if v == upForRetry {
                task.RetryDelay.wait(task.Name, task.Retries-task.attempts)
                task.attempts = task.attempts - 1
                j.storeTaskState(task.Name, running)
                if task.PipeOperator != nil {
                    go task.runWithPipe(res, writes)
                } else {
                    go task.run(res, writes)
                }
            }

Also, defining two operator fields feels unnecessary. PipeOperator alone, instead of Operator, would be enough.
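To make the idea concrete outside of goflow, here is a minimal standalone sketch of seeding a shared map with a default key and reading it back from a task. The `pipe` type below is only a stand-in: its value type, and the helper names `newPipe` and `jobID`, are assumptions for illustration and are not taken from the goflow source.

```go
package main

import "fmt"

// pipe stands in for the shared context map from the snippet above.
// The value type is an assumption made for this sketch.
type pipe map[string]interface{}

// newPipe (hypothetical helper) seeds the map with the execution ID
// under the default key "job_id" before any task runs.
func newPipe(executionID string) pipe {
	p := make(pipe)
	p["job_id"] = executionID
	return p
}

// jobID (hypothetical helper) reads the default key back. It reports
// false if the key is missing or the value is not a string.
func jobID(p pipe) (string, bool) {
	id, ok := p["job_id"].(string)
	return id, ok
}

func main() {
	p := newPipe("example-run-id")
	if id, ok := jobID(p); ok {
		fmt.Println("task sees job_id:", id)
	}
}
```

The sketch is single-threaded on purpose. In the snippet above the tasks run in goroutines and a `writes` channel is created next to the map, so any real change would have to respect however the scheduler coordinates access to it.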

zengzhengrong commented 9 months ago

I have some Python script jobs, and each script needs to know the job uuid every time the job flow runs, like this:

func ExamplePyJob() *goflow.Job {
    j := &goflow.Job{Name: "Python", Schedule: "*/1 * * * *", Active: false}
    j.Add(&goflow.Task{
        Name:     "ping clickhouse",
        Operator: goflow.Command{Cmd: "python", Args: []string{"script/example/clickhouse_ping.py", j.ID}},
    })

    j.Add(&goflow.Task{
        Name:     "panda dataframe from clickhouse",
        Operator: goflow.Command{Cmd: "python", Args: []string{"script/example/panda_dataframe.py", j.ID}},
    })
    j.SetDownstream(j.Task("ping clickhouse"), j.Task("panda dataframe from clickhouse"))
    return j
}

So could the uuid be added to the goflow.Job struct? Or is there another way to do this?

fieldryand commented 9 months ago

OK, the example helps a lot. I hadn't considered this case before, will need to think about it.

zengzhengrong commented 9 months ago

I can just wrap a custom operator and implement the Command function myself.