arturictus / sidekiq_alive

Liveness probe for Sidekiq in Kubernetes deployments
MIT License

Only purge jobs pertaining to the current queue when 'SidekiqAlive::Worker' is terminated #80

Closed — ryanmrodriguez closed this issue 1 year ago

ryanmrodriguez commented 1 year ago

In the latest release, using Sidekiq > 5, when a SidekiqAlive::Worker is shut down or terminated via sidekiq_alive, the unregister_current_instance method ends up deleting scheduled jobs for ALL SidekiqAlive::Workers, not just the ones belonging to its own queue.

This behavior was changed here https://github.com/arturictus/sidekiq_alive/commit/e686c88e7a1384e8cc122c408a2d746f209da3c4

I don't think this is the desired behavior.

Using the latest sidekiq_alive, I am seeing a cascading effect of container restarts on Kubernetes. When one worker shuts down, whether due to a job holding up the worker or a new deployment being rolled out, it starts a domino effect: SidekiqAlive::Workers restart and clear all other instances' SidekiqAlive jobs, which leads to more restarts, and so on.

The problem boils down to the purge_pending_jobs method, which was converted to use a server-side scan.

Old:

  def self.purge_pending_jobs
    # TODO:
    # Sidekiq 6 allows better way to find scheduled jobs:
    # https://github.com/mperham/sidekiq/wiki/API#scan
    scheduled_set = Sidekiq::ScheduledSet.new
    jobs = scheduled_set.select { |job| job.klass == 'SidekiqAlive::Worker' && job.queue == current_queue }
    logger.info("[SidekiqAlive] Purging #{jobs.count} pending for #{hostname}")
    jobs.each(&:delete)
    logger.info("[SidekiqAlive] Removing queue #{current_queue}")
    Sidekiq::Queue.new(current_queue).clear
  end

Latest release:

    def purge_pending_jobs
      schedule_set = Sidekiq::ScheduledSet.new
      jobs = if Helpers.sidekiq_5
        schedule_set.select { |job| job.klass == "SidekiqAlive::Worker" && job.queue == current_queue }
      else
        schedule_set.scan('"class":"SidekiqAlive::Worker"')
      end
      logger.info("[SidekiqAlive] Purging #{jobs.count} pending for #{hostname}")
      jobs.each(&:delete)

      logger.info("[SidekiqAlive] Removing queue #{current_queue}")
      Sidekiq::Queue.new(current_queue).clear
    end

The problem lies in this conditional:

    jobs = if Helpers.sidekiq_5
      schedule_set.select { |job| job.klass == "SidekiqAlive::Worker" && job.queue == current_queue }
    else
      schedule_set.scan('"class":"SidekiqAlive::Worker"') # This will grab jobs for ALL instances, not just the current one
    end
andrcuns commented 1 year ago

Thanks! Indeed, this might have gotten dropped when resolving the conflicts, or maybe it was just a mistake. Makes total sense 👍