breamware / sidekiq-batch

Sidekiq Batch Jobs Implementation
MIT License

0.1.9 incompatible with redis-client which is used by sidekiq 7 #66

Closed jpaas closed 1 year ago

jpaas commented 1 year ago

Sidekiq 7 has switched to redis-client, which validates the arguments passed to Redis commands.

In 0.1.9, this line (https://github.com/breamware/sidekiq-batch/blob/master/lib/sidekiq/batch.rb#L99) was changed to wrap the @ready_to_queue jids in an array. Since @ready_to_queue is already an array, the SADD command receives a nested array as an argument, which fails the redis-client validation check.

Their suggestion is to flatten the array.
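To make the failure mode concrete, here is a minimal sketch in plain Ruby. `build_command` is a hypothetical stand-in for a strict client-side argument check (it is not redis-client's actual code), and the key and jids are made up. It only illustrates why a nested array is rejected while a flattened one goes through.

```ruby
# Hypothetical stand-in for a client-side argument check.
# This is NOT redis-client's actual implementation.
def build_command(*args)
  # Flatten one level only, so a plain list of members is accepted.
  flat = args.flat_map { |arg| arg.is_a?(Array) ? arg : [arg] }
  flat.map do |arg|
    case arg
    when String then arg
    when Symbol, Integer, Float then arg.to_s
    else raise TypeError, "Unsupported command argument type: #{arg.class}"
    end
  end
end

ready_to_queue = %w[jid-1 jid-2] # stands in for @ready_to_queue
key = "BID-example-jids"         # hypothetical key

# 0.1.9 behaviour: the jid array is wrapped in another array.
nested_error =
  begin
    build_command("SADD", key, [ready_to_queue])
    nil
  rescue TypeError => e
    e.message
  end

# Flattened: every jid becomes its own SADD member.
flat_command = build_command("SADD", key, [ready_to_queue].flatten)

puts nested_error.inspect
puts flat_command.inspect
```

In this sketch the wrapped call raises a `TypeError`, while the flattened call yields one flat list of string arguments.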

jtokoph commented 1 year ago

I've added this initializer that monkey patches two lines (marked MONKEYPATCH):

raise 'Sidekiq::Batch version mismatch. Check MONKEYPATCH' if Sidekiq::Batch::VERSION != '0.1.9'

module Sidekiq
  class Batch
    def jobs
      raise NoBlockGivenError unless block_given?

      bid_data = Thread.current[:bid_data]
      Thread.current[:bid_data] = []

      begin
        if !@existing && !@initialized
          parent_bid = Thread.current[:batch].bid if Thread.current[:batch]

          Sidekiq.redis do |r|
            r.multi do |pipeline|
              pipeline.hset(@bidkey, 'created_at', @created_at)
              pipeline.hset(@bidkey, 'parent_bid', parent_bid.to_s) if parent_bid
              pipeline.expire(@bidkey, BID_EXPIRE_TTL)
            end
          end

          @initialized = true
        end

        @ready_to_queue = []

        begin
          parent = Thread.current[:batch]
          Thread.current[:batch] = self
          yield
        ensure
          Thread.current[:batch] = parent
        end

        return [] if @ready_to_queue.empty?

        Sidekiq.redis do |r|
          r.multi do |pipeline|
            if parent_bid
              pipeline.hincrby("BID-#{parent_bid}", 'children', 1)
              pipeline.hincrby("BID-#{parent_bid}", 'total', @ready_to_queue.size)
              pipeline.expire("BID-#{parent_bid}", BID_EXPIRE_TTL)
            end

            pipeline.hincrby(@bidkey, 'pending', @ready_to_queue.size)
            pipeline.hincrby(@bidkey, 'total', @ready_to_queue.size)
            pipeline.expire(@bidkey, BID_EXPIRE_TTL)

            pipeline.sadd("#{@bidkey}-jids", [@ready_to_queue].flatten) # MONKEYPATCH: Added flatten
            pipeline.expire("#{@bidkey}-jids", BID_EXPIRE_TTL)
          end
        end

        @ready_to_queue
      ensure
        Thread.current[:bid_data] = bid_data
      end
    end

    class << self
      def enqueue_callbacks(event, bid)
        event_name = event.to_s
        batch_key = "BID-#{bid}"
        callback_key = "#{batch_key}-callbacks-#{event_name}"
        already_processed, _, callbacks, queue, parent_bid, callback_batch = Sidekiq.redis do |r|
          r.multi do |pipeline|
            pipeline.hget(batch_key, event_name)
            pipeline.hset(batch_key, event_name, 'true') # MONKEYPATCH: string true
            pipeline.smembers(callback_key)
            pipeline.hget(batch_key, 'callback_queue')
            pipeline.hget(batch_key, 'parent_bid')
            pipeline.hget(batch_key, 'callback_batch')
          end
        end

        return if already_processed == 'true'

        queue ||= 'default'
        parent_bid = nil if parent_bid.blank? # blank? comes from ActiveSupport
        callback_args = callbacks.reduce([]) do |memo, jcb|
          cb = Sidekiq.load_json(jcb)
          memo << [cb['callback'], event_name, cb['opts'], bid, parent_bid]
        end

        opts = { 'bid' => bid, 'event' => event_name }

        # Run callback batch finalize synchronously
        if callback_batch
          # Extract opts from cb_args or use current
          # Pass in stored event as callback finalize is processed on complete event
          cb_opts = callback_args.first&.at(2) || opts

          Sidekiq.logger.debug { "Run callback batch bid: #{bid} event: #{event_name} args: #{callback_args.inspect}" }
          # Finalize now
          finalizer = Sidekiq::Batch::Callback::Finalize.new
          status = Status.new bid
          finalizer.dispatch(status, cb_opts)

          return
        end

        Sidekiq.logger.debug { "Enqueue callback bid: #{bid} event: #{event_name} args: #{callback_args.inspect}" }

        if callback_args.empty?
          # Finalize now
          finalizer = Sidekiq::Batch::Callback::Finalize.new
          status = Status.new bid
          finalizer.dispatch(status, opts)
        else
          # Otherwise finalize in sub batch complete callback
          cb_batch = new
          cb_batch.callback_batch = true
          Sidekiq.logger.debug { "Adding callback batch: #{cb_batch.bid} for batch: #{bid}" }
          cb_batch.on(:complete, 'Sidekiq::Batch::Callback::Finalize#dispatch', opts)
          cb_batch.jobs do
            push_callbacks callback_args, queue
          end
        end
      end
    end
  end
end

MaksJS commented 1 year ago

I also had to edit this line: cb_batch.callback_batch = 'true'

jpaas commented 1 year ago

> I also had to edit this line: cb_batch.callback_batch = 'true'

Yeah, this is an issue even on 0.1.8: the batch can never complete.
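A rough sketch of why the string form matters, assuming a strict client that refuses booleans. `FakeHashStore` is a hypothetical in-memory stand-in, not a real Redis client, and the key and field names are illustrative. Redis hash values come back as strings, so the flag has to be written and compared as 'true'.

```ruby
# Hypothetical in-memory stand-in for a Redis hash; not a real client.
class FakeHashStore
  def initialize
    @data = Hash.new { |hash, key| hash[key] = {} }
  end

  # Rejects booleans, as a strict client might; stores everything else as a String.
  def hset(key, field, value)
    if value == true || value == false
      raise TypeError, "Unsupported command argument type: #{value.class}"
    end
    @data[key][field] = value.to_s
  end

  def hget(key, field)
    @data[key][field]
  end
end

store = FakeHashStore.new

# Passing a Ruby boolean is refused by the strict stand-in.
boolean_rejected =
  begin
    store.hset("BID-example", "complete", true)
    false
  rescue TypeError
    true
  end

# Passing the string form works, and the read side compares against 'true'.
store.hset("BID-example", "complete", "true")
already_processed = store.hget("BID-example", "complete") == "true"

puts boolean_rejected
puts already_processed
```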

github-actions[bot] commented 1 year ago

Stale issue message

ashwinv11 commented 1 year ago

Hello! First off wanted to say thank you for creating this gem, it's fantastic 🙏🏾 Any update on a potential fix for this so we can upgrade to Sidekiq 7?

sebfie commented 1 year ago

I also need it!

luisAzcuaga commented 1 year ago

I see the issue is marked as completed, but was it? 👀

gammons commented 8 months ago

I jammed the monkeypatch updates into #88 which works for me locally.

lbrito1 commented 8 months ago

Ran into this today, would be nice to have the fix merged