codegram / futuroscope

Yet another Futures implementation in Ruby
MIT License

Huge feature pack: Deadlock detection, prioritized futures, smart up/down spinning of workers, logging #23

Open ggPeti opened 10 years ago

ggPeti commented 10 years ago

Apologies for the giant PR, but one thing led to another, and it didn't make sense to implement one without another (well, logging could have been separated, but it was very useful for debugging while implementing the rest).

The features implemented are:

Deadlock detection

There are 2 kinds of deadlocks that we can detect.

1. Circular dependency

When futures depend on each other in a circle, Futuroscope will detect that and send each thread involved a DeadlockError with a message describing the situation. For example:

2.1.0 :001 > f2 = nil
 => nil 
2.1.0 :002 > f1 = future { f2 = future { f1.future_value }; f2.future_value }; f1.inspect
Futuroscope::DeadlockError: Cyclical dependency detected, the future was aborted.
    (...stack trace...)
2.1.0 :004 > f2.inspect
Futuroscope::DeadlockError: Cyclical dependency detected, the future was aborted.
    (...stack trace...)
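
The circular-dependency check can be pictured as a walk over a wait-for graph. This is a minimal sketch, not the code in this PR: the two hashes (waits_on, worked_by) and the method name cycle_from are assumptions made for illustration.

```ruby
# Hypothetical data model, not Futuroscope's internals:
#   waits_on  maps a thread id to the future id it is blocked on
#   worked_by maps a future id to the thread id evaluating it
#
# Returns the futures on the cycle that contains `thread`,
# or nil if `thread` is not part of a cycle.
def cycle_from(thread, waits_on, worked_by)
  path = []
  seen = {}
  current = thread
  until seen[current]
    seen[current] = true
    future = waits_on[current]
    return nil if future.nil?    # this thread is not blocked
    path << future
    current = worked_by[future]
    return nil if current.nil?   # the awaited future has no worker yet
  end
  current == thread ? path : nil
end

# f1 (on thread t1) waits for f2, and f2 (on thread t2) waits for f1.
waits_on  = { t1: :f2, t2: :f1 }
worked_by = { f1: :t1, f2: :t2 }
p cycle_from(:t1, waits_on, worked_by)  # => [:f2, :f1]
```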

2. Pool size too low

When the pool is full but all futures in it are waiting for another future that doesn't have a worker yet, Futuroscope will fail a future that does not depend on any other future (the one with the lowest priority among those). (Note: this selection could be optimized to pick the root of the smallest unary branch of the dependency forest instead of the root of the smallest tree, but this is a good enough solution for a situation that rarely occurs.) For example:

2.1.0 :001 > Futuroscope.default_pool = Futuroscope::Pool.new 1..1
 => #<Futuroscope::Pool:0x000001020adef0 @min_workers=1, @max_workers=1, @dependencies={}, @priorities={}, @future_needs_worker=#<Thread::ConditionVariable:0x000001020ade78>, @workers=#<Set: {#<Futuroscope::Worker:0x000001020add88 @pool=#<Futuroscope::Pool:0x000001020adef0 ...>, @free=true, @thread=#<Thread:0x000001020add38 sleep>>}>, @mutex=#<Mutex:0x000001020addb0>, @futures={}> 
2.1.0 :002 > future { future { 1 } + 1 }
Futuroscope::DeadlockError: Pool size is too low, the future was aborted.
    (...stack trace...)
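
The condition for this second kind of deadlock can be sketched as a predicate over the same kind of bookkeeping. The method name pool_starved? and its arguments are hypothetical, and the victim selection described above is left out; this only illustrates when the situation arises.

```ruby
# Hypothetical arguments, not Futuroscope's internals:
#   workers     list of worker thread ids currently in the pool
#   max_workers upper bound of the pool size
#   waits_on    maps a thread id to the future id it is blocked on
#   worked_by   maps a future id to the thread id evaluating it
def pool_starved?(workers, max_workers, waits_on, worked_by)
  # The pool can still grow, so nothing is stuck yet.
  return false if workers.empty? || workers.size < max_workers

  # Stuck when every worker waits for a future that nobody is evaluating.
  workers.all? do |worker|
    awaited = waits_on[worker]
    !awaited.nil? && !worked_by.key?(awaited)
  end
end

# Pool of 1..1: the only worker evaluates the outer future and waits
# for the inner one, which can never get a worker.
p pool_starved?([:w1], 1, { w1: :inner }, { outer: :w1 })  # => true
```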

Prioritized futures

Instead of a queue, Futuroscope now uses something resembling a priority queue, where the priority is determined by how many threads are directly or indirectly blocking on the future's value. This can avoid deadlock situations, for example:

2.1.0 :001 > Futuroscope.default_pool = Futuroscope::Pool.new 2..2
 => #<Futuroscope::Pool:0x000001018c2470 @min_workers=2, @max_workers=2, @dependencies={}, @priorities={}, @future_needs_worker=#<Thread::ConditionVariable:0x000001018c23f8>, @workers=#<Set: {#<Futuroscope::Worker:0x000001018c2308 @pool=#<Futuroscope::Pool:0x000001018c2470 ...>, @free=true, @thread=#<Thread:0x000001018c22e0 sleep>>, #<Futuroscope::Worker:0x000001018c2150 @pool=#<Futuroscope::Pool:0x000001018c2470 ...>, @free=true, @thread=#<Thread:0x000001018c2128 sleep>>}>, @mutex=#<Mutex:0x000001018c2358>, @futures={}> 
2.1.0 :002 > f1, f2, f3, f4 = future { sleep 1; f4.future_value }, future { sleep 3 }, future { sleep 1; f4.future_value }, future { 4 }
 => [4, 3, 4, 4]

In the old implementation, this would raise a fatal error: f1 starts to block on f4 before f4 is reached in the queue, and then, after f2 is done, f3 starts to block on f4 as well, so the pool fills up with blocking futures before f4 has a chance to be evaluated.
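
To make the priority rule concrete, here is a small sketch that counts, for each future, how many threads block on it directly or indirectly, and then picks the pending future with the highest count. The names blocking_counts and next_future and the two input hashes are assumptions for illustration, not the data structures used in this PR.

```ruby
# waits_on  maps a thread id to the future id it is blocked on
# worked_by maps a future id to the thread id evaluating it
def blocking_counts(waits_on, worked_by)
  counts = Hash.new(0)
  waits_on.each_key do |thread|
    seen = {}
    future = waits_on[thread]
    # Follow the chain: thread -> awaited future -> its worker -> ...
    while future && !seen[future]
      seen[future] = true
      counts[future] += 1
      worker = worked_by[future]
      future = worker && waits_on[worker]
    end
  end
  counts
end

# Picks the pending future that the most threads are waiting for.
def next_future(pending, counts)
  pending.max_by { |future| counts.fetch(future, 0) }
end

# The moment f2 finishes in the example above: f1 (on w1) already blocks
# on f4, while f3 and f4 are still waiting for a worker.
counts = blocking_counts({ w1: :f4 }, { f1: :w1 })
p next_future([:f3, :f4], counts)  # => :f4
```

With a plain FIFO queue, f3 would be picked first here, which is exactly how the pool ends up full of blocked futures.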

Smart up/down spinning of workers

Previously, with the push of every new future, there was a 50% chance that a new worker would be spun up if the limit hadn't been reached yet. Now the workers keep track of whether they are free or busy, and the pool only spins up a new worker if it has more futures without workers than free workers.
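
The new spin-up rule boils down to a single comparison. A sketch with hypothetical names (spin_up_worker? and its keyword arguments are not taken from the PR):

```ruby
# Decide whether the pool should start one more worker.
def spin_up_worker?(futures_without_worker:, free_workers:, total_workers:, max_workers:)
  return false if total_workers >= max_workers  # limit reached
  futures_without_worker > free_workers         # more work than idle hands
end

# Three futures are waiting, one worker is idle, and the pool may grow to 10.
p spin_up_worker?(futures_without_worker: 3, free_workers: 1,
                  total_workers: 2, max_workers: 10)  # => true
```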

Also, previously every thread quit immediately if the pool was over the minimum thread count. Now, if the pool has more workers than the minimum, the workers have a 2-second linger period in which they will pick up new work if it's available; only if no new work comes in do they die. This minimizes the cost of spinning threads up and down when the work comes in closely spaced spikes.
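
A linger period like this can be built on ConditionVariable#wait with a timeout. The class below is a simplified stand-in written for this explanation (LingeringInbox is a made-up name, and the real pool does more bookkeeping); it only shows the wait-with-deadline part.

```ruby
class LingeringInbox
  def initialize
    @mutex = Mutex.new
    @work_available = ConditionVariable.new
    @jobs = []
  end

  def push(job)
    @mutex.synchronize do
      @jobs << job
      @work_available.signal
    end
  end

  # Returns the next job, or nil if none arrived within `linger` seconds.
  def pop(linger)
    deadline = now + linger
    @mutex.synchronize do
      while @jobs.empty?
        remaining = deadline - now
        return nil if remaining <= 0
        # Releases the mutex while sleeping; may wake up early, hence the loop.
        @work_available.wait(@mutex, remaining)
      end
      @jobs.shift
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

inbox = LingeringInbox.new
inbox.push(-> { :done })
# A surplus worker keeps going while jobs arrive within the linger period
# (0.1 seconds here to keep the example fast; the PR uses 2 seconds).
while (job = inbox.pop(0.1))
  job.call
end
# Reaching this point means no work came in, so the worker would exit.
```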

This also leads to better parallelization in some cases. Old version:

2.1.2 :001 > require 'benchmark'
 => true 
2.1.2 :002 > Futuroscope.default_pool = Futuroscope::Pool.new 1..10
 => #<Futuroscope::Pool:0x007fe502131840 @min_workers=1, @max_workers=10, @queue=#<Thread::Queue:0x007fe5021317f0>, @workers=#<Set: {#<Futuroscope::Worker:0x007fe5021316d8 @pool=#<Futuroscope::Pool:0x007fe502131840 ...>, @thread=#<Thread:0x007fe502131688 sleep>>}>, @mutex=#<Mutex:0x007fe502131700>> 
2.1.2 :004 > Benchmark.measure { (1..10).future_map { sleep 1 }.inspect }.real
 => 2.002055 

New version:

2.1.0 :001 > require 'benchmark'
 => true 
2.1.0 :002 > Futuroscope.default_pool = Futuroscope::Pool.new 1..10
 => #<Futuroscope::Pool:0x0000010189afb0 @min_workers=1, @max_workers=10, @dependencies={}, @priorities={}, @future_needs_worker=#<Thread::ConditionVariable:0x0000010189af38>, @workers=#<Set: {#<Futuroscope::Worker:0x0000010189ae20 @pool=#<Futuroscope::Pool:0x0000010189afb0 ...>, @free=true, @thread=#<Thread:0x0000010189adf8 sleep>>}>, @mutex=#<Mutex:0x0000010189ae70>, @futures={}> 
2.1.0 :003 > Benchmark.measure { (1..10).future_map { sleep 1 }.inspect }.real
 => 1.002627 

Logging

Futuroscope now supports logging. If you put loggers into Futuroscope.loggers (which is a simple array), they will receive every log message Futuroscope emits internally. The loggers are expected to conform to the interface of Ruby's standard library Logger, in that they respond to these messages: debug, info, warn, error, fatal.

Example:

2.1.0 :001 > require 'logger'
 => true 
2.1.0 :002 > Futuroscope.loggers << Logger.new(STDERR)
 => [#<Logger:0x00000102887f90 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x00000102887f68 @datetime_format=nil>, @formatter=nil, @logdev=#<Logger::LogDevice:0x00000102887f18 @shift_size=nil, @shift_age=nil, @filename=nil, @dev=#<IO:<STDERR>>, @mutex=#<Logger::LogDevice::LogDeviceMutex:0x00000102887ef0 @mon_owner=nil, @mon_count=0, @mon_mutex=#<Mutex:0x00000102887ea0>>>>] 
2.1.0 :003 > Futuroscope.default_pool = Futuroscope::Pool.new 1..1
I, [2014-09-29T16:50:17.214206 #85319]  INFO -- :         spun up worker with thread 2168719860
 => #<Futuroscope::Pool:0x00000102881690 @min_workers=1, @max_workers=1, @dependencies={}, @priorities={}, @future_needs_worker=#<Thread::ConditionVariable:0x00000102881500>, @workers=#<Set: {#<Futuroscope::Worker:0x00000102881410 @pool=#<Futuroscope::Pool:0x00000102881690 ...>, @free=true, @thread=#<Thread:0x000001028813e8 sleep>>}>, @mutex=#<Mutex:0x00000102881438>, @futures={}> 
I, [2014-09-29T16:50:17.214845 #85319]  INFO -- : POP:    thread 2168719860 going to sleep until there's something to do...
2.1.0 :004 > f = future { f.future_value }
I, [2014-09-29T16:50:25.741545 #85319]  INFO -- : PUSH:   added future 2168704840
I, [2014-09-29T16:50:25.741620 #85319]  INFO -- :         sending signal to wake up a thread
D, [2014-09-29T16:50:25.741673 #85319] DEBUG -- :         current priorities: {"future 2168704840"=>0}
I, [2014-09-29T16:50:25.741800 #85319]  INFO -- : DEPEND: thread 2156059640 depends on future 2168704840
D, [2014-09-29T16:50:25.741861 #85319] DEBUG -- :         current dependencies: {"thread 2156059640"=>"future 2168704840"}
I, [2014-09-29T16:50:25.741962 #85319]  INFO -- :         incrementing priority for future 2168704840
I, [2014-09-29T16:50:25.742025 #85319]  INFO -- : POP:    ... thread 2168719860 woke up
D, [2014-09-29T16:50:25.742069 #85319] DEBUG -- :         current priorities: {"future 2168704840"=>1}
D, [2014-09-29T16:50:25.742110 #85319] DEBUG -- :         current future workers: {"future 2168704840"=>nil}
I, [2014-09-29T16:50:25.742156 #85319]  INFO -- : POP:    thread 2168719860 will start working on future 2168704840
I, [2014-09-29T16:50:25.742203 #85319]  INFO -- : DEPEND: thread 2168719860 depends on future 2168704840
D, [2014-09-29T16:50:25.742266 #85319] DEBUG -- :         current dependencies: {"thread 2156059640"=>"future 2168704840", "thread 2168719860"=>"future 2168704840"}
E, [2014-09-29T16:50:25.742313 #85319] ERROR -- :         deadlock! cyclical dependency, sending interrupt to all threads involved
I, [2014-09-29T16:50:25.742388 #85319]  INFO -- : DONE:   thread 2168719860 is done with future 2168704840
I, [2014-09-29T16:50:25.742423 #85319]  INFO -- :         deleting future 2168704840 from the task list
I, [2014-09-29T16:50:25.742462 #85319]  INFO -- :         deleting dependency from thread 2156059640 to future 2168704840
I, [2014-09-29T16:50:25.742498 #85319]  INFO -- :         deleting dependency from thread 2168719860 to future 2168704840
I, [2014-09-29T16:50:25.742548 #85319]  INFO -- : POP:    thread 2168719860 going to sleep until there's something to do...
Futuroscope::DeadlockError: Cyclical dependency detected, the future was aborted.
    (...stack trace...)
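
The fan-out to an array of loggers can be sketched in a few lines. FanOutLog is a hypothetical module used only for this illustration; it is not how Futuroscope itself is structured.

```ruby
require 'logger'
require 'stringio'

module FanOutLog
  LEVELS = %i[debug info warn error fatal].freeze

  # A plain array; anything responding to the five level methods can go in.
  def self.loggers
    @loggers ||= []
  end

  # Defines FanOutLog.debug, FanOutLog.info, etc., each forwarding the
  # message to every registered logger.
  LEVELS.each do |level|
    define_singleton_method(level) do |message|
      loggers.each { |logger| logger.public_send(level, message) }
    end
  end
end

io = StringIO.new
FanOutLog.loggers << Logger.new(io)
FanOutLog.info('PUSH:   added future 1')
puts io.string
```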

I'd also like to note that this PR comes with multiple engineering decisions that are worth discussing. For example, I've moved away from using a Queue in both the pool and the future; instead, I'm now making use of ConditionVariables (which Queue is using internally as well).
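
For readers unfamiliar with the pattern, this is roughly what waiting for a result looks like with a Mutex and a ConditionVariable instead of a Queue. TinyFuture is a toy class written for this comment, with one thread per future and no pool; it is not the implementation in this PR.

```ruby
class TinyFuture
  def initialize(&block)
    @mutex = Mutex.new
    @resolved = ConditionVariable.new
    @outcome = nil
    Thread.new do
      outcome = begin
        [:ok, block.call]
      rescue StandardError => e
        [:error, e]
      end
      @mutex.synchronize do
        @outcome = outcome
        @resolved.broadcast   # wake up every thread waiting in #value
      end
    end
  end

  # Blocks until the block has finished, then returns its result
  # or re-raises the error it failed with.
  def value
    kind, payload = @mutex.synchronize do
      @resolved.wait(@mutex) until @outcome
      @outcome
    end
    raise payload if kind == :error
    payload
  end
end

p TinyFuture.new { 1 + 1 }.value  # => 2
```

Unlike a Queue, the condition variable lets any number of threads wait for the same value, and the stored outcome stays readable after the first reader is done.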

Another one is that the pool uses the __id__s of the futures as hash keys. Why two hashes (one mapping each future.__id__ to its priority value, and one mapping each future.__id__ to the actual future) instead of a single hash keyed by the futures themselves? Because if you use a Delegator as a hash key, Ruby will call #hash on the object, which gets forwarded to the wrapped object, creating a deadlock. Not forwarding #hash is no solution: it would make the futures non-transparent, since hsh[:key] would no longer be the same as hsh[future { :key }].
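
The forwarding behaviour is easy to reproduce with a bare Delegator. LazyValue below is a hypothetical stand-in for a future that counts how often its wrapped value gets resolved; in the real pool, that resolution is the step that would block.

```ruby
require 'delegate'

class LazyValue < Delegator
  attr_reader :resolutions

  def initialize(&block)
    @block = block
    @resolutions = 0
  end

  # Called by Delegator whenever a method has to be forwarded.
  def __getobj__
    @resolutions += 1
    @block.call
  end

  def __setobj__(_obj); end
end

lazy = LazyValue.new { :key }

# __id__ is defined on BasicObject, so it is answered by the delegator
# itself and does not touch the wrapped value.
priorities = { lazy.__id__ => 0 }
puts lazy.resolutions   # still 0

# #hash is forwarded, so computing it resolves the wrapped value.
puts lazy.hash == :key.hash
puts lazy.resolutions   # now greater than 0
```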

Please discuss any parts that you disagree with, I'm happy to elaborate and of course it's always possible that I missed something.

josepjaume commented 10 years ago

OMG I'm so sorry about all of @houndci's trolling :(

This is as awesome as it is huge :). I'll try to have a look at the implementation tonight, but deadlock detection is definitely an improvement over what we had before.

Out of curiosity: were you running into a lot of deadlock situations? Could you give me an example? I actually haven't used futuroscope much, least of all in production environments.

ggPeti commented 10 years ago

Yes, we have run into deadlocks! It was the motivation for this PR. We use futures all around, and sometimes a situation emerges that is similar to the example under the headline Prioritized futures, just with more threads and futures in the pool. Basically it's a situation where a future that was pushed early pushes something at the end of the queue, which also happens to be a common dependency for a lot of other futures coming later in the queue. The concrete example:

There is a search provider module which takes phone numbers, email addresses etc. It runs all searches in futures, but also caches them: if we later run the search with a phone number that was searched for earlier, it will just give that future back. If a lot of other futures run searches against the same phone number, the actual search can be stuck outside the pool's reach, locking down everything, as the ones inside are waiting for something that won't be allocated a worker before they themselves get resolved.

Hope this makes sense.

Oh and I also noticed that the build fails on rbx, jruby and 1.9.3 because there's no Thread.handle_interrupt on them :( I can fix 2.0.0 however, by replacing to_h with Hash[]. What do you think the best course of action is? I realize that rbx and jruby are the only versions without a GIL, so it may be awkward for futuroscope to not support those. On the other hand, for IO heavy use cases (such as ours) it really shines on MRI too.
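
For reference, the 2.0.0 fix would look something like this, assuming the to_h calls in question are made on arrays of [key, value] pairs (Array#to_h only exists from Ruby 2.1 on). The helper name pairs_to_hash is made up for the example.

```ruby
# Builds a Hash from [key, value] pairs without relying on Array#to_h.
def pairs_to_hash(pairs)
  Hash[pairs]
end

pairs = [['future 1', 0], ['future 2', 3]]
p pairs_to_hash(pairs)
```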

txus commented 9 years ago

@ggPeti this is.... amazing. I say we drop support for 1.9.3 and only support 2+, the latest Rbx (compatible with 2+), and in the future JRuby 9000. Mind changing the CI settings to make your PR pass? I'll merge as soon as it's green. Thank you SO much for all these features! :)

ggPeti commented 9 years ago

Thanks for reviewing it! I will get back to this in the following days. Rbx didn't have asynchronous thread messaging implemented at the time of my PR; I will check whether they have implemented it since. If not, I will see if I can implement it myself. This PR makes heavy use of handle_interrupt, so we definitely need it.
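
To illustrate what kind of handle_interrupt usage is meant, here is a rough sketch of the general pattern: an error sent from another thread is deferred during bookkeeping and only delivered while the thread is actually waiting. AbortWait and interruptible_wait are hypothetical names, and the PR's own code differs.

```ruby
class AbortWait < StandardError; end

# Waits for a value on `inbox`, but can be aborted from another thread
# with Thread#raise(AbortWait, ...). `ready` is signalled once the
# thread is protected against badly timed interrupts.
def interruptible_wait(ready, inbox)
  Thread.handle_interrupt(AbortWait => :never) do
    ready << true   # bookkeeping that must not be interrupted
    begin
      # Only in here may AbortWait be delivered.
      Thread.handle_interrupt(AbortWait => :immediate) { inbox.pop }
    rescue AbortWait => e
      "aborted: #{e.message}"
    end
  end
end

ready = Queue.new
inbox = Queue.new
waiter = Thread.new { interruptible_wait(ready, inbox) }
ready.pop                                    # wait until the thread is protected
waiter.raise(AbortWait, 'cyclical dependency')
p waiter.value  # => "aborted: cyclical dependency"
```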