alphasights / sneakers_handlers

Retries with exponential backoff for sneakers
MIT License

Multiple errors 'undefined method `queue' for #<AMQ::Protocol::Queue::BindOk:0x00005579e61b8c70>' #38

Closed. bronislav closed this issue 4 years ago

bronislav commented 4 years ago

The bunny issue tracker shows that this kind of error often happens when a channel is shared between threads.

Error message:

[SneakersHandlers::ExponentialBackoffHandler] msg=unexpected_handler_error, error='undefined method `queue' for #<AMQ::Protocol::Queue::BindOk:0x00005579e61b8c70>'

EDIT: Once I configured my worker to use only one thread, this error disappeared.
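
A minimal sketch of that single-thread workaround, assuming the `threads` option that sneakers accepts in `Sneakers.configure` (it can also be passed per worker to `from_queue`). The connection URL and credentials are assumptions that mirror a default local RabbitMQ install:

```ruby
require 'bunny'
require 'sneakers'

# Assumption: a local RabbitMQ with the default guest credentials.
conn = Bunny.new("amqp://guest:guest@localhost:5672/", vhost: "/")

Sneakers.configure(
  connection: conn,
  daemonize: false,
  workers: 1, # one worker process
  threads: 1  # one thread per worker, so no two threads share the channel
)
```

This trades throughput for safety: with a single thread, messages are processed one at a time.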

bronislav commented 4 years ago

I've reproduced this issue with the latest versions of sneakers and sneakers_handlers. It seems that sneakers creates one channel per worker and reuses that same channel across the worker's threads. The bunny documentation strongly discourages sharing a channel between threads.

workers.rb

require 'bundler/inline'

gemfile do
  source 'https://rubygems.org'

  gem 'json', require: false
  gem 'bunny', require: false
  gem 'sneakers', require: false
  gem 'sneakers_handlers', require: false,
      path: '../sneakers_handlers'
  gem 'amazing_print'
end

require 'bunny'
require 'sneakers'
require 'sneakers/runner'
require 'sneakers_handlers'
require 'json'

class ExponentialBackoffWorker
  include Sneakers::Worker

  from_queue 'sneakers_handlers.test',
             ack: true,
             exchange: 'sneakers_handlers',
             exchange_type: :topic,
             routing_key: 'sneakers_handlers.backoff_test',
             handler: SneakersHandlers::ExponentialBackoffHandler,
             max_retries: 10,
             arguments: {
               "x-dead-letter-exchange" => "sneakers_handlers.dlx",
               "x-dead-letter-routing-key" => "sneakers_handlers.test"
             }

  def work(msg)
    data = JSON.parse(msg)

    logger.info(data)

    reject!
  end
end

conn = Bunny.new(
  "amqp://guest:guest@localhost:5672/",
  vhost: "/"
)

Sneakers.configure(
  connection: conn,
  daemonize: false,
  workers: 1
)

Sneakers.logger.level = Logger::INFO

runner = Sneakers::Runner.new([ExponentialBackoffWorker])
runner.run

publisher.rb

require 'bundler/inline'

gemfile do
  source 'https://rubygems.org'

  gem 'bunny', require: false
end

require 'bunny'
require 'json'

conn = Bunny.new(host: 'localhost', port: '5672', user: 'guest', pass: 'guest')
conn.start

ch = conn.create_channel

5.times do |i|
  ch.default_exchange.publish({ text: "test #{i}" }.to_json, routing_key: 'sneakers_handlers.test')
end

ch.close
conn.close

The main issue is that messages are being multiplied: the initial 5 messages can become 20-30 messages after several retries.

Not sure yet how to fix this in the proper way.

lucasintel commented 2 years ago

I just noticed this error on my end. Indeed, the main issue is that the messages are being multiplied: I noticed a massive number of messages in some sneakers queues that use the exponential backoff handler.

I think the problem is that SneakersHandlers::ExponentialBackoffHandler keeps clearing Bunny's internal queue cache and creating new queues (since the retry queues are ephemeral). As you said, I think it's not thread safe, as multiple threads might try to create queues at the same time.

Wrapping the retry queue creation in a Mutex solves the issue for me. I'm still trying to find the proper way to solve it, though.
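
A rough sketch of the Mutex idea, using only the Ruby standard library. `FakeChannel`, `RetryQueueRegistry`, and the queue name are hypothetical stand-ins for illustration; this is not the actual patch, nor the handler's or Bunny's real API. It only shows the pattern of serializing a check-then-create step so that concurrent threads declare a given queue once:

```ruby
# Hypothetical stand-in for a channel: records how often each queue is declared.
class FakeChannel
  attr_reader :declarations

  def initialize
    @declarations = Hash.new(0)
  end

  def declare_queue(name)
    sleep 0.001 # widen the window in which an unguarded race would occur
    @declarations[name] += 1
    name
  end
end

# Hypothetical registry that guards queue creation with a Mutex.
class RetryQueueRegistry
  def initialize(channel)
    @channel = channel
    @queues = {}
    @lock = Mutex.new
  end

  # Only one thread at a time may check the cache and declare a missing queue.
  def fetch(name)
    @lock.synchronize do
      @queues[name] ||= @channel.declare_queue(name)
    end
  end
end

channel = FakeChannel.new
registry = RetryQueueRegistry.new(channel)

# Ten threads ask for the same (hypothetical) retry queue concurrently.
threads = 10.times.map do
  Thread.new { registry.fetch("sneakers_handlers.test.retry.1") }
end
threads.each(&:join)

puts channel.declarations
```

Without the `synchronize` block, several threads could see an empty cache at the same moment and each declare the queue. The lock makes the check and the declaration a single atomic step, at the cost of serializing every lookup.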

Thank you for the minimal reproduction code.