ruby-concurrency / concurrent-ruby

Modern concurrency tools including agents, futures, promises, thread pools, supervisors, and more. Inspired by Erlang, Clojure, Scala, Go, Java, JavaScript, and classic concurrency patterns.
https://ruby-concurrency.github.io/concurrent-ruby/

Concurrent::Promises::Channel deadlock in example code #860

Open ilyacherevkov opened 4 years ago

ilyacherevkov commented 4 years ago

I'm running the example from the Promises::Channel docs.

channel = Concurrent::Promises::Channel.new 2
log     = Concurrent::Array.new          # => []

def produce(channel, log, producer, i)
  log.push format "producer %d pushing %d", producer, i
  channel.push_op([producer, i]).then do
    i + 1 < 4 ? produce(channel, log, producer, i + 1) : :done
  end
end                                      # => :produce

def consume(channel, log, consumer, i)
  channel.pop_op.then(consumer, i) do |(from, message), consumer, i|
    log.push format "consumer %d got %d. payload %d from producer %d",
                    consumer, i, message, from
    do_stuff
    i + 1 < 2 ? consume(channel, log, consumer, i + 1) : :done
  end
end                                      # => :consume

producers = Array.new 2 do |i|
  Concurrent::Promises.future(channel, log, i) { |*args| produce *args, 0 }.run
end

consumers = Array.new 4 do |i|
  Concurrent::Promises.future(channel, log, i) { |*args| consume *args, 0 }.run
end

producers.map(&:value!)                  # => [:done, :done]
consumers.map(&:value!)                  # => [:done, :done, :done, :done]
log

and it fails with this error:

<home>/.rbenv/versions/2.7.0/lib/ruby/gems/2.7.0/gems/concurrent-ruby-1.1.6/lib/concurrent-ruby/concurrent/promises.rb:775:in 'sleep': No live threads left. Deadlock? (fatal)

* Operating system:                mac
* Ruby implementation:             Ruby MRI 2.7.0
* `concurrent-ruby` version:       1.1.6
* `concurrent-ruby-ext` installed: yes
* `concurrent-ruby-edge` used:     yes

blelump commented 4 years ago

It starts working when the channel capacity is increased to 4. Still, it is unclear to me why it has to be 4 or, more fundamentally, how Channel behaves underneath.

Edit: After several days of reading about concurrency, I think the deadlock is an unfortunate coincidence. The message "No live threads left" means that every thread is sleeping at the same time. The default `do_stuff` implementation calls `sleep x` internally, which pauses the consumers. Meanwhile, when the channel is full, the producer futures are also waiting to be processed, so no thread is active and Ruby reports a deadlock. My current workaround is to replace the last line (`log`) with:

Thread.new do
  while true
    sleep 0.2
    puts log.unshift(log.length).join("\n")
  end
end

There are of course other solutions.
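
One such alternative, sketched here only as an illustration: give every producer and consumer its own thread and use Ruby's built-in `SizedQueue` as the bounded channel. This swaps `Promises::Channel` for a different technique (blocking threads rather than futures), so it says nothing about how Channel works internally. The constant names and the `sleep 0.01` that stands in for `do_stuff` are assumptions made for this sketch.

```ruby
# Sketch only: uses core Ruby (Thread, Queue, SizedQueue), not concurrent-ruby.
CAPACITY     = 2  # same capacity as the failing example
PRODUCERS    = 2
CONSUMERS    = 4
PER_PRODUCER = 4  # each producer pushes 4 messages
PER_CONSUMER = 2  # each consumer pops 2 messages

channel = SizedQueue.new(CAPACITY)
log     = Queue.new # thread-safe, used here as an append-only log

producers = Array.new(PRODUCERS) do |producer|
  Thread.new do
    PER_PRODUCER.times do |i|
      log << format("producer %d pushing %d", producer, i)
      channel.push([producer, i]) # blocks only this thread while the queue is full
    end
    :done
  end
end

consumers = Array.new(CONSUMERS) do |consumer|
  Thread.new do
    PER_CONSUMER.times do |i|
      from, message = channel.pop # blocks only this thread while the queue is empty
      log << format("consumer %d got %d. payload %d from producer %d",
                    consumer, i, message, from)
      sleep 0.01 # hypothetical stand-in for do_stuff
    end
    :done
  end
end

# Thread#value joins the thread and returns the block's result.
producer_results = producers.map(&:value)
consumer_results = consumers.map(&:value)
entries = Array.new(log.size) { log.pop }
puts entries
```

Because the total number of pushes (2 x 4) equals the total number of pops (4 x 2), every thread eventually finishes, and the main thread only waits on threads that can still make progress.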