veeqo / activejob-uniqueness

Unique jobs for ActiveJob. Ensure the uniqueness of jobs in the queue.
https://devs.veeqo.com/job-uniqueness-for-activejob/
MIT License

Issue unlocking Jobs manually #34

Open svenne87 opened 3 years ago

svenne87 commented 3 years ago

First off, great work on the gem 👍

We are experiencing some issues when trying to unlock jobs manually. During normal operation we have not noticed any issues with either locking or unlocking, but we ran into this during a restart of Sidekiq. Sidekiq probably restarted while a lock was active, and the lock was then never released. We noticed the job was not being processed and tried unlocking it without success. We had to manually remove the key from Redis in order to release the lock. I did notice what might be the cause of this behaviour; explanation below:

Setup

Sidekiq 6.2.1
Redis 4.2.5
Rails 6.1.3.1
Ruby 3.0.1
Activejob Uniqueness 0.2.2 (gem 'activejob-uniqueness', require: 'active_job/uniqueness/sidekiq_patch')

    sidekiq (6.2.1)
      connection_pool (>= 2.2.2)
      rack (~> 2.0)
      redis (>= 4.2.0)
    activejob-uniqueness (0.2.2)
      activejob (>= 4.2, < 7)
      redlock (>= 1.2, < 2)
    redis (4.2.5)
    redlock (1.2.2)
      redis (>= 3.0.0, < 5.0)   

Custom Initializer

ActiveJob::Uniqueness.configure do |config|
  config.redlock_servers = ["redis://#{REDIS_HOST}:6379/0"]
end

Reproducing Issue

The class for our job is named WorkflowExecutionPerformerJob and it's pretty straightforward. It locks on the second argument using while_executing, and on conflict moves the job to another queue where we also throttle jobs using the Sidekiq API. The lock key might look like this: organization-x1x2x3.

class WorkflowExecutionPerformerJob < ActiveJob::Base
  unique :while_executing, on_conflict: ->(job) { job.schedule_job_later }

  # Use the second job argument as the lock key
  def lock_key
    arguments.second
  end
end

The schedule_job_later function will enqueue another job on our throttle queue, so not really related.

I tried unlocking the jobs manually using:

WorkflowExecutionPerformerJob.unlock!('organization-x1x2x3') => true
WorkflowExecutionPerformerJob.unlock!("other argument", "organization-x1x2x3") => true
WorkflowExecutionPerformerJob.unlock! => true
ActiveJob::Uniqueness.unlock! => true

I also tried removing the job causing the lock from the Sidekiq schedule (using the Sidekiq web GUI). The calls above all returned true, but looking in Redis nothing was actually released.

redis = Redis.new(host: REDIS_HOST)
redis.exists(WorkflowExecutionPerformerJob.new("other argument", "organization-x1x2x3").lock_key) => true

After looking at the code for the gem I tried the following:

config = ActiveJob::Uniqueness::Configuration.new
lock_manager = ActiveJob::Uniqueness::LockManager.new(config.redlock_servers, config.redlock_options)

After running this I noticed that the new config object did not carry the config.redlock_servers value set in the initializer, so I ran:

config = ActiveJob::Uniqueness::Configuration.new
config.redlock_servers = ["redis://#{REDIS_HOST}:6379/0"]
lock_manager = ActiveJob::Uniqueness::LockManager.new(config.redlock_servers, config.redlock_options)

I then tried to release the lock using:

lock_manager.delete_locks(ActiveJob::Uniqueness::LockKey.new(job_class_name: 'WorkflowExecutionPerformerJob', arguments: ["other argument", "organization-x1x2x3"]).wildcard_key) => true

However the following still returned true:

redis.exists(WorkflowExecutionPerformerJob.new("other argument", "organization-x1x2x3").lock_key)
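One possible explanation, offered as an assumption rather than a confirmed diagnosis: if the unlock pattern is built from a prefix and the job class name, it would never match a key returned by a custom lock_key such as organization-x1x2x3, and a delete that matches nothing could still report true. The sketch below uses File.fnmatch? as a stand-in for Redis glob matching; the default pattern shape and the helper name glob_match? are hypothetical and not taken from the gem.

```ruby
# Approximates Redis glob matching for simple `*` patterns.
# glob_match? is a hypothetical helper for this illustration only.
def glob_match?(pattern, key)
  File.fnmatch?(pattern, key)
end

# Hypothetical shape of a default wildcard pattern: prefix, job name, anything.
# This is an assumption for illustration, not the gem's confirmed format.
default_pattern = 'activejob_uniqueness:workflow_execution_performer_job:*'

# The key actually stored when lock_key returns arguments.second.
custom_key = 'organization-x1x2x3'

puts "default pattern matches custom key: #{glob_match?(default_pattern, custom_key)}"
puts "custom pattern matches custom key:  #{glob_match?('organization-*', custom_key)}"
```

If this assumption holds, only a pattern derived from the custom key itself (for example organization-*) would find the stuck lock.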

To finally release the lock I simply removed the key in Redis:

redis = Redis.new(host: REDIS_HOST)
redis.del("other argument", "organization-x1x2x3")

After that the job did process as expected.

Does ActiveJob::Uniqueness::Configuration.new not respect the initializer here? And what might be causing the manual unlock methods to not release the lock?

My initial guess is that it has something to do with the initializer, since the LockManager did not get the correct config when I built it by hand. However, as I stated before, normal operation obviously sets and releases the lock as expected.
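The observation about Configuration.new is consistent with a common Ruby pattern in which a configure block mutates one shared, memoized configuration object, so that a freshly constructed Configuration only carries defaults. The sketch below illustrates that pattern under this assumption; DemoUniqueness, its default URL, and the host redis.internal are hypothetical stand-ins, not the gem's actual code.

```ruby
# Minimal stand-in for a library that keeps one shared configuration object.
# DemoUniqueness and everything inside it are hypothetical names.
module DemoUniqueness
  class Configuration
    attr_accessor :redlock_servers

    def initialize
      @redlock_servers = ['redis://localhost:6379'] # default value
    end
  end

  # Memoized shared instance; `configure` mutates this object.
  def self.config
    @config ||= Configuration.new
  end

  def self.configure
    yield config
  end
end

# What an initializer would do.
DemoUniqueness.configure do |config|
  config.redlock_servers = ['redis://redis.internal:6379/0']
end

# The shared object carries the initializer's value...
puts DemoUniqueness.config.redlock_servers.inspect
# ...while a brand new Configuration only has the defaults.
puts DemoUniqueness::Configuration.new.redlock_servers.inspect
```

Under that assumption, reading the shared object (ActiveJob::Uniqueness.config, as the service later in this thread does) rather than calling Configuration.new would return the servers set in the initializer.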

Aryk commented 2 years ago

Did you ever find a solution to this?

It seems to me that the locks are not getting released if the Sidekiq box is restarted.

I'm using "until_and_while_executing", not sure if that could be why...

svenne87 commented 2 years ago

@Aryk Sorry for the late reply :) I did manage to develop a workaround by coding a simple service to unlock the present locks. I run this service every time Sidekiq is restarted. We are using Kubernetes so this simple setup works for us. Hope this helps.

Service (we also set the Redis host in the initializer):

# frozen_string_literal: true

module Sidekiq
  # Unique jobs lock service, used to manage locks in Redis.
  # Since we use a custom lock key this can be used to unlock all jobs,
  # using a wildcard argument.
  #
  # ActiveJob Uniqueness and Redlock are required for this service to run
  #
  # Example:
  # Remove the locks for all organizations:
  # Sidekiq::UniqueJobsLockService.unlock!('organization-*')
  #
  # Remove the lock for a single organization
  # Sidekiq::UniqueJobsLockService.unlock!('organization-123-123')
  #
  class UniqueJobsLockService < ::Redlock::Client
    def initialize(*args)
      super

      @servers = ActiveJob::Uniqueness.config.redlock_servers.map do |server|
        RedisInstance.new(url: server, timeout: redis_timeout)
      end
    end

    def self.unlock!(wildcard)
      new.unlock!(wildcard)
    end

    def unlock!(wildcard)
      delete_locks(wildcard)
    end

    private

    def delete_locks(wildcard)
      @servers.each do |server|
        server.instance_variable_get(:'@redis').with do |conn|
          conn.scan_each(match: wildcard).each { |key| conn.del key }
        end
      end

      true
    end

    def redis_timeout
      ActiveJob::Uniqueness.config.redlock_options[:redis_timeout] || 0.1
    end
  end
end
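The core of the service above is a scan-and-delete loop over keys matching a glob pattern. As a rough sketch, the same loop can be written against any connection object that responds to scan_each(match:) and del, which a redis-rb client does. The helper name delete_matching_keys and the FakeRedis class are hypothetical; the fake exists only so the example runs without a Redis server.

```ruby
# Deletes every key matching the glob pattern and returns the deleted keys.
# `conn` is assumed to respond to scan_each(match:) and del(key).
def delete_matching_keys(conn, pattern)
  keys = conn.scan_each(match: pattern).to_a # collect first, then delete
  keys.each { |key| conn.del(key) }
  keys
end

# Hypothetical in-memory stand-in for a Redis connection.
class FakeRedis
  def initialize(keys)
    @store = keys.to_h { |key| [key, '1'] }
  end

  # Returns an enumerator over keys matching the glob pattern.
  def scan_each(match:)
    @store.keys.select { |key| File.fnmatch?(match, key) }.each
  end

  # Removes the key; returns 1 if it existed, 0 otherwise.
  def del(key)
    @store.delete(key) ? 1 : 0
  end

  def keys
    @store.keys
  end
end

conn = FakeRedis.new(%w[organization-1 organization-2 other-lock])
deleted = delete_matching_keys(conn, 'organization-*')
puts "deleted: #{deleted.inspect}, remaining: #{conn.keys.inspect}"
```

Returning the deleted keys rather than a bare true makes it visible when a pattern matched nothing, which is the situation that went unnoticed earlier in this thread.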

Rake task:

# frozen_string_literal: true

namespace :unique_jobs do
  desc 'Clear all locks for given key (or wildcard key for multiple)'
  task :clear_locks, %i[lock_key] => :environment do |_task, args|
    Sidekiq::UniqueJobsLockService.unlock!(args[:lock_key])
  end
end

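Container lifecycle (postStart hook on the Sidekiq container in Kubernetes; note that Kubernetes does not guarantee the hook runs before the container's main command starts)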

        lifecycle:
          postStart:
            exec:
              command:
                - bundle
                - exec
                - rake
                - "unique_jobs:clear_locks[organization-*]"