socketry / async

An awesome asynchronous event-driven reactor for Ruby.
MIT License
2.04k stars 85 forks source link

How to wait for `.schedule`'d fibers to finish? #290

Closed rmosolgo closed 7 months ago

rmosolgo commented 7 months ago

Hi! I've mustered another burst of energy to try to use this gem's fiber scheduler at the heart of GraphQL-Ruby's execution code.

I think I understand my current hangup, and I'm wondering if you could share any thoughts that might help me move forward. In general, I have two kinds of fibers that might be running:

I'm trying to find a way to make a two-phase job queue, where the Execution jobs run as far as they can, then the data fetching jobs run until they terminate, then back to execution, and so on, until there's nothing left to do.

Ideally -- and this is actually the whole point -- I'd use Ruby's non-blocking fibers so that IO in one job wouldn't cause other jobs to wait. Except for the boundary between the two kinds of jobs; in that case, I do want wait for all jobs of one kind to finish before I resume other jobs.

A couple other points:

I've made a small script to illustrate what I think my problem is. This version of the script works:

Job Queues Script

```ruby require "async" class Database def initialize @requests = [] @data = {} end def request(key) @requests << key puts "Transfering from #{Fiber.current.object_id} to #{Thread.current[:parent_fiber].object_id}" Thread.current[:parent_fiber].transfer @data.fetch(key) end def perform_fetch puts "loading #{@requests}" t1 = Time.now sleep 0.5 @requests.each do |req| @data[req] = "Data for #{req}" end puts "Elapsed #{Time.now - t1}" @requests.clear end end DB = Database.new Fiber.set_scheduler(Async::Scheduler.new) parent_fiber = Fiber.current puts "Parent fiber #{parent_fiber.object_id}" waiting_fibers = [] waiting_fibers << Fiber.schedule do Thread.current[:parent_fiber] = parent_fiber sleep(0.4) v = DB.request(1) pp "Fetched #{v}" end waiting_fibers << Fiber.schedule do Thread.current[:parent_fiber] = parent_fiber v = DB.request(2) pp "Fetched #{v}" v2 = DB.request(3) pp "Fetched #{v2}" end while waiting_fibers.any? DB.perform_fetch waiting_fibers.select! do |f| if f.alive? f.transfer true else false end end end ```

That script works and terminates as expected. But If I use Fiber.schedule { ... } to put the "database" call in the background:

Fiber.schedule do 
  DB.perform_fetch 
end 

Then, the "database" hasn't fetched data by the time the "waiting fiber" resumes and the .fetch call fails because the key is missing. So I tried adding a Fiber.scheduler.run to let DB.perform_fetch finish:

Fiber.schedule do 
  DB.perform_fetch 
end 
Fiber.scheduler.run

But then, the .perform_fetch fiber terminates. But it deadlocks -- I think because the first "waiting fiber" has sleep 0.4 (simulating some application I/O), and it manually parent_fiber.transfers during Fiber.scheduler.run.

So the code never gets past Fiber.scheduler.run, I assume because it's waiting for that fiber, but the fiber has already transferred back to the parent fiber and won't terminate until it's resumed.


Ok, sorry for the long write-up, but, if you have time to look it over, is there anything that comes to mind that I might try to make this work?

I have a draft PR of GraphQL::Dataloader where I'm working on this, too, in case it's any help: https://github.com/rmosolgo/graphql-ruby/pull/4710

Thanks!

ioquatix commented 7 months ago

Async::Task is a form of promise which makes coordination easy.

Fiber.schedule provides no such mechanism.

If you want a promise style mechanism, you need to use something, e.g. Thread::ConditionVariable or Thread::Queue. Alternatively, we could introduce something, like Fiber::Promise or Fiber::Result - it's straight forward to implement.

For trivial Fiber#resume usage, no coordination is needed, but for fan-out or map-reduce style workloads, coordination is needed. Different primitives suit different use cases.

rmosolgo commented 7 months ago

Thanks for sharing your thoughts on this, @ioquatix. It sounds like I'm barking up the wrong tree. I really wanted to stay in Fibers so that I could be sure that, for CPU, only one thing was happening at a time (and I have this fear of spinning up new Threads?!). I'll investigate Async::Task!

I tried Thread::ConditionVariable and Thread::Queue a while back but had trouble getting it to work with the current Fiber-based implementation, where (IIRC) Fibers couldn't be started or stopped by different Threads. But because of my previously-mentioned fear of Threads, I backed away slowly.

Because I'm generally curious, I'd love to hear more about what you mean by Fiber::Promise or Fiber::Result if you have a few minutes to spell it out sometime!

In any case, I'll try re-working it with Async::Task and follow up if I run into any more trouble. Thanks again!

rmosolgo commented 7 months ago

Hot take after reviewing the Async::Task docs: I think what I'm missing to make a seamless switch is a way to pause and resume Tasks, as I might with Fiber.yield or .transfering back to a parent fiber. It looks like Tasks can start, then terminate, but not be paused and resumed.

Pausing and resuming is a core feature of the current Dataloader implementation because it supports seamless code like

record = dataloader.with(RecordSource).load(1) # The fiber pauses here until we've loaded this value from the DB
record.name 

In that case, no block-based chaining is required, which I consider a feature because it avoids "what color is my function"-type code, where some methods return Promise-ish things while others return plain values. I guess that makes it like await ... in application code.

trevorturk commented 7 months ago

Apologies in advance if this is missing the point, but I wanted to note that you have Async::Task#wait which may be usable here. See docs here: https://socketry.github.io/async/guides/asynchronous-tasks/index#waiting-for-tasks and note also Async::Barrier may be interesting. I also worked up some GraphQL examples using Async::Tasks here: https://github.com/socketry/async-examples which allow me to completely avoid the Dataloader system FWIW, just using lazy_resolve, which might be interesting for reference.

I suppose I'm just wondering if the load method in your example called wait on the Async::Task associated with the record 1...?

rmosolgo commented 7 months ago

Thanks for sharing that link, @trevorturk -- I hadn't seen those! That's great.

I definitely know you can do async data loading with GraphQL if you hand-code the concurrency. But what I really want is to live in the world I think Fiber.scheduler promises, where, if you have I/O in a Fiber, the scheduler will background it (if it can).

If I understand the possibility right, then you could say something like:

That would deliver good performance and good DX ... if it's possible! (Maybe it's not :S)

trevorturk commented 7 months ago

Hmm, I'm sorry but I'm not totally following the description of your ideal state here! I'd be happy to chat more if it might be helpful, but I think I'm misunderstanding something (probably more than one thing lol).

Other possible reading that might interest you, just in case it's helpful: https://brunosutic.com/blog/async-ruby and https://brunosutic.com/blog/ruby-fiber-scheduler

I think (but I'm not sure) that you might want to be looking higher up the Async stack, as opposed to looking at Fibers directly, but hopefully @ioquatix can provide some guidance if he understands what you're shooting for.

rmosolgo commented 6 months ago

Hey @trevorturk, thanks again for sharing those examples. I reviewed them, and sometime while they were cookin' in there, I think I finally understood how it might look to use Async primitives inside GraphQL::Dataloader. I've got a nicely-working implementation here: https://github.com/rmosolgo/graphql-ruby/pull/4727

I'm going to do a bit of follow-up before I merge but if either you have any thoughts, I'd appreciate a review on the PR!