ixti / sidekiq-throttled

Concurrency and rate-limit throttling for Sidekiq
MIT License
698 stars 75 forks source link

Add logic to temporarily skip queue if excessive number of throttled jobs #160

Closed Connorhd closed 9 months ago

Connorhd commented 10 months ago

This would need tests/documentation before merging, but wanted to open this for discussion.

We recently updated to sidekiq 7 and saw the change in behavior for sidekiq-throttled that the throttled_queue_cooldown is no longer used, there seem to be pros and cons of the behavior before and after and this is an attempt to find a better middle ground. The idea is to add back the throttled_queue_cooldown concept, but only trigger it after a certain number of throttled jobs are found on a queue.

This reduces a problem with the previous solution where a large number of small jobs that had throttling could end up causing a lot of paused time. As well as helping when a queue has a mix of throttled and unthrottled jobs as the throttled ones will be cycled to the back of the queue faster. It also avoids the problem with the current system where throttled jobs in a high queue can completely starve lower queues, and also reduces the number of fetches that happen in situations where all queued jobs are throttled.

Making this configurable means the old style of throttling can still be achieved by making throttled_queue_after_attempts 1, and the new style could be effectively achieved by setting throttled_queue_after_attempts to a very large number. In general it seems that different workloads are going to want to tune these settings differently and there probably isn't a one size fits all solution.

ixti commented 10 months ago

Oh! I love the approach. One thing I'd like to address though, can we somehow make it more generalized so that it can be easily hooked into the other fetch implementations? I was thinking something like QueueControl class or QueueFilter that would be possible to pass to the fetcher.

class QueueFilter
  def on_throttle(unit_of_work)
    @filters.each { _1.on_throttle(unit_of_work) }
  end

  def call(queues)
    @filters.reduce(queues.dup, :call)
  end
end

That way in the Basic fetch code will look like:

def fetch
  ... # original code

  if throttled? ...
    @queue_filter.on_throttle(work)
    ...

Alternatively, I think we can add more "generic" observers or pub/sub. Either way I'm leaning towards merging this and thinking on imporvements later :D

ixti commented 9 months ago

Implemented and released as v1.0.0; Defaults are what they were before the removal of this functionality though. To tune it to your liking:

Sidekiq::Throttled.configure do |config|
  config.cooldown_period    = 1.0
  config.cooldown_threshold = 100
end

Defaults are:

Sidekiq::Throttled.configure do |config|
  config.cooldown_period    = 2.0
  config.cooldown_threshold = 1
end

And to completely turn-off the cooldown:

Sidekiq::Throttled.configure do |config|
  config.cooldown_period = nil
end
Connorhd commented 9 months ago

Amazing, thank you!