amitree / delayed_job_recurring

Extends delayed_job to support recurring jobs
MIT License

Ensure singular enqueued jobs in multi-process environment #10

Closed by abrom 8 years ago

abrom commented 9 years ago

An example of such a case is (for ease of development/testing) having a recurring job scheduled in an initializer. It is possible to have a situation where DJ spools up a previously scheduled job, then (possibly due to a longer load time) another process runs its initializer, also scheduling a new job. When the running job finishes, it schedules itself, resulting in two of the same job in DJ.

afn commented 9 years ago

Cool - thanks for the PR!

A couple of comments. You're absolutely right that this should be done in a transaction. And I noticed that you're calling jobs.destroy_all rather than jobs.each{|j| j.destroy}, as we currently do in the unschedule method. We're actually both wrong. :) It should be jobs.delete_all, so that the DBMS can know that the newly created row in one transaction conflicts with the one being deleted in the other. Calling destroy_all is the same as calling destroy on each, which issues something like DELETE FROM delayed_jobs WHERE id = 123 rather than DELETE FROM delayed_jobs WHERE handler LIKE '%foo%'.

Also, at least on Postgres, the default transaction isolation level (READ COMMITTED) won't prevent the race condition from occurring. You actually need to set the isolation level to SERIALIZABLE to prevent two concurrent processes from creating the same delayed job.
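The race can be sketched without a database. In this toy model (names are illustrative, not part of delayed_job_recurring), two concurrent check-then-insert sequences both pass the existence check before either insert becomes visible, which is exactly what READ COMMITTED allows:

```ruby
# Toy model of the check-then-insert race under READ COMMITTED-style
# visibility: both "transactions" see an empty table, so both insert.
jobs = []          # stands in for the delayed_jobs table
lock = Mutex.new   # per-statement locking, not a serializable transaction

threads = 2.times.map do
  Thread.new do
    scheduled = lock.synchronize { !jobs.empty? }  # the SELECT (the check)
    sleep 0.2                                      # the other process commits here
    lock.synchronize { jobs << :job_a } unless scheduled  # the INSERT (the schedule)
  end
end
threads.each(&:join)

jobs.length  # => 2: duplicates; SERIALIZABLE would force one transaction to fail instead
```

Under SERIALIZABLE, the database would detect that the two transactions cannot be ordered consistently and abort one of them rather than let both inserts commit.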

Finally, your proposed change alters the behavior a bit: the schedule! instance method previously didn't destroy existing jobs. This was already being done in the schedule! class method. How about changing the class method rather than the instance method?

Addressing these points, and changing unschedule to use delete_all:

      def unschedule
        jobs.delete_all
      end

      # Main interface to start this schedule (adds it to the jobs table).
      # Pass in a time to run the first job (nil runs the first job at run_interval from now).
      def schedule(options = {})
        Delayed::Job.transaction(isolation: :serializable) do
          schedule!(options) unless scheduled?
        end
      end

      def schedule!(options = {})
        Delayed::Job.transaction(isolation: :serializable) do
          return unless Delayed::Worker.delay_jobs
          unschedule
          new.schedule!(options)
        end
      end

Unfortunately, there's still one slight problem: in the event that the race condition is encountered, one of the transactions will fail. I don't believe ActiveRecord::Base.transaction will automatically retry in this event; I'm fairly certain it raises an error. So we'd have to handle this specific error and retry.
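A minimal sketch of that retry, assuming the conflict surfaces as an exception (on Rails 4.1+ with Postgres it is ActiveRecord::SerializationFailure; a stand-in error class keeps the sketch self-contained, and the helper name is hypothetical):

```ruby
# Stand-in for ActiveRecord::SerializationFailure, which the adapter raises
# when the DBMS aborts one of two conflicting SERIALIZABLE transactions.
class SerializationFailure < StandardError; end

def with_serialization_retry(max_attempts: 3)
  attempts = 0
  begin
    yield
  rescue SerializationFailure
    attempts += 1
    retry if attempts < max_attempts
    # Out of attempts: the concurrent process won the race and the job is
    # already scheduled, so swallowing the failure leaves the right net state.
  end
end

# Simulated race: the first attempt conflicts, the retry succeeds.
calls = 0
with_serialization_retry do
  calls += 1
  raise SerializationFailure if calls < 2
end
calls  # => 2
```

In the real schedule method, the block would wrap the `Delayed::Job.transaction(isolation: :serializable)` call.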

Thoughts?

Best, Tony

abrom commented 9 years ago

My view of it is that you'd really only care that one of the transactions goes through successfully. If the second one fails, the net result will still be one job enqueued, so any retry logic shouldn't be necessary. DJ already takes care of the read/pop transaction side of things, so we'd really only need to worry about the write/push. As such, I believe you'd want to keep the default READ COMMITTED isolation level so as to prevent any sort of transaction read failures on the DJ workers. If that is the case, the transaction block in the schedule function wouldn't be required.

The reason I used destroy_all in the instance schedule! function was to limit the size (contents) of the transaction block. In practical terms destroy_all is essentially the same as each(&:destroy) :) doh. I had used it instead of delete_all to prevent any future issues with possible validations, but given validations are unlikely, delete_all seems like the obvious choice!

How about:

      def unschedule
        jobs.delete_all
      end

      # Main interface to start this schedule (adds it to the jobs table).
      # Pass in a time to run the first job (nil runs the first job at run_interval from now).
      def schedule(options = {})
        schedule!(options) unless scheduled?
      end

      def schedule!(options = {})
        return unless Delayed::Worker.delay_jobs
        # Still commits the delete and create in the same transaction so will prevent any race conditions.
        # We shouldn't care which thread ends up scheduling the 'new' job. In practical terms they should
        # be the same
        Delayed::Job.transaction do
          unschedule
          new.schedule!(options)
        end
      end
abrom commented 9 years ago

Although, without the call to unschedule in the instance schedule! alongside the DJ enqueue, you'll still end up with a situation where this could happen (isolation: serializable or otherwise).

Scenario:

1. Recurring job A is enqueued.
2. A DJ worker pops A and starts working on it.
3. The app spools up and calls schedule (or schedule!) for a new A. No A jobs exist, so it correctly creates a new one.
4. The DJ worker finishes work on A and enqueues a new A.

Result: two A jobs.

As such, how about something like:

      ... instance: def schedule! ...

      Delayed::Job.transaction do
        self.class.unschedule

        if Gem.loaded_specs['delayed_job'].version.to_s.first.to_i < 3
          Delayed::Job.enqueue self, enqueue_opts[:priority], enqueue_opts[:run_at]
        else
          Delayed::Job.enqueue self, enqueue_opts
        end
      end

.... class defs ...

      def unschedule
        jobs.delete_all
      end
abrom commented 9 years ago

Neither case would handle a change in configuration terribly reliably though. Realistically, the only solution (where the configuration is loaded with the application initialisation AND the DJ workers are loaded at the same time as the app) would be to ensure that all workers were stopped, make the configuration change via a rake task or similar then start the jobs again. Or have some sort of delay for the start up of the DJ workers to ensure that any other app initialisation/configuration has completed first.

As you've already suggested in https://github.com/amitree/delayed_job_recurring/pull/6 setting up the jobs in the initialiser isn't really ideal for all of the above reasons, but in reality I think a lot of peeps will want to do this (particularly in dev env) for simplicity of build setup.

Hmm..

jonathan-nye commented 8 years ago

@abrom @afn This is plaguing our setup too. Starting a recurring job in an initializer is a simple way to kick one off, and it would be great if this PR could get merged in. Any updates on this?

abrom commented 8 years ago

I would suggest using some ENV flags for scheduling jobs (but only for one of the DJ workers!)

ie:

      if ENV['SCHEDULE_TIMERS']
        MyTask.schedule
      end

OR.. if you're using some sort of init daemon for managing your services (web app, DJ workers etc) you would be better off putting a simple script in a pre-start block to ensure that 1. it only runs once, and 2. that there is no chance of any race conditions with multiple DJ workers popping and re-initialising the same jobs.

For upstart it might be something like:

      pre-start script
        /var/lib/code/project/bin/initialize_timers.rb
      end script

initialize_timers.rb:

      #!/usr/bin/env ruby
      # `include` takes a module, not a file path; use require to load the task
      require_relative '../lib/my_task'
      MyTask.schedule
afn commented 8 years ago

Sorry for the long delay in accepting this PR. Merged!

jonathan-nye commented 7 years ago

@afn, thanks for getting this done!! @afn or @abrom, can we get a release for this?

afn commented 7 years ago

Done - pushed 0.3.6 to rubygems.