socketry / async

An awesome asynchronous event-driven reactor for Ruby.
MIT License
2.14k stars 88 forks source link

How to prevent deadlock using Async? #107

Closed romiras closed 3 years ago

romiras commented 3 years ago

I wrote a simple demo producer-consumer using Thread(s) and Queue and it works. Then I rewrote it using Async, it fails due to deadlock. Tried to use Async::Reactor instead while loop and no luck.

I, [2021-03-27T14:36:31.695540 #30532]  INFO -- : Consumer
Traceback (most recent call last):
    2: from /home/user/Apps/rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/async-1.28.9/lib/async/task.rb:265:in `block in make_fiber'
    1: from /home/user/Devel/Ruby/async_queue_test.rb:25:in `block (4 levels) in run'
/home/user/Devel/Ruby/async_queue_test.rb:25:in `pop': No live threads left. Deadlock? (fatal)
1 threads, 1 sleeps current:0x0000555f03693840 main thread:0x0000555f03693840
* #<Thread:0x0000555f036f4b60 sleep_forever>
   rb_thread_t:0x0000555f03693840 native:0x00007f9440267740 int:0
   /home/user/Devel/Ruby/async_queue_test.rb:25:in `pop'
   /home/user/Devel/Ruby/async_queue_test.rb:25:in `block (4 levels) in run'
   /home/user/Apps/rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/async-1.28.9/lib/async/task.rb:265:in `block in make_fiber'

Can you advise how to fix it?

require 'logger'

def has_jobs(item)
    !item.nil?
end

def run(log)
    delay = 0.2
    times = 3

    q = Queue.new

    # consumer
    t1 = Thread.new do
        log.info('Consumer')
        while (has_jobs(job = q.deq))
            log.info("consume #{job}")
        end
        log.info('Consumer exited')
    end

    # producer
    t2 = Thread.new do
        log.info('Producer')
        (1..times).each do |i|
            log.info("produce #{i}")
            q.enq(i)
            sleep(delay)
        end
        q.enq(nil)
        q.close
        log.info('Producer exited')
    end

    # Ensure we wait for all tasks to complete before continuing:
    [t1, t2].each(&:join)
end

log = Logger.new(STDOUT)

t = Time.now
run(log)
puts(Time.now-t)

log.info('Done')
require 'async'
require 'async/barrier'
require 'async/semaphore'
require 'logger'

def has_jobs(item)
    !item.nil?
end

def run(log)
    delay = 0.2
    times = 3

    q = Queue.new

    Async do
        barrier = Async::Barrier.new
        semaphore = Async::Semaphore.new(2, parent: barrier)

        # consumer
        semaphore.async do
            Async do |task|
                log.info('Consumer')
                while (has_jobs(job = q.deq))
                    log.info("consume #{job}")
                end
                # Async::Reactor.run do
                #     job = q.deq
                #     if has_jobs(job)
                #         log.info("consume #{job}")
                #     end
                # end
                log.info('Consumer exited')
            end
        end

        # producer
        semaphore.async do
            Async do |task|
                log.info('Producer')
                (1..times).each do |i|
                    log.info("produce #{i}")
                    q.enq(i)
                    task.sleep(delay)
                end
                q.enq(nil)
                q.close
                log.info('Producer exited')
            end
        end

        # Ensure we wait for all tasks to complete before continuing:
        barrier.wait
    end
end

log = Logger.new(STDOUT)

t = Time.now
run(log)
puts(Time.now-t)

log.info('Done')
ioquatix commented 3 years ago

It appears to work fine for me on Ruby 3.

image
romiras commented 3 years ago

@ioquatix well, you're right. I've checked also in Ruby 3.0.0 and it seems to work...

Just checked, this is output in Ruby 2.7.2:

I, [2021-03-29T00:55:56.863708 #2168]  INFO -- : Consumer
Traceback (most recent call last):
    2: from /home/user/Apps/rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/async-1.28.9/lib/async/task.rb:265:in `block in make_fiber'
    1: from /home/user/Devel/Ruby/async_queue_test.rb:24:in `block (3 levels) in run'
/home/user/Devel/Ruby/async_queue_test.rb:24:in `pop': No live threads left. Deadlock? (fatal)
1 threads, 1 sleeps current:0x000055cd611bf320 main thread:0x000055cd611bf320
* #<Thread:0x000055cd611eed88 sleep_forever>
   rb_thread_t:0x000055cd611bf320 native:0x00007ff9d706ed80 int:0
   /home/user/Devel/Ruby/async_queue_test.rb:24:in `pop'
   /home/user/Devel/Ruby/async_queue_test.rb:24:in `block (3 levels) in run'
   /home/user/Apps/rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/async-1.28.9/lib/async/task.rb:265:in `block in make_fiber'
romiras commented 3 years ago

@ioquatix, so am I using Async in proper way? Does a problem above belong to Async or Ruby?

ioquatix commented 3 years ago

Ruby 3.0 exposes hooks for blocking operations and Async can handle them.

If you want to write something which is based on async, use Async::Queue. Otherwise, if you want to use thread primtives, use them. But if you mix it, and expect concurrency, you better use Ruby 3.

In other words, if you use thread primitives with async on Ruby 2, you will block the reactor.