chaps-io / gush

Fast and distributed workflow runner using ActiveJob and Redis

Long-running jobs and human job completion #60

Closed · xtagon closed this issue 5 years ago

xtagon commented 5 years ago

Hi,

I'm looking to use Gush to manage workflows, and it seems like a good fit, but some of my workflows involve waiting for a human to signify that a job has been completed. For example, one or more jobs in a workflow might require the data to be reviewed before continuing, or might need more information from someone before continuing.

This isn't really limited to "human" jobs either; the same issue applies to long-running external tasks. What would solve this is a way to mark a job as "waiting for an external event", or trigger, before it is enqueued to run in ActiveJob. In the case of a human review job, the job before it could send an e-mail to the user, and when the workflow gets to the review job it would enqueue it, but in a paused or waiting state. The user gets the e-mail, does what they need to in my app to mark it as reviewed, and my app would then call something that updates the job/workflow state with some new payload data and triggers it to continue. Or, in the case of waiting for an external event, my app could be watching for AWS S3 to publish events for a bucket, and when that happens it could call a Gush API to tell the workflow that the job for that file can continue, passing the file name as part of the payload.

I think this would be a very intuitive and useful addition to Gush. What do you think?

pokonski commented 5 years ago

Hey!

I've had a discussion about such a feature but have personally never needed it. Do you have a suggestion for how that would work, both in the DSL and in the implementation?

You could already do it by having a job that checks some external criteria in a loop indefinitely, and when they are met, it finishes and unlocks the workflow. But I don't see a nice and universal way to build that into Gush, so I'm open to suggestions :)
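
A minimal sketch of that workaround, assuming a hypothetical review_completed? check against your own application data (the only Gush-specific parts are subclassing Gush::Job and reading params):

class WaitForReview < Gush::Job
  def perform
    # Block this worker until the external criteria are met. The polling
    # interval and the check itself are illustrative, not part of Gush.
    sleep(30) until review_completed?
  end

  private

  # Hypothetical helper: in a real app this might query a Review model,
  # call an external API, etc.
  def review_completed?
    Review.where(employee_id: params[:employee_id], status: "approved").exists?
  end
end

The obvious trade-off is that this ties up a worker for the whole waiting period.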

xtagon commented 5 years ago

I can certainly help discuss what an API for it might look like, although I don't have a lot of knowledge of the internals of Gush yet so we may have to work together to design how the implementation might work.

Let me put together some example code for a fictional workflow. Thanks for the response! 👍

xtagon commented 5 years ago

Here's an example of what I'm picturing, let me know what you think:

# app/workflows/sample_workflow.rb
class SampleWorkflow < Gush::Workflow
  def configure(url_to_fetch_from, employee_id)
    run FetchJob1, params: { url: url_to_fetch_from }
    run FetchJob2, params: { some_flag: true, url: 'http://url.com' }

    run PersistJob1, after: FetchJob1
    run PersistJob2, after: FetchJob2

    run SendReviewRequest,
      after: [PersistJob1, PersistJob2],
      params: { employee_id: employee_id }

    # After the persist jobs finish, this step waits until an event is
    # published to this workflow that satisfies the block condition.
    wait_for_review = event after: [PersistJob1, PersistJob2] do |payload|
      payload[:employee_id] == employee_id
    end

    run PersistReviewResults, after: [wait_for_review]
    run NotifyReviewResults, after: PersistReviewResults
  end
end

flow = SampleWorkflow.create("http://url.com/book.pdf", "E00071")
flow.start!

# 1. Workflow runs the fetch and then persist jobs

# 2. After both branches finish, the workflow sends an e-mail to the employee that sends them to the application to review the job

# 3. The workflow then gets to the `wait_for_review` event and waits for an event trigger (maybe this puts a waiting-state key in Redis, but an ActiveJob job does not get enqueued for this step until the trigger condition is met)

# 4. Employee goes into the application, reviews the job, and the application then calls the Gush API to trigger an event:

# Question: How can we find the same workflow instance that is tied to the e-mail that went out to the employee? Could we use a unique workflow instance ID, or match on the workflow instance's arguments? (One idea using existing APIs is sketched after this walkthrough.)
flow = SampleWorkflow.find_by_args(employee_id: employee_id)
event_payload = {employee_id: employee_id, review_score: 3}
flow.publish(event_payload)

# 5. The event published to the workflow instance satisfies the wait_for_review step's condition

# 6. PersistReviewResults then runs and has access to the review_score in the payload

# 7. NotifyReviewResults finally runs, and e-mails someone else to notify them that the jobs have been completed and reviewed (it also has access to the review_score in the payload, as long as the PersistReviewResults job passes it along as output)

Looking back, this example is a little strange since there is only ONE human task in the workflow (so employee_id doesn't need to be part of the event trigger condition). But I would certainly want to be able to have more than one human task in a workflow meant for different users to complete.
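
As a partial answer to the lookup question in step 4: every Gush workflow gets an ID when it is created, and (if I'm reading the code right) Gush::Client#find_workflow can load it back by that ID, so one option with today's API would be to store flow.id when the review request goes out. A rough sketch, where ReviewRequest is a hypothetical model in my app:

flow = SampleWorkflow.create("http://url.com/book.pdf", "E00071")
flow.start!

# Store the workflow ID alongside the review request that drives the e-mail.
ReviewRequest.create!(employee_id: "E00071", workflow_id: flow.id)

# Later, in the event handler, load the same workflow instance back:
request = ReviewRequest.find_by!(employee_id: "E00071")
flow = Gush::Client.new.find_workflow(request.workflow_id)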

xtagon commented 5 years ago

@pokonski Is there any interest in helping me design this feature? I'm considering using Gush in a project but it would be much more useful in my scenario if I could have these kinds of long-running and human-completed tasks as part of the workflows.

Of course I am not asking that you do all the work, but if this could be useful to others, I would like to take a stab at contributing. But I would need a little hand-holding to do that, as I don't understand all the internals of Gush yet.

This might be a good Hacktoberfest endeavor :)

xtagon commented 5 years ago

> You could already do it by having a job that checks some external criteria in a loop indefinitely, and when they are met, it finishes and unlocks the workflow. But I don't see a nice and universal way to build that into Gush, so I'm open to suggestions

The first problem I see with that approach is: what happens if the long-running process is killed (server dies) mid-process?

pokonski commented 5 years ago

Hey @xtagon somehow I completely missed your responses!

> The first problem I see with that approach is: what happens if the long-running process is killed (server dies) mid-process?

If the worker process dies, that's a general problem; it should be restarted and then continue to process/wait for input again.

While the description you provided sounds general enough, the code example does not look intuitive to me. I would do something like

run PersistReviewResults, 
  after: [PersistJob1, PersistJob2], 
  when: proc { |payload| some_condition_here? }

or

run PersistReviewResults, after: [PersistJob1, PersistJob2] do |payload| 
  some_condition_here? 
end

But regardless of the DSL, having something be triggered by an event at any time would require an architecture change in Gush.

Right now it works like this:

  1. You start the workflow
  2. The first job is triggered
  3. It performs its work
  4. It checks if there are any jobs dependent on it and enqueues them
  5. A dependent job performs (go to 3 until no jobs are left)

With triggers, the problem is that there would need to be some external process constantly running, checking conditions every N seconds/minutes, and then triggering jobs.
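
Roughly, that external process would have to look something like the sketch below. Nothing here exists in Gush today; waiting_for_event?, trigger_condition, resume! and latest_payload_for are purely illustrative names (as far as I can tell, only all_workflows and jobs already exist):

# Hypothetical trigger-checker, running as its own long-lived process.
client = Gush::Client.new

loop do
  client.all_workflows.each do |flow|
    flow.jobs.select { |job| job.waiting_for_event? }.each do |job| # hypothetical flag
      payload = latest_payload_for(flow)                            # hypothetical lookup
      job.resume! if job.trigger_condition.call(payload)            # hypothetical API
    end
  end
  sleep 30
end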

xtagon commented 5 years ago

I imagine one way to bring the architectural change down to the basics would be to have a way to "pause" and "un-pause" jobs. If there were an API for me to pause a job as soon as it's triggered, and then manually un-pause it in my app's event handler, then Gush wouldn't even have to do anything special: it would just un-pause the right job and then continue the workflow. What do you think of that idea?
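
To make that concrete, here's a rough sketch of what I'm imagining. The manual: true flag and resume_job call don't exist in Gush; they're placeholder names for the pause/un-pause API (find_workflow does exist):

# In the workflow definition, flag a job so it is created in a paused state
# and never enqueued to ActiveJob on its own:
class ReviewWorkflow < Gush::Workflow
  def configure(employee_id)
    run SendReviewRequest, params: { employee_id: employee_id }
    run PersistReviewResults, after: SendReviewRequest, manual: true # hypothetical flag
  end
end

# In my app's event handler, once the employee submits their review:
client = Gush::Client.new
flow = client.find_workflow(stored_workflow_id) # ID saved when the e-mail went out
# Hypothetical call: un-pause the job so Gush enqueues it and the rest of the
# workflow continues as usual.
client.resume_job(flow.id, "PersistReviewResults")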

pokonski commented 5 years ago

That is a good idea. It reminds me of how GitLab CI handles workflows: you can mark a job as manual and it will only start when a user clicks "Play" in the UI or triggers it via the API. So yeah, that sounds good :+1:

xtagon commented 5 years ago

In order for that to work, we'll need to be able to identify the exact job ID to "play". That's what I was thinking with the example code, where the block checks an event condition to identify which job to un-pause. Thoughts on that?

pokonski commented 5 years ago

You could use Gush's API to find jobs requiring a manual start, so you could do that via some UI?
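
For example, assuming the hypothetical paused state from above, a UI could list the jobs that are waiting on a human and offer a "Play" button for each (find_workflow, jobs and name exist today; paused? and the render helper are made up):

flow = Gush::Client.new.find_workflow(workflow_id)
waiting_jobs = flow.jobs.select { |job| job.paused? }        # hypothetical flag
waiting_jobs.each { |job| render_play_button_for(job.name) } # app-side UI helper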

pokonski commented 5 years ago

Closing due to inactivity :timer_clock: