socketry / async

An awesome asynchronous event-driven reactor for Ruby.
MIT License

FiberError: fiber called across threads when using async inside a sidekiq worker #175

Closed zhulik closed 2 years ago

zhulik commented 2 years ago

Hello,

For some time I have been using async inside Sidekiq jobs. Each job starts its own reactor using this simple server middleware:

class Sidekiq::Middleware::Async::Server
  def call(_worker, _job, _queue)
    Async do
      yield
    ensure
      Grumlin.close # grumlin is my Gremlin client built on top of async-websocket
    end
  end
end

Sidekiq.configure_server do |config|
  config.server_middleware do |chain|
    chain.add Sidekiq::Middleware::Async::Server
  end
end

The code running in the job does not spawn any threads, although Sidekiq itself is threaded. Still, I get FiberError: fiber called across threads from time to time.

Stacktrace:

File /app/vendor/bundle/ruby/3.1.0/gems/async-2.0.3/lib/async/condition.rb line 36 in transfer
File /app/vendor/bundle/ruby/3.1.0/gems/async-2.0.3/lib/async/condition.rb line 36 in transfer
File /app/vendor/bundle/ruby/3.1.0/gems/async-2.0.3/lib/async/notification.rb line 47 in block in transfer
File /app/vendor/bundle/ruby/3.1.0/gems/async-2.0.3/lib/async/notification.rb line 46 in each
File /app/vendor/bundle/ruby/3.1.0/gems/async-2.0.3/lib/async/notification.rb line 46 in transfer
File /app/vendor/bundle/ruby/3.1.0/gems/async-2.0.3/lib/async/scheduler.rb line 220 in select
File /app/vendor/bundle/ruby/3.1.0/gems/async-2.0.3/lib/async/scheduler.rb line 220 in run_once
File /app/vendor/bundle/ruby/3.1.0/gems/async-2.0.3/lib/async/scheduler.rb line 239 in run
File /app/vendor/bundle/ruby/3.1.0/gems/async-2.0.3/lib/kernel/async.rb line 48 in Async 
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/middleware/chain.rb line 179 in block in invoke
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb line 43 in block in execute
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/lock/while_executing.rb line 44 in block (2 levels) in execute
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/locksmith.rb line 204 in block (2 levels) in lock!
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/locksmith.rb line 275 in handle_primed
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/locksmith.rb line 257 in primed_async
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/locksmith.rb line 198 in call
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/locksmith.rb line 198 in block in lock!
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/locksmith.rb line 230 in enqueue
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/locksmith.rb line 195 in lock!
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/locksmith.rb line 106 in block in execute
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq.rb line 164 in block in redis
File /app/vendor/bundle/ruby/3.1.0/gems/connection_pool-2.2.5/lib/connection_pool.rb line 63 in block (2 levels) in with
File /app/vendor/bundle/ruby/3.1.0/gems/connection_pool-2.2.5/lib/connection_pool.rb line 62 in handle_interrupt
File /app/vendor/bundle/ruby/3.1.0/gems/connection_pool-2.2.5/lib/connection_pool.rb line 62 in block in with
File /app/vendor/bundle/ruby/3.1.0/gems/connection_pool-2.2.5/lib/connection_pool.rb line 59 in handle_interrupt
File /app/vendor/bundle/ruby/3.1.0/gems/connection_pool-2.2.5/lib/connection_pool.rb line 59 in with
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq.rb line 161 in redis
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/connection.rb line 19 in redis
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/locksmith.rb line 105 in execute
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/lock/while_executing.rb line 43 in block in execute
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/logging.rb line 142 in block in with_logging_context
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/logger.rb line 11 in with
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/logging.rb line 157 in call
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/logging.rb line 157 in with_configured_loggers_context
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/logging.rb line 141 in with_logging_context
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/lock/while_executing.rb line 42 in execute
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb line 43 in execute
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/middleware/server.rb line 27 in call
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/middleware.rb line 41 in block in call
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/logging.rb line 142 in block in with_logging_context
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/logger.rb line 11 in with
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/logging.rb line 157 in call
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/logging.rb line 157 in with_configured_loggers_context
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/logging.rb line 141 in with_logging_context
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-unique-jobs-7.1.27/lib/sidekiq_unique_jobs/middleware.rb line 40 in call
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/middleware/chain.rb line 179 in block in invoke
File /app/vendor/bundle/ruby/3.1.0/gems/rollbar-3.3.1/lib/rollbar/plugins/sidekiq/plugin.rb line 11 in call
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/middleware/chain.rb line 179 in block in invoke
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/middleware/chain.rb line 182 in invoke
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/processor.rb line 169 in block in process
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/processor.rb line 136 in block (6 levels) in dispatch
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/job_retry.rb line 112 in local
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/processor.rb line 135 in block (5 levels) in dispatch
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/rails.rb line 14 in block in call
File /app/vendor/bundle/ruby/3.1.0/gems/activesupport-6.1.6.1/lib/active_support/execution_wrapper.rb line 91 in wrap
File /app/vendor/bundle/ruby/3.1.0/gems/activesupport-6.1.6.1/lib/active_support/reloader.rb line 72 in block in wrap
File /app/vendor/bundle/ruby/3.1.0/gems/activesupport-6.1.6.1/lib/active_support/execution_wrapper.rb line 91 in wrap
File /app/vendor/bundle/ruby/3.1.0/gems/activesupport-6.1.6.1/lib/active_support/reloader.rb line 71 in wrap
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/rails.rb line 13 in call
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/processor.rb line 131 in block (4 levels) in dispatch
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/processor.rb line 263 in stats
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/processor.rb line 126 in block (3 levels) in dispatch
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/job_logger.rb line 13 in call
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/processor.rb line 125 in block (2 levels) in dispatch
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/job_retry.rb line 79 in global
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/processor.rb line 124 in block in dispatch
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/job_logger.rb line 39 in prepare
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/processor.rb line 123 in dispatch
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/processor.rb line 168 in process
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/processor.rb line 78 in process_one
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/processor.rb line 68 in run
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/component.rb line 8 in watchdog
File /app/vendor/bundle/ruby/3.1.0/gems/sidekiq-6.5.4/lib/sidekiq/component.rb line 17 in block in safe_thread 

What would be the best way to debug and fix it? Reduce concurrency of sidekiq to 1 and spawn more processes instead?

ioquatix commented 2 years ago

Is any state shared between workers? For example, a global connection pool or something similar.

zhulik commented 2 years ago

Maybe unintentionally. Each thread is supposed to have its own connection pool:

Thread.current.thread_variable_set(:grumlin_default_pool,
                                   Async::Pool::Controller.new(Grumlin::Client::PoolResource,
                                                               limit: config.pool_size))

There is also a shared mutex, which probably does not make any sense since I use Thread.current: https://github.com/babbel/grumlin/blob/782c0f18ead3c8ccf1f72ff0f9e38a0dd9e51da6/lib/grumlin.rb#L157

ioquatix commented 2 years ago

Is there any chance resources are being leaked across threads?

ioquatix commented 2 years ago

The stack trace tells me that you have a fiber, possibly from a different thread, waiting on a resource or condition of a different thread.
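
For readers unfamiliar with the error itself: Ruby raises FiberError when a fiber created on one thread is resumed from another. The following is a minimal sketch using only core Ruby, with no async involved, so it illustrates the underlying rule rather than what async does internally. The variable names are made up for illustration.

```ruby
# Create a fiber on the main thread and run it up to its first yield,
# leaving it suspended and owned by the main thread.
fiber = Fiber.new do
  Fiber.yield :suspended
  :finished
end
fiber.resume

# Try to resume the same fiber from a second thread and capture the error
# instead of letting it terminate the thread.
error = Thread.new do
  fiber.resume
rescue FiberError => e
  e
end.value

puts "#{error.class}: #{error.message}"
```

Anything that stores a waiting fiber (a condition, a notification, a pool) and is later signalled from a different thread can end up in this situation.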

ioquatix commented 2 years ago

We could debug this by logging the thread the condition was created on and bombing out as soon as it fails:

require 'async'

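# Remembers the thread each condition was created on and raises as soon as
# the condition is waited on from a different thread.
module CrossThreadBomb
  def initialize
    super
    @thread = Thread.current
  end

  def wait
    if Thread.current != @thread
      raise "Cross-thread bomb!"
    end

    super
  end
end

# Prepend so the check runs before the original Async::Condition#wait.
Async::Condition.prepend(CrossThreadBomb)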

Async do
  condition = Async::Condition.new

  Thread.new do
    condition.wait
  end.join
end

zhulik commented 2 years ago

Thanks a lot! Unfortunately, I can't reproduce the issue locally, so I'll deploy your snippet and wait until it explodes.

zhulik commented 2 years ago

Apparently this didn't help: I'm still getting exactly the same exception. Any hints as to which other classes I could monkey-patch in a similar way?

UPD: I actually can't even reproduce the issue with these samples:

Async do
  condition = Async::Condition.new

  Thread.new do
    condition.wait
  end.join
end

Crashes with an exception

#<Thread:0x00007f765a72cb58 (irb):11 run> terminated with exception (report_on_exception is true):
/home/user/.asdf/installs/ruby/3.1.2/lib/ruby/gems/3.1.0/gems/async-2.0.3/lib/async/condition.rb:56:in `wait': undefined method `transfer' for nil:NilClass (NoMethodError)

                        Fiber.scheduler.transfer
                                       ^^^^^^^^^
        from (irb):12:in `block (2 levels) in start'
irb(main):015:0> 50.66s     warn: Async::Task [oid=0xb8c4] [ec=0xb9a0] [pid=824748] [2022-08-24 12:35:30 +0200]
               | Task may have ended with unhandled exception.
               |   NoMethodError: undefined method `transfer' for nil:NilClass
               |   
               |                        Fiber.scheduler.transfer
               |                                       ^^^^^^^^^
               |   → /home/user/.asdf/installs/ruby/3.1.2/lib/ruby/gems/3.1.0/gems/async-2.0.3/lib/async/condition.rb:56 in `wait'
               |     (irb):12 in `block (2 levels) in start

and

Async do
  condition = Async::Condition.new

  Thread.new do
    Async do
      condition.wait
    end
  end.join
end

Does not crash at all

UPD 2: Same behavior with ruby 3.0 and async 1.30
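
A possible reading of the NoMethodError in the first sample: the failing line is Fiber.scheduler.transfer, and Fiber.scheduler is per-thread state, so a thread started with a bare Thread.new has no scheduler unless one is set there. The sketch below uses only core Ruby (3.0 or later) and a made-up variable name; it shows the nil scheduler and says nothing about how async handles this case internally.

```ruby
# Ask a freshly started thread for its fiber scheduler. No scheduler has been
# set on that thread, so the lookup returns nil.
scheduler_in_new_thread = Thread.new { Fiber.scheduler }.value

puts "scheduler in new thread: #{scheduler_in_new_thread.inspect}"
```

Calling transfer on that nil value is what produces "undefined method `transfer' for nil:NilClass" in the output above.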

ioquatix commented 2 years ago

Hmm, that's odd, I tested it locally and it blew up. Let me check it again.

zhulik commented 2 years ago

This snippet actually causes Condition#transfer to raise a FiberError once the task is finished:

require "async"

Async do |reactor|
  task = reactor.async { Async::Task.current.sleep(1) }

  Thread.new do
    Async do
      task.wait
    end
  end
end

Traceback (most recent call last):
        5: from /home/user/.asdf/installs/ruby/2.7.5/lib/ruby/gems/2.7.0/gems/async-1.30.3/lib/async/task.rb:272:in `block in make_fiber'
        4: from /home/user/.asdf/installs/ruby/2.7.5/lib/ruby/gems/2.7.0/gems/async-1.30.3/lib/async/task.rb:287:in `finish!'
        3: from /home/user/.asdf/installs/ruby/2.7.5/lib/ruby/gems/2.7.0/gems/async-1.30.3/lib/async/condition.rb:62:in `signal'
        2: from /home/user/.asdf/installs/ruby/2.7.5/lib/ruby/gems/2.7.0/gems/async-1.30.3/lib/async/condition.rb:62:in `each'
        1: from /home/user/.asdf/installs/ruby/2.7.5/lib/ruby/gems/2.7.0/gems/async-1.30.3/lib/async/condition.rb:63:in `block in signal'
/home/user/.asdf/installs/ruby/2.7.5/lib/ruby/gems/2.7.0/gems/async-1.30.3/lib/async/condition.rb:63:in `resume': fiber called across threads (FiberError)

ioquatix commented 2 years ago

Sorry, I didn't have time to look at this, I'll spend some time tonight.

zhulik commented 2 years ago

You were absolutely right: the pool created in one thread may have been memoized and then used from another thread. I removed the memoization and that fixed the issue.

Thank you!
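
To illustrate the kind of leak described above, here is a minimal sketch in plain Ruby. FakePool, MemoizedPool, PerThreadPool and the :example_pool key are hypothetical names invented for this example; they are not taken from grumlin or async. The sketch assumes the memoization lived in state shared by all threads, which is one way such a leak can happen.

```ruby
# FakePool is a hypothetical stand-in for a real pool; it only remembers
# which thread created it.
class FakePool
  attr_reader :owner

  def initialize
    @owner = Thread.current
  end
end

# Memoized in module state: the first caller's pool is handed to every thread.
module MemoizedPool
  def self.pool
    @pool ||= FakePool.new
  end
end

# Stored in a thread variable: each thread lazily builds its own pool.
module PerThreadPool
  def self.pool
    existing = Thread.current.thread_variable_get(:example_pool)
    return existing if existing

    created = FakePool.new
    Thread.current.thread_variable_set(:example_pool, created)
    created
  end
end

# Warm both on the main thread, then look them up from a second thread.
MemoizedPool.pool
PerThreadPool.pool

memoized_is_local = Thread.new { MemoizedPool.pool.owner == Thread.current }.value
per_thread_is_local = Thread.new { PerThreadPool.pool.owner == Thread.current }.value

puts "memoized pool owned by calling thread:   #{memoized_is_local}"
puts "per-thread pool owned by calling thread: #{per_thread_is_local}"
```

With a threaded server such as Sidekiq, the memoized variant means a job can receive a pool whose fibers belong to another worker thread, which matches the symptom reported in this issue.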

jhass commented 1 month ago

For future readers who find this first while searching for the error message: for me the trigger was the Faraday adapter, see https://github.com/socketry/async-http-faraday/issues/31