runabol / tork

A distributed workflow engine
https://tork.run
MIT License

Pre/Post task evaluate issue when used with job middleware #296

Closed: v36372 closed this issue 9 months ago

v36372 commented 9 months ago

I'm currently having the following setup:

func (c *Coordinator) ModifyInputMiddleware(next job.HandlerFunc) job.HandlerFunc {
    return func(ctx context.Context, et job.EventType, j *tork.Job) error {
        inputs := make(map[string]string)
        inputs["Hello"] = "World"
        newJob := j.Clone()
        newJob.Inputs = inputs
        newJob.Context.Inputs = inputs
        return next(ctx, et, newJob)
    }
}
...
coordinator.SetJobMiddleware(coordinator.ModifyInputMiddleware)

I have a job that uses an each task together with a pre task:

name: Hello world
inputs:
  Hello: Abc123
tasks:
  - name: For each
    each:
      list: "{{ ['1','2','3'] }}"
      task:
        name: Each
        run: echo $HELLO > $TORK_OUTPUT
        env:
          HELLO: "{{ inputs.Hello }}"
        image: ubuntu:mantic
        pre:
          - name: Pre
            run: |
              echo $HELLO
            image: ubuntu:mantic
            env:
              HELLO: "{{ inputs.Hello }}"
        retry:
          limit: 1

This produces Hello: Abc123 rather than Hello: World, so the inputs set in the middleware are not picked up. Any idea what I did wrong and how to fix it?

runabol commented 9 months ago

The evaluation happens on the Coordinator side. Since the task is passed by reference, you can simply change the task itself using task middleware. You'd also want to check the event type that is passed in, to make sure you modify the task at the appropriate event.
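
The by-reference point can be illustrated with a minimal, self-contained sketch. The Task, EventType, HandlerFunc, MiddlewareFunc, StateChange and Read names below are local stand-ins that only mirror the shape of the middleware signature quoted in this thread; they are assumptions for illustration, not Tork's actual types. The sketch shows that a change made through the pointer is visible to the caller that owns the task, while a change made on a copy is seen only by handlers further down the chain. Whether this is the cause of the behavior reported above is not confirmed in the thread.

```go
package main

import (
	"context"
	"fmt"
)

// Stand-in types for illustration only; they are not Tork's real definitions.
type EventType string

const (
	StateChange EventType = "STATE_CHANGE"
	Read        EventType = "READ"
)

type Task struct {
	Name string
	Env  map[string]string
}

type HandlerFunc func(ctx context.Context, et EventType, t *Task) error

type MiddlewareFunc func(next HandlerFunc) HandlerFunc

// mutateInPlace changes the task the caller handed in, but only on StateChange.
func mutateInPlace(next HandlerFunc) HandlerFunc {
	return func(ctx context.Context, et EventType, t *Task) error {
		if et == StateChange {
			if t.Env == nil {
				t.Env = map[string]string{}
			}
			t.Env["HELLO"] = "World"
		}
		return next(ctx, et, t)
	}
}

// mutateCopy changes a copy and forwards it; the caller's task is untouched.
func mutateCopy(next HandlerFunc) HandlerFunc {
	return func(ctx context.Context, et EventType, t *Task) error {
		cp := &Task{Name: t.Name, Env: map[string]string{"HELLO": "World"}}
		return next(ctx, et, cp)
	}
}

// apply runs a fresh task through one middleware and returns the
// caller's view of HELLO afterwards.
func apply(mw MiddlewareFunc, et EventType) string {
	t := &Task{Name: "Pre", Env: map[string]string{"HELLO": "Abc123"}}
	noop := func(context.Context, EventType, *Task) error { return nil }
	_ = mw(noop)(context.Background(), et, t)
	return t.Env["HELLO"]
}

func main() {
	fmt.Println(apply(mutateInPlace, StateChange)) // World
	fmt.Println(apply(mutateInPlace, Read))        // Abc123
	fmt.Println(apply(mutateCopy, StateChange))    // Abc123
}
```

In this sketch, only the in-place variant on the matching event type changes what the caller sees afterwards, which is why the event-type check and the pointer mutation go together.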

v36372 commented 9 months ago

@runabol I tried modifying the pre task env, but it's still picking up the old env value.

func (c *Coordinator) ModifyInputMiddleware(next job.HandlerFunc) job.HandlerFunc {
    return func(ctx context.Context, et job.EventType, j *tork.Job) error {
        inputs := make(map[string]string)
        inputs["Hello"] = "World"
        newJob := j.Clone()
        newJob.Inputs = inputs
        newJob.Context.Inputs = inputs

        // Override HELLO in the env of every pre task nested under an each task.
        for i := range newJob.Tasks {
            each := newJob.Tasks[i].Each
            if each == nil {
                continue
            }
            for p := range each.Task.Pre {
                env := each.Task.Pre[p].Env
                if _, ok := env["HELLO"]; ok {
                    env["HELLO"] = "World"
                }
            }
        }

        return next(ctx, et, newJob)
    }
}

Could you check whether this works on your end? I would write a failing test, func TestEvalPreWithMiddleware, to demonstrate the issue better, but this one requires mocking the middleware, so it may be quicker to ask you.

runabol commented 9 months ago

I would use task middleware instead of job middleware if you want to modify tasks. Example:

engine.RegisterTaskMiddleware(func(next task.HandlerFunc) task.HandlerFunc {
  return func(ctx context.Context, et task.EventType, t *tork.Task) error {
    if et != task.StateChange {
      return next(ctx, et, t)
    }

    fmt.Println(t.State)

    if t.Env == nil {
      t.Env = map[string]string{}
    }

    t.Env["SOME_VAR"] = "some value"

    return next(ctx, et, t)
  }
})
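
Since the original question concerned the env of a pre task nested under an each task, the same idea can be extended with a small helper that walks a task's nested tasks. This is a rough sketch under stated assumptions: the Task and EachTask structs are local stand-ins, the Pre and Each.Task fields follow the snippets earlier in this thread, the Post field is assumed from the issue title, and overrideEnv is a hypothetical helper rather than part of Tork. The thread does not confirm at which event nested tasks are evaluated, so the right place to call such a helper from a task middleware would need to be checked.

```go
package main

import "fmt"

// Stand-in types for illustration only; they are not Tork's real definitions.
type EachTask struct {
	Task *Task
}

type Task struct {
	Name string
	Env  map[string]string
	Pre  []*Task
	Post []*Task // assumed field, mirroring Pre
	Each *EachTask
}

// overrideEnv (hypothetical helper) sets key=value on t and, recursively,
// on its pre tasks, post tasks and the task template of an each block.
func overrideEnv(t *Task, key, value string) {
	if t == nil {
		return
	}
	if t.Env == nil {
		t.Env = map[string]string{}
	}
	t.Env[key] = value
	for _, p := range t.Pre {
		overrideEnv(p, key, value)
	}
	for _, p := range t.Post {
		overrideEnv(p, key, value)
	}
	if t.Each != nil {
		overrideEnv(t.Each.Task, key, value)
	}
}

func main() {
	// Same shape as the job in the question: an each task whose
	// template has one pre task, both reading HELLO from inputs.
	t := &Task{
		Name: "For each",
		Each: &EachTask{Task: &Task{
			Name: "Each",
			Env:  map[string]string{"HELLO": "{{ inputs.Hello }}"},
			Pre: []*Task{
				{Name: "Pre", Env: map[string]string{"HELLO": "{{ inputs.Hello }}"}},
			},
		}},
	}
	overrideEnv(t, "HELLO", "World")
	fmt.Println(t.Each.Task.Env["HELLO"], t.Each.Task.Pre[0].Env["HELLO"])
}
```

Inside a task middleware shaped like the one above, such a helper would be called on t before return next(ctx, et, t), after the event-type check.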