lampepfl / gears

A strawman for a low-level async library in Scala 3.
https://lampepfl.github.io/gears/
Apache License 2.0

Experiment: Only allow `Async.Spawn` to spawn runnable futures #46

Closed natsukagami closed 8 months ago

natsukagami commented 9 months ago

What's this?

The goal is to disallow spawning dangling Futures from `using Async` functions. Async.Spawn is an opaque alias of Async, defined as a subtype of Async. It is obtained by explicitly "upgrading" an Async through Async.group, or given automatically by Async.blocking or Future.apply.

The Async.Spawn-taking functions (signalling usage of dangling futures) should follow the hacky signature of Future.apply:

  def apply[T](body: Async.Spawn ?=> T)
    (using async: Async, spawn: Async.Spawn & async.type): T

to ensure that the given Async instance (which is usually synthesized to be the innermost context) is the same instance as the Async.Spawn instance. It happens quite often (especially when nesting Async contexts) that these don't match:

  extension[T] (s: Seq[T])
    def parallelMap[U](f: T => Async ?=> U)(using Async): Seq[U]

  Async.blocking: // Async.Spawn here...
    val seq = Seq(1, 2, 3, 4, 5)
      .parallelMap: n => // Async here...
         Async.select(
            Future(doSomethingAsync(n)) handle Some(_),     // oops, spawned by Async.blocking
            Future(Async.sleep(1.minute)) handle _ => None, // oops, spawned by Async.blocking
         )
    // oops, leaking all the futures...

With the Future.apply signature above, this does not happen: the code gives a compile-time error instead.
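To see the singleton-type trick in isolation, here is a minimal sketch that does not depend on gears. The traits `Async` and `Spawn`, the `Scopes` object and `spawnIn` are toy stand-ins invented for this illustration (in gears, Spawn is an opaque subtype rather than a trait); only the shape of the `using` clause is taken from the signature above.

```scala
// Toy capabilities for illustration; these are NOT the gears definitions.
trait Async
trait Spawn extends Async

object Scopes:
  // Stand-in for Async.blocking: provides a spawn-capable scope.
  def blocking[T](body: Spawn ?=> T): T = body(using new Spawn {})
  // Stand-in for Async.group: explicitly upgrades an Async to a fresh Spawn scope.
  def group[T](body: Spawn ?=> T)(using Async): T = body(using new Spawn {})
  // Stand-in for a function that hands its body a plain, non-spawning Async.
  def plain[T](body: Async ?=> T)(using Async): T = body(using new Async {})

// Same shape as the signature above: the innermost Async must itself be a Spawn.
def spawnIn[T](body: Spawn ?=> T)(using async: Async, sp: Spawn & async.type): T =
  body(using sp)

def demoResult(): Int =
  Scopes.blocking:
    val a = spawnIn(1) // ok: the innermost Async is the Spawn given by blocking
    val b = Scopes.plain:
      // spawnIn(2)    // expected to be rejected: the innermost Async is not a Spawn,
      //               // and the outer Spawn is a different instance
      Scopes.group:
        spawnIn(2)     // ok again after the explicit upgrade
    a + b

@main def demo(): Unit = println(demoResult())
```

The rejection relies on the innermost given being picked for `async` first, so that the outer Spawn can no longer satisfy `Spawn & async.type`.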

adamw commented 9 months ago

Not really related to the PR, rather to the general design - out of curiosity, why do you need the context parameter in def parallelMap[U](f: T => Async ?=> U)(using Async): Seq[U]? If f would spawn any async computations, it could capture the capability from the enclosing environment, at usage-site, no?

In ox we have a similar method, but the signature is simpler (it's a top-level function, not an extension, but that's irrelevant I guess): def mapPar[I, O, C[E] <: Iterable[E]](parallelism: Int)(iterable: => C[I])(transform: I => O): C[O]. So I think we avoid this problem (if I understand the problem correctly), but maybe we have some other problems that I don't know about ;)

natsukagami commented 9 months ago

The context parameter usually appears when a function intends to provide a different async scope to its function parameter. In this case, we would want the fs to run in Futures within the body. A runnable future wraps its body and passes it a new Async context, handling cancellation and providing its own scoping. If parallelMap wants to cancel the spawned futures for some reason, it cannot do so if f captures the Async instance from outside.
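A rough way to picture the difference, using a toy `Scope` class in place of Async. Everything here (`Scope`, `mapScoped`, `mapCaptured`, `scopesSeen`) is a hypothetical name made up for the sketch, and it runs sequentially; it only shows which scope f ends up seeing in each style of signature.

```scala
// Toy stand-in for an Async context; NOT the gears type.
final class Scope(val name: String):
  private var cancelled = false
  def cancel(): Unit = cancelled = true
  def isCancelled: Boolean = cancelled

// With the context parameter: f receives a scope owned by mapScoped,
// so mapScoped stays in control of cancelling whatever f starts under it.
def mapScoped[T, U](xs: Seq[T])(f: T => Scope ?=> U)(using outer: Scope): Seq[U] =
  val inner = Scope(s"${outer.name}/inner")
  try xs.map(x => f(x)(using inner))
  finally inner.cancel() // the inner scope is cancelled before this call returns

// Without the context parameter: f can only capture the caller's scope,
// which this function has no handle on.
def mapCaptured[T, U](xs: Seq[T])(f: T => U): Seq[U] = xs.map(f)

def scopesSeen(): (Seq[String], Seq[String]) =
  given outer: Scope = Scope("outer")
  val scoped = mapScoped(Seq(1, 2))(n => summon[Scope].name)
  val captured = mapCaptured(Seq(1, 2))(n => summon[Scope].name)
  (scoped, captured)

@main def showScopes(): Unit = println(scopesSeen())
```

In the first call the lambda resolves the scope created inside `mapScoped`; in the second it resolves the caller's `outer`, which is the situation described above where the mapping function cannot cancel what f spawned.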

adamw commented 9 months ago

Ah yes I see the use-case and problem :)

I guess I would typically expect f to create its own scope, e.g. (using ox's syntax - I think supervised is more or less Async.blocking):

myList.parallelMap { n =>
  supervised { // custom scope
    val f1 = fork(...)
    val f2 = fork(...)
    f1.join() + f2.join()
  }
}

That way if a particular mapping invocation is interrupted (e.g. one of the f invocations throws an exception), this will propagate to interrupt whatever is happening in the supervised.

But still it's possible that when parallelMap is itself inside a scope, it will capture the wrong one:

supervised {
  myList.parallelMap { n =>
    val f1 = fork(...)
    val f2 = fork(...)
    f1.join() + f2.join()
  }
}

Now an exception thrown by any of the f invocations would interrupt the forks, thus ending the outer scope, which is probably not what you'd want. I wonder if capture checking would be able to solve this - we'd have to require that f does not capture Ox/Async.

But I guess that's what you are trying to solve here, in another way? One thing I don't understand - isn't Async and Async.Spawnable really two different capabilities?

natsukagami commented 9 months ago

isn't Async and Async.Spawnable really two different capabilities?

They are quite different from gears's POV I think, due to how gears thinks of concurrency as suspendable computations rather than just scoped threads.

Initially I thought it was totally fine to keep both capabilities within Async, but that seems to undermine the principles of structured concurrency, especially when calling a `using Async` function might leave you with futures still running after it returns.

I don't think there is a concept of .await in Ox (we just rely on blocking ops in the Loom JVM being able to handle suspension), and so the two look the same.

I think supervised is more or less Async.blocking

With the above in mind, I think it is clearer that Async.spawnable is closer to supervised ;)

adamw commented 9 months ago

@natsukagami Thanks, a great explanation! Indeed, that's where ox/gears differ: in ox, there's no capability needed to block (suspend) - you can always do that. So we only have the other one (scoping threads).

natsukagami commented 8 months ago

Ok, let's get this in ;)