socketry / async

An awesome asynchronous event-driven reactor for Ruby.

Introduce some new `Schedule` operation for explicit nesting. #238

Open ioquatix opened 1 year ago

ioquatix commented 1 year ago

This is an alternative to the proposal in https://github.com/socketry/async/issues/237.

It requires the following:

Schedule {} # (1) Error, No parent async task set!

Sync do
  Schedule {...} # (2) Ok, running in an existing event loop / reactor.
end

I have no opinion about the name right now.

The original proposal has this example code:

def process_jobs(jobs)
  Async::Task.current? or raise ArgumentError, "must start up reactor first with surrounding 'Async { ... }'"

  jobs
    .map { |job| Async { job.process! } } # Name the job explicitly: _1 would bind to the inner block, which receives the task.
    .map(&:wait)
end

It would become this:

def process_jobs(jobs)
  jobs
    .map { |job| Schedule { job.process! } }
    .map(&:wait)
end

However, I think it's worse, because the error about the missing event loop is delayed until some processing has already been done. It also raises the question of why such an error is useful. If the expectation is that the method can run concurrently, just make it so (see "Using Outer Sync Instead", below).

The alternative design avoids this:

def process_jobs(jobs, parent: Async::Task.current) # Error will occur here if no parent async task is set.
  jobs
    .map { |job| parent.async { job.process! } }
    .map(&:wait)
end

Looking up the parent context on every iteration, as Schedule would, is also a little less efficient, and it gives you less control: with an explicit parent: argument you can, for example, pass in a semaphore.

By the way, it should be noted that Fiber.schedule is very similar to the desired functionality:

irb(main):001:0> Fiber.schedule{}
(irb):1:in `schedule': No scheduler is available! (RuntimeError)

vs

irb(main):003:0> Async{Fiber.schedule{}}
=> #<Async::Task:0x000000000000d5fc>
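The behaviour in the two irb sessions above can also be checked from a script using only core Ruby (3.0 or later). This is a minimal sketch; scheduler_running? is a hypothetical helper name, not part of Ruby or Async.

```ruby
# Hypothetical helper: true when the current thread has a fiber scheduler set.
def scheduler_running?
  !Fiber.scheduler.nil?
end

# Outside any event loop there is no scheduler, so Fiber.schedule raises.
schedule_error = nil
begin
  Fiber.schedule {}
rescue RuntimeError => error
  schedule_error = error
end

running_at_top_level = scheduler_running?
```

Run at the top level with no reactor, schedule_error holds the RuntimeError and running_at_top_level is false; inside an Async block a scheduler is set, which is why the second irb example succeeds.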

It should also be noted that my original proposal was Fiber{} rather than Fiber.schedule{}. In the original proposal, it could be used like this:

def process_jobs(jobs)
  jobs
    .map { |job| Fiber.schedule { job.process! } }
    .map(&:wait)
end

Using Outer Sync Instead

The point of the Async interface is to spare the user from having to be aware of the parent concurrency. However, I do accept there are a few sharp edge cases, for example, if you are expecting Async to run in conjunction with other tasks.

def process_jobs(jobs)
  Sync do |task| # Ensures a reactor is running, and if not, starts it for this block.
    jobs
      .map { |job| task.async { job.process! } } # All of these tasks run concurrently now.
      .map(&:wait)
  end
end

For the purpose of this library, I'm not very enthusiastic about breaking backwards compatibility, but I'm okay with introducing new features if they help the code become more expressive and less error-prone.
