brandonhilkert / sucker_punch

Sucker Punch is a Ruby asynchronous processing library using concurrent-ruby, heavily influenced by Sidekiq and girl_friday.
MIT License

I want to use a queue for each job class even when using Sucker Punch with Active Job #224

Open ledsun opened 5 years ago

ledsun commented 5 years ago

The README says "each job acts as its own queue". But when Sucker Punch is used with Active Job, all jobs are executed in a single queue.

Cause

I think that this is because the perform_async method of the JobWrapper class is called in the SuckerPunchAdapter. https://github.com/rails/rails/blob/b2eb1d1c55a59fee1e6c4cba7030d8ceb524267c/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb#L24

JobWrapper.perform_async job.serialize

SuckerPunch takes the queue name from the class that receives perform_async (self.to_s), so every job enqueued through the adapter ends up in the JobWrapper queue. https://github.com/brandonhilkert/sucker_punch/blob/master/lib/sucker_punch/job.rb#L37

SuckerPunch::Queue.find_or_create(self.to_s, num_workers, num_jobs_max)
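
The effect can be reproduced without either library. The sketch below is a simplified stand-in, not SuckerPunch's real implementation: `ToyQueueRegistry`, `ToyJob`, `SearchJob` and `MailJob` are hypothetical names, and the registry only mimics the idea that the queue is looked up by `self.to_s`.

```ruby
# Hypothetical stand-in for SuckerPunch::Queue.find_or_create: one queue per name.
module ToyQueueRegistry
  QUEUES = Hash.new { |hash, name| hash[name] = [] }

  def self.find_or_create(name)
    QUEUES[name]
  end
end

# Hypothetical stand-in for SuckerPunch::Job: the queue name is the
# name of the class that receives perform_async.
module ToyJob
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    def perform_async(*args)
      ToyQueueRegistry.find_or_create(to_s) << args
    end
  end
end

# Plays the role of the wrapper class used by the Active Job adapter.
class JobWrapper
  include ToyJob
end

class SearchJob
  include ToyJob
end

class MailJob
  include ToyJob
end

# Going through the wrapper: every job lands under the single key "JobWrapper".
JobWrapper.perform_async({ "job_class" => "SearchJob" })
JobWrapper.perform_async({ "job_class" => "MailJob" })
via_wrapper = ToyQueueRegistry::QUEUES.keys.dup

# Calling each class directly: one key per job class.
SearchJob.perform_async("query")
MailJob.perform_async("address")
all_keys = ToyQueueRegistry::QUEUES.keys
```

Only the receiver of `perform_async` decides the queue, which is why routing everything through one wrapper class collapses all jobs into one queue.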

Hypothesis

If the adapter were changed as follows, each job could run in its own queue.

job.class.perform_async job.arguments

However, this fix makes the ActiveJob log incomplete. The job start log, like the following, would be lost.

[ActiveJob] [SearchJob] [2163535d-688d-494a-9433-5a8c9057f392] Performing SearchJob (Job ID: 2163535d-688d-494a-9433-5a8c9057f392) from SuckerPunch(SearchJob) with arguments: "b8f04e7e-69c9-4dc4-9ea6-955f00193512"

If the job is executed using ActiveJob::Base.execute as follows, the start log is output.

Base.execute job.serialize

If the class that receives perform_async and the block to be executed could be specified separately, it would be possible both to run each job in its own queue and to keep the ActiveJob log.

job.class.perform_async { Base.execute job.serialize }

Solution

I think the following modifications are necessary. SuckerPunch::Job.perform_async executes the block when it is called with one. When there is no block, it instantiates its own class and performs it.

def perform_async(*args, &block)
  return unless SuckerPunch::RUNNING.true?
  queue = SuckerPunch::Queue.find_or_create(self.to_s, num_workers, num_jobs_max)
  if block
    # Run the caller's block, but in this class's own queue.
    queue.post(args) { |job_args| __run_perform(*job_args, &block) }
  else
    # Default behaviour: instantiate the job class and perform it.
    queue.post(args) { |job_args| __run_perform(*job_args) { self.new.perform(*job_args) } }
  end
end

And the __run_perform method executes the received block. It also receives the job arguments so that they can be passed to the exception handler.

def __run_perform(*args, &block)
  SuckerPunch::Counter::Busy.new(self.to_s).increment
  result = block.call
  SuckerPunch::Counter::Processed.new(self.to_s).increment
  result
rescue => ex
  SuckerPunch::Counter::Failed.new(self.to_s).increment
  # args must be a parameter here; otherwise this line raises NameError.
  SuckerPunch.exception_handler.call(ex, self, args)
ensure
  SuckerPunch::Counter::Busy.new(self.to_s).decrement
end
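
The block-or-default dispatch can be tried in isolation. This is a minimal sketch under stated assumptions, not the library's code: `ToyBlockJob`, `ToySearchJob`, `run_perform`, `COUNTERS` and `ERRORS` are hypothetical names, and the work runs inline instead of being posted to a worker pool.

```ruby
# Hypothetical stand-in for the proposed SuckerPunch::Job behaviour.
module ToyBlockJob
  # Per-class counters, keyed by class name (mimics the Processed/Failed counters).
  COUNTERS = Hash.new { |hash, name| hash[name] = { processed: 0, failed: 0 } }
  # Records what an exception handler would receive: message, class name, args.
  ERRORS = []

  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # Uses the given block if present, otherwise instantiates the class.
    # Executed inline here; the real library would post to a queue.
    def perform_async(*args, &block)
      work = block || proc { new.perform(*args) }
      run_perform(*args, &work)
    end

    # Runs the block and updates the counters of the receiving class.
    def run_perform(*args)
      result = yield
      COUNTERS[to_s][:processed] += 1
      result
    rescue => ex
      COUNTERS[to_s][:failed] += 1
      ERRORS << [ex.message, to_s, args]
      nil
    end
  end
end

class ToySearchJob
  include ToyBlockJob

  def perform(term)
    "searched #{term}"
  end
end

plain   = ToySearchJob.perform_async("ruby")
wrapped = ToySearchJob.perform_async("ruby") { "executed via wrapper" }
failed  = ToySearchJob.perform_async("ruby") { raise ArgumentError, "boom" }
```

In all three calls the counters and the error record are attributed to `ToySearchJob`, because the receiver of `perform_async` stays the job class even when the work itself is supplied as a block.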

Conclusions

I think this is a good idea. If you agree, we will open a pull request. Please let me know if you need anything else, such as tests.