collectiveidea / delayed_job

Database based asynchronous priority queue system -- Extracted from Shopify
http://groups.google.com/group/delayed_job
MIT License
4.81k stars 955 forks

How does max_attempts interface with ActiveJob retry_on method? #1184

Open jeremygottfried opened 1 year ago

jeremygottfried commented 1 year ago

https://apidock.com/rails/v6.1.3.1/ActiveJob/Exceptions/ClassMethods/retry_on

The ActiveJob retry_on hook allows you to customize how you respond to specific errors, but it's unclear how that integrates with max_attempts in this gem. Does this gem keep a separate queue and ignore ActiveJob behavior, or is max_attempts just setting a global retry_on to catch all errors?

shanecav84 commented 1 year ago

I think that ActiveJob's (AJ) retry_on supersedes Delayed Job's (DJ) retry logic, such that AJ handles further retries (the attempts argument of retry_on) and deletion. If AJ somehow fails to do any of that and raises an exception, the exception bubbles up to DJ and DJ's retry logic kicks in.

shanecav84 commented 1 year ago

I've been using AJ but relying on DJ's retry logic: I use rescue_from instead of retry_on to handle job exceptions, then re-raise so the error bubbles up to DJ, which handles the retries.
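
To make the "handle, then re-raise" idea concrete, here is a toy model in plain Ruby. It does not use either gem: `FlakyJob` and `OuterQueue` are hypothetical names, the method-level `rescue` stands in for a rescue_from handler, and `OuterQueue` stands in for DJ. Real DJ reschedules the job with a delay rather than looping immediately, so treat this only as a sketch of the control flow.

```ruby
# Toy model only: neither class comes from ActiveJob or Delayed Job.
class FlakyJob
  attr_reader :handled_errors

  def initialize(failures_before_success)
    @remaining_failures = failures_before_success
    @handled_errors = []
  end

  def perform
    if @remaining_failures > 0
      @remaining_failures -= 1
      raise "transient failure"
    end
    :done
  rescue StandardError => e
    # Stand-in for a rescue_from handler: record the error (log, notify, ...)
    @handled_errors << e.message
    # Re-raise so the outer queue sees the failure and applies its own retries.
    raise
  end
end

# Stand-in for the outer queue's retry logic with a max_attempts limit.
class OuterQueue
  def initialize(max_attempts)
    @max_attempts = max_attempts
  end

  # Returns [status, attempts_used].
  def run(job)
    attempts = 0
    begin
      attempts += 1
      job.perform
      [:succeeded, attempts]
    rescue StandardError
      retry if attempts < @max_attempts
      [:failed, attempts]
    end
  end
end
```

If the handler swallowed the exception instead of re-raising it, the outer queue would see a successful run and never retry, which is why the re-raise matters in this pattern.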

cnadeau commented 8 months ago

In case it might help someone:

I had the same problem trying to configure max_attempts on an ActiveJob and ended up basing some work on https://github.com/rails/rails/issues/39961 to get it working:

# Idea taken from : https://github.com/rails/rails/issues/39961
# Original code from https://github.com/rails/rails/blob/main/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb branch v6.1.7

class CustomDelayedJobAdapter < ActiveJob::QueueAdapters::DelayedJobAdapter
  def enqueue(job) #:nodoc:
    delayed_job = Delayed::Job.enqueue(CustomJobWrapper.new(job), queue: job.queue_name, priority: job.priority)
    job.provider_job_id = delayed_job.id
    delayed_job
  end

  def enqueue_at(job, timestamp) #:nodoc:
    delayed_job = Delayed::Job.enqueue(CustomJobWrapper.new(job), queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp))
    job.provider_job_id = delayed_job.id
    delayed_job
  end

  class CustomJobWrapper < ActiveJob::QueueAdapters::DelayedJobAdapter::JobWrapper

    # Copy the job instance's max_attempts onto the wrapper. The wrapper, including this attribute, is serialized as YAML into the Delayed Job handler field.
    attr_accessor(:max_attempts)

    def initialize(job)
      super(job.serialize)
      self.max_attempts = job.respond_to?(:max_attempts) ? job.max_attempts : nil
    end
  end
end

Then use it in the application's configuration:

config.active_job.queue_adapter = ::CustomDelayedJobAdapter.new

And add a max_attempts attribute on the desired ActiveJob:

module Matchmaking
  class SingleRunJob < ActiveJob::Base
    attr_accessor(:max_attempts)

    def initialize(*args)
      super(*args)
      self.max_attempts = 1
    end

    def perform
      # ...
    end
  end
end
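
As far as I understand it, this works because DJ asks the payload object (here, the wrapper) for max_attempts when it responds to that method, and otherwise falls back to the worker-wide Delayed::Worker.max_attempts, which defaults to 25. The snippet below is a simplified plain-Ruby model of that lookup, not the gem's code: `PlainWrapper`, `CustomWrapper` and `effective_max_attempts` are hypothetical names used only for illustration.

```ruby
# Simplified model of the lookup; none of these names exist in Delayed Job.
DEFAULT_MAX_ATTEMPTS = 25 # Delayed Job's documented default for Worker.max_attempts

# Stand-in for the stock wrapper, which has no max_attempts method.
PlainWrapper = Struct.new(:job_data)

# Stand-in for CustomJobWrapper, which carries a per-job max_attempts.
CustomWrapper = Struct.new(:job_data, :max_attempts)

def effective_max_attempts(payload, worker_default = DEFAULT_MAX_ATTEMPTS)
  # Use the per-job value only if the payload exposes one and it is not nil.
  per_job = payload.max_attempts if payload.respond_to?(:max_attempts)
  per_job || worker_default
end

effective_max_attempts(PlainWrapper.new({}))        # falls back to the default
effective_max_attempts(CustomWrapper.new({}, 1))    # uses the per-job value
effective_max_attempts(CustomWrapper.new({}, nil))  # nil also falls back
```

This is also why the wrapper above sets max_attempts to nil for jobs that do not define it: a nil value leaves the global setting in effect.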