chaps-io / gush

Fast and distributed workflow runner using ActiveJob and Redis
MIT License
1.03k stars 103 forks

Multiple jobs with the same class name #21

Closed ferusinfo closed 7 years ago

ferusinfo commented 8 years ago

Hey, I need to run the same job class multiple times with different parameters, but I've run into some strange behaviour: when I call run several times with the same job class, only the last job is fired; the others are not.

This also affects the after: option, since it checks whether all jobs with the given name have finished. A simple workflow example:

class FetchPricesWorkflow < Gush::Workflow
  def configure(pages)
    start_time = DateTime.now

    for i in 0..pages
      run FetchPricesJob, params: { page_num: i }
    end

    run SummaryJob, after: FetchPricesJob, params: { start_time: start_time }
  end
end

Can you clarify this? It is a super important feature for me.

ferusinfo commented 8 years ago

I guess this line in the client.rb file is the issue:

job_names.map {|name| workflow.find_job(name) }
pokonski commented 8 years ago

Yeah, this wasn't a feature I needed in Gush when I developed it, but I can have a look. It should probably generate UUIDs for jobs instead of relying on their (not unique, as it turns out) names.
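
A minimal sketch of that idea (not Gush's actual implementation, just an illustration): key each enqueued job by a generated UUID instead of its class name, so duplicates can coexist.

```ruby
require "securerandom"

# Hypothetical helper: store jobs under a unique id rather than the class name,
# so the same job class can be enqueued many times in one workflow.
def add_job(jobs, klass, params)
  id = "#{klass}-#{SecureRandom.uuid}" # unique key per enqueued job
  jobs[id] = { class: klass, params: params }
  id
end

jobs = {}
ids = (0..2).map { |i| add_job(jobs, "FetchPricesJob", { page_num: i }) }

jobs.size     # all three copies are kept
ids.uniq.size # every id is distinct
```

With name-based keys, the second and third FetchPricesJob would have overwritten the first; with UUID keys all three survive.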

ferusinfo commented 8 years ago

I've changed some lines in the code and almost got it working based on UUIDs, but changing the test specs is not something I like very much...

ferusinfo commented 8 years ago

I've been able to get to this: https://github.com/ferusinfo/gush/commits/unique_node_names - the only issue I'm having now is that Redis isn't persisting all the jobs created in a queue (fetching a workflow through a new instance of Gush::Client only returns the last job).

ferusinfo commented 8 years ago

So, now that you've merged the changes, is there any way to add me to the contributors section?

pokonski commented 8 years ago

Sure :)

joshjordan commented 8 years ago

@ferusinfo how would I do this if the number of jobs is variable? What if it looked like this, from your example?

class FetchPricesWorkflow < Gush::Workflow
  def configure
    start_time = DateTime.now

    # pages is unknown when the flow begins
    run FetchPageCountJob

    # somehow get pages...
    for i in 0..pages
      run FetchPricesJob, params: { page_num: i }
    end

    run SummaryJob, after: FetchPricesJob, params: { start_time: start_time }
  end
end
pokonski commented 8 years ago

@joshjordan use map to collect the ids that each run returns and pass them to the after: option

kladkogex commented 7 years ago

Thanks to the suggestion from @pokonski, I did it like this:

class MyWorkflow < Gush::Workflow
  def configure(post_id, comment_ids)
    # multiple jobs in parallel
    job_ids = []
    comment_ids.each do |id|
      job_ids << (run RemoveCommentJob, params: { comment_id: id })
    end

    # runs after all RemoveCommentJob jobs have finished
    run RemovePostJob, after: job_ids, params: { post_id: post_id }
  end
end

Run the workflow:

post_id = 1
post = Post.find(post_id)
comment_ids = post.comments.pluck(:id)

flow = MyWorkflow.create(post_id, comment_ids)
flow.start!
pokonski commented 7 years ago

Closing since it was solved :)