mhenrixon / sidekiq-unique-jobs

Prevents duplicate Sidekiq jobs
MIT License

Feature Request: Unique scheduled jobs #247

Open amclaughlin-valetude opened 6 years ago

amclaughlin-valetude commented 6 years ago

The feature I'm implementing is causing duplicate jobs in Sidekiq::ScheduledSet, so I would really love to define a set of arguments for my jobs that should be unique across this set, rather than in a queue.

mhenrixon commented 6 years ago

I'd be happy to talk about this at some point. I'm not sure exactly what you are after; could you elaborate?

glennfu commented 6 years ago

I just found this issue today after searching for a solution. I have per-tenant jobs, and I tell sidekiq-unique-jobs to make them unique by the first argument, which is always the account id. Today we had a situation where one tenant needed to be temporarily disabled while they worked through some issues, so essentially I want to ignore that tenant's jobs until it is re-enabled. I don't want to drop them, and I don't want them raising errors, so in my code I'm doing something like retry_job(wait: 10.minutes) if tenant_paused?.

As a result, my Sidekiq::ScheduledSet is now full of duplicate jobs, whereas if they had been added to a regular queue, this gem would already have kept them unique.
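The request can be made concrete with a small model. The sketch below is plain Ruby with no Sidekiq or Redis, assuming uniqueness is keyed on the worker class plus only the first argument (the account id); `ScheduledSetSketch`, `lock_digest`, and `TenantSyncJob` are hypothetical names, not the gem's API. Repeated retries for the same tenant collapse into a single scheduled entry.

```ruby
require "digest"
require "json"

# Hypothetical in-memory stand-in for a scheduled set that rejects
# duplicates. Uniqueness is keyed on the worker class plus the first
# argument only (e.g. an account id), mirroring the per-tenant setup above.
class ScheduledSetSketch
  attr_reader :jobs

  def initialize
    @jobs = []
    @digests = {}
  end

  # Digest of the worker class and the first argument only.
  def lock_digest(klass, args)
    Digest::SHA256.hexdigest(JSON.generate([klass, args.first]))
  end

  # Returns true if the job was scheduled, false if it was a duplicate.
  def schedule(klass, args, at)
    digest = lock_digest(klass, args)
    return false if @digests.key?(digest)

    @digests[digest] = true
    @jobs << { "class" => klass, "args" => args, "at" => at }
    true
  end
end

set = ScheduledSetSketch.new
# A paused tenant's job retries itself three times; only the first is kept.
3.times { |i| set.schedule("TenantSyncJob", [42, "attempt-#{i}"], 600 + i) }
set.schedule("TenantSyncJob", [7, "attempt-0"], 600)
puts set.jobs.size # prints 2
```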

mhenrixon commented 6 years ago

Thanks for reminding me about the scheduled set, @glennfu :) I almost forgot about it for v6!

mhenrixon commented 6 years ago

@glennfu @amclaughlin-valetude I had a look at this. The problem only occurs when a lock no longer exists for the specific item's arguments. This happens, for instance, when the lock is created with an expiration: any key that is set to expire will expire at the given time regardless of whether the job has run yet. Unfortunately, older versions of the gem used a default expiration of 30 minutes, which could be why you are seeing this.
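A minimal sketch of the failure mode described above, assuming a lock store where each key carries its own expiry, the way a Redis TTL does. `ExpiringLockStore` and the `MyCoolWorker:[1]` key are hypothetical; time is passed in as plain seconds so the result is deterministic. Once the 30-minute lock lapses, an identical job is accepted even though the original is still waiting in the schedule.

```ruby
# Hypothetical in-memory lock store with per-key expiration, used only to
# illustrate why an expiring lock can let a duplicate through.
class ExpiringLockStore
  def initialize
    @expires_at = {}
  end

  # Acquire the lock for +key+ unless a live lock exists. A +ttl+ of nil
  # means the lock never expires on its own.
  def lock(key, now:, ttl:)
    expiry = @expires_at[key]
    # A key whose expiry has passed counts as absent (like an expired Redis key).
    return false if @expires_at.key?(key) && (expiry.nil? || expiry > now)

    @expires_at[key] = ttl && now + ttl
    true
  end
end

store = ExpiringLockStore.new
thirty_minutes = 30 * 60
# A job scheduled two hours out holds a lock that expires after 30 minutes.
store.lock("MyCoolWorker:[1]", now: 0, ttl: thirty_minutes)                  # => true
store.lock("MyCoolWorker:[1]", now: 60, ttl: thirty_minutes)                 # => false, still locked
store.lock("MyCoolWorker:[1]", now: thirty_minutes + 1, ttl: thirty_minutes) # => true, duplicate accepted
```

Passing `ttl: nil` in this model keeps the lock until it is explicitly removed, which is the trade-off: no duplicates from expiry, but a lock can linger if nothing ever clears it.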

The thing is that when you call MyCoolWorker.perform_in(1.second, arguments), the client middleware runs and locks the job; then, if item['at'] is present (meaning the job is scheduled), the job is allowed to be pushed to the scheduled set. So even though the job is in a different set, it should still be locked, meaning other jobs from the same worker class with the same arguments won't be allowed through with either perform_async or perform_in.
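The flow described above can be modeled roughly like this. It is a simplified, self-contained sketch, not the gem's middleware: `UniqueClientSketch` is hypothetical, and the digest is simply a SHA-256 of the class name and arguments. It illustrates that the lock is taken before the routing decision, so a job sitting in the schedule still blocks an identical immediate push, and a rejected push returns `nil`, much as Sidekiq's client returns `nil` when middleware stops a job.

```ruby
require "digest"
require "json"
require "securerandom"

# Hypothetical model of the flow described above (not Sidekiq or
# sidekiq-unique-jobs code): lock on the job digest first, then route the
# job to the schedule (item["at"] present) or to its queue.
class UniqueClientSketch
  attr_reader :queues, :scheduled

  def initialize
    @locks = {}
    @queues = Hash.new { |hash, name| hash[name] = [] }
    @scheduled = []
  end

  # Returns a job id on success, or nil when an identical job holds the lock.
  def push(item)
    digest = Digest::SHA256.hexdigest(JSON.generate([item["class"], item["args"]]))
    return nil if @locks.key?(digest)

    @locks[digest] = true
    job = item.merge("jid" => SecureRandom.hex(12))
    if item["at"]
      @scheduled << job
    else
      @queues[item["queue"]] << job
    end
    job["jid"]
  end
end

client = UniqueClientSketch.new
item = { "class" => "MyCoolWorker", "args" => [1], "queue" => "default" }
client.push(item.merge("at" => Time.now.to_f + 1)) # lands in the schedule, returns a jid
client.push(item)                                  # identical class + args, returns nil
```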

What you are experiencing is most likely caused by configuration, a buggy old version, or ActiveJob (which will no longer be supported in the next major release).

sharq1 commented 2 years ago

Is this not implemented yet, or is our configuration faulty? Here's a screenshot of the scheduled jobs: [screenshot]

Jobs in regular queues are locked properly.

class ReportRecalculationWorker
  include Sidekiq::Worker
  sidekiq_options queue: "reports", unique_across_queues: true, lock: :until_executing, on_conflict: :reject

  ...
end

It seems somewhat random when it rejects the job...

[screenshot]

This is likely related to the rescheduling mechanism I added within the worker itself. Basically, if the job was performed too recently (within the last 15 seconds), it is rescheduled:

ReportRecalculationWorker.perform_in(10.seconds)
mhenrixon commented 2 years ago

@sharq1, in your situation the job that is trying to delay itself is already a unique job.

You would have to use some Sidekiq magic or the replay conflict strategy.
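To see why self-rescheduling interacts with the lock, here is a simplified in-memory model, assuming the documented `:until_executing` semantics (locked when the client pushes the job, unlocked before the server starts processing it). `UntilExecutingSketch` is hypothetical and ignores digests, queues, and timing.

```ruby
# Hypothetical, simplified model of an :until_executing lifecycle: the lock
# is taken on push and released just before the job body runs.
class UntilExecutingSketch
  attr_reader :pending

  def initialize
    @locked = false
    @pending = []
  end

  # Client side: push only if no lock is held; returns true/false.
  def push(label)
    return false if @locked

    @locked = true
    @pending << label
    true
  end

  # Server side: unlock first, then run the job body (the block).
  def execute_next
    label = @pending.shift
    @locked = false
    yield label if block_given?
  end
end

flow = UntilExecutingSketch.new
flow.push("initial")       # => true, lock taken
flow.push("duplicate")     # => false, rejected while waiting
flow.execute_next do
  # Inside the job body the lock is already released, so the self-reschedule
  # (perform_in(10.seconds) in the worker) is accepted and re-takes the lock.
  flow.push("rescheduled") # => true
end
flow.push("another")       # => false, the rescheduled job holds the lock
```

In this model the window that matters is the time between unlocking and the self-reschedule: a push from elsewhere during that window would be accepted, and the worker's own `perform_in` would then be the one rejected.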

sharq1 commented 2 years ago

@mhenrixon thank you; however, I don't think I was clear enough. I don't want to reschedule or replace the jobs in the reports queue or the ScheduledSet.

If ReportRecalculationWorker is already enqueued or scheduled, I would expect perform_async and perform_in to reject the job and return nil. Instead, it sometimes allows me to schedule the worker, as you can see in the screenshots above.
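One way to confirm that the scheduled set really holds duplicates is to group entries by class and arguments. The helper below is a hypothetical sketch that works on plain job hashes with the `"class"` and `"args"` keys Sidekiq job payloads carry; against a live Sidekiq the input could come from something like `Sidekiq::ScheduledSet.new.map(&:item)`, which is an assumption about your Sidekiq version and is not exercised here.

```ruby
# Hypothetical diagnostic helper: given job payload hashes, return the
# groups that share the same class and arguments, i.e. the duplicates that
# uniqueness should have prevented.
def duplicate_groups(items)
  items
    .group_by { |item| [item["class"], item["args"]] }
    .select { |_key, group| group.size > 1 }
end

scheduled = [
  { "class" => "ReportRecalculationWorker", "args" => [], "at" => 100.0 },
  { "class" => "ReportRecalculationWorker", "args" => [], "at" => 105.0 },
  { "class" => "OtherWorker", "args" => [1], "at" => 110.0 }
]

duplicate_groups(scheduled).each do |(klass, args), group|
  puts "#{klass} #{args.inspect}: #{group.size} copies"
end
# prints: ReportRecalculationWorker []: 2 copies
```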

mhenrixon commented 2 years ago

@sharq1 then what about the following:

This is likely related to the fact that within the worker itself I added rescheduling mechanism... Basically the condition is, if the job was performed too recently (in less than last 15 seconds) then it's rescheduled.

Can you show the code for how you do this? I am fairly certain this will cause problems for you.

sharq1 commented 2 years ago

Sure:

# frozen_string_literal: true

class ReportRecalculationWorker
  include Sidekiq::Worker
  sidekiq_options queue: "reports", unique_across_queues: true, lock: :until_executing, on_conflict: :reject

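  def perform
    # Defer this run if the priority queue is busy or a run started < 15 s ago.
    if too_many_jobs || last_execution_too_recently
      return ReportRecalculationWorker.perform_in(10.seconds)
    end

    save_execution_time
    recalculate...
  end

  # True when more than 100 jobs are waiting in "priority_queue".
  def too_many_jobs
    Sidekiq::Queue.new("priority_queue").size > 100
  end

  # True when the last recorded start time falls within the past 15 seconds.
  def last_execution_too_recently
    t = Redis.current.get("ReportRecalculationWorker_last_start")
    return false if t.blank?

    Time.parse(t).utc.between?(Time.now.utc - 15.seconds, Time.now.utc)
  end

  # Record this run's start time (stored as e.g. "2022-01-01 12:00:00 UTC").
  def save_execution_time
    Redis.current.set("ReportRecalculationWorker_last_start", Time.now.utc.to_s)
  end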
end
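The timing check can be pulled out into a pure function so it is testable without Redis. This is a hypothetical refactor, assuming the stored value is the `Time.now.utc.to_s` string written by `save_execution_time`; `last_execution_too_recently?` with injected `now:` and `window:` (in seconds) is not part of the original worker.

```ruby
require "time"

# Hypothetical, Redis-free version of the "ran too recently?" check: the
# last start time is the string stored in Redis, and the current time is
# injected so the logic can be tested deterministically.
def last_execution_too_recently?(last_start, now: Time.now.utc, window: 15)
  return false if last_start.nil? || last_start.empty?

  started_at = Time.parse(last_start).utc
  started_at.between?(now - window, now)
end

now = Time.utc(2022, 1, 1, 12, 0, 0)
last_execution_too_recently?("2022-01-01 11:59:50 UTC", now: now) # => true (10 s ago)
last_execution_too_recently?("2022-01-01 11:59:30 UTC", now: now) # => false (30 s ago)
last_execution_too_recently?(nil, now: now)                       # => false
```

Note that reading the timestamp and later writing it are separate Redis calls, so two workers starting at nearly the same moment can both pass the check.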

Actually, it's a bit more complex: I also added a custom runtime lock (I don't trust the until_and_while_executing lock 😞). The full implementation is below.

See full implementation

```ruby
# frozen_string_literal: true

class ReportRecalculationWorker
  include Sidekiq::Worker
  sidekiq_options queue: "reports", unique_across_queues: true, lock: :until_executing, on_conflict: :reject

  def perform
    # Take the custom runtime lock before entering begin/ensure, so a run that
    # fails to acquire it returns without deleting another run's lock
    # (an ensure clause also runs on an early return).
    return unless acquire_lock

    begin
      if too_many_jobs || last_execution_too_recently
        return ReportRecalculationWorker.perform_in(10.seconds)
      end

      save_execution_time
      # ...recalculate...
    rescue StandardError => e
      # ...log error...
    ensure
      release_lock
    end
  end

  def too_many_jobs
    Sidekiq::Queue.new("priority_queue").size > 100
  end

  def last_execution_too_recently
    t = Redis.current.get("ReportRecalculationWorker_last_start")
    if t.present? && Time.parse(t).utc.between?(Time.now.utc - 15.seconds, Time.now.utc)
      return true
    end

    false
  end

  def save_execution_time
    Redis.current.set("ReportRecalculationWorker_last_start", Time.now.utc.to_s)
  end

  def acquire_lock
    # ex: expire in 2h in case lock was not unset because of error
    # nx: only set the key if it does not already exist - otherwise return `false`
    Redis.current.set("ReportRecalculationWorker_works", "true", ex: 7200, nx: true)
  end

  def release_lock
    Redis.current.del("ReportRecalculationWorker_works")
  end
end
```
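A common hardening of a `SET ... NX EX` runtime lock is to store a random token and release the key only if it still holds that token, so a run can never delete a lock it does not own. The sketch below is hypothetical and in-memory (`TokenLockSketch` is not a Redis client API) and omits expiry to stay short.

```ruby
require "securerandom"

# Hypothetical in-memory stand-in for the SET NX EX runtime lock. Each
# acquirer stores a random token, and release deletes the key only if the
# stored token is still its own.
class TokenLockSketch
  def initialize
    @store = {}
  end

  # Like SET key token NX: returns the token on success, nil otherwise.
  def acquire(key)
    return nil if @store.key?(key)

    token = SecureRandom.hex(16)
    @store[key] = token
    token
  end

  # Compare-and-delete: only the holder of +token+ can release.
  def release(key, token)
    return false unless token && @store[key] == token

    @store.delete(key)
    true
  end

  def locked?(key)
    @store.key?(key)
  end
end

locks = TokenLockSketch.new
first = locks.acquire("ReportRecalculationWorker_works")  # a token string
second = locks.acquire("ReportRecalculationWorker_works") # nil, already held
locks.release("ReportRecalculationWorker_works", second)  # => false, lock kept
locks.release("ReportRecalculationWorker_works", first)   # => true
```

Against real Redis, the compare-and-delete has to be atomic, which is typically done with a short Lua script run through `EVAL`, as the Redis documentation describes for single-instance locks.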