
Consider using structured concurrency instead of being able to spawn processes at any point #759

Closed yorickpeterse closed 5 days ago

yorickpeterse commented 1 week ago

Description

The premise of structured concurrency is simple: you have something that signals a join scope, and a construct to spawn processes in that scope. When the end of the scope is reached, all processes are joined. Conceptually this is the same as having a channel with space for N messages, N processes that use the channel and send it a message when they're done, and N calls to Channel.receive to wait for the processes.
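
With today's channels and class async, that equivalent would look roughly like this (a sketch using the existing Channel API):

class async Worker {
  fn async run(value: Int, out: Channel[Int]) {
    out.send(value * value)
  }
}

# N = 3 jobs, a channel with space for 3 messages, and 3 receives to join:
let out = Channel.new(size: 3)

[10, 20, 30].into_iter.each(fn (v) { Worker().run(v, out) })

let mut i = 0

while i < 3 {
  out.receive
  i += 1
}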

Structured concurrency makes reasoning about concurrency easier, because it's clearer where asynchronous work starts and ends. In addition, by making this part of the compiler/type system you can allow for interesting patterns, such as sharing read-only access to data. This article is a good reference on the subject.

A hypothetical setup would be the following:

let nums = [10, 20, 30]

async {
  let handles = nums.iter.map(fn (v) { spawn { v * v } }).to_array

  Int.sum(handles.into_iter.map(fn (v) { v.value }))
}

The idea is that async signals a scope in which asynchronous operations can happen. spawn in turn spawns a new process in the innermost async block. Variables defined outside an async scope are exposed as immutable references. Values of type uni T are moved into an async scope upon capture by it or a spawn expression.

spawn expressions in turn can only capture variables that are value types (e.g. Int) or defined outside the async expression. If a spawn captures a variable of type uni T, the variable is moved into the spawn. This means such variables need to be assigned a new value if captured inside a spawn inside a loop.
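
For example (hypothetical syntax; more_jobs is a made-up condition and consume a made-up function):

async {
  let mut data = recover [10, 20]   # data: uni Array[Int]

  while more_jobs {
    spawn { consume(data) }         # the capture moves `data` into the spawn
    data = recover [30, 40]         # so it must be reassigned before the next iteration
  }
}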

When reaching the end of the async expression, we discard any unused values as usual, then join all the spawned processes. The return value of an async is the last expression, just as with regular scopes.

Processes in turn are given a value method. When called, it joins the process and returns whatever value the spawn expression returned. This is done by generating a class for each spawn with the appropriate fields (one for each capture), and by generating said value method. We also generate a dropper that does the same thing, but discards the value. The value is stored by generating a dedicated field (= the first one), writing the return value into that field, and setting a bit somewhere to indicate that we've in fact written a value (we can't use NULL / 0 because then returning integer 0 wouldn't work).
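
As a sketch, for spawn { v * v } the compiler might generate something like this (names are made up, and Option here stands in for the dedicated field plus written-bit described above):

class async Spawn0 {
  # One field per captured variable:
  let @v: Int

  # The dedicated result field. Some acts as the "value was written" bit,
  # since a plain NULL / 0 check wouldn't work when integer 0 is returned.
  let @result: Option[Int]

  fn async mut run {
    @result = Option.Some(@v * @v)
  }
}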

If the value is a T it can be lifted into a uni T through a recover. This is safe because upon termination any T returned can't have any references pointing to it in the old process.
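
That is, something like this (hypothetical):

async {
  let handle = spawn { [10, 20] }

  # value returns Array[Int]; recover lifts it into uni Array[Int], which is
  # safe as no references to it can exist in the terminated process:
  let owned: uni Array[Int] = recover handle.value
}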

Process messages are removed (i.e. no more fn async). Channels remain and would be used instead when some form of communication protocol is necessary. The concept of sendability remains (i.e. you still can't stick a ref User in a Channel). uni T values in turn are used if you want to transfer ownership of some complex data from one process to another, either by having a spawn capture the variable or through a Channel.

Compared to the current process setup, structured concurrency would allow for more efficient fork-join workflows as processes can capture immutable data, something that's not possible with the class async setup (as immutable borrows aren't sendable).

For long-lived background processes, you simply create a top-level async of sorts, spawn the necessary processes in there, and include the rest of the logic in the async block, e.g.:

async {
  spawn { background_work() }
  ... rest of the code ...
}

Related work

yorickpeterse commented 1 week ago

This approach has a challenge: if spawn expressions can capture immutable borrows, then those borrows need to use atomic reference counting, as multiple processes may now modify the reference count of the same value. Since we can't somehow automatically switch between the two, in practice that means always using atomic reference counting for borrows.

yorickpeterse commented 1 week ago

The problem here is related to https://github.com/inko-lang/inko/issues/750. I think we could do something like this to make things better:

ref T and mut T become compile-time borrows, and don't use reference counting. These borrows can't be returned or stored in other data, nor are they compatible with generic type parameters (unless they too use borrows). In practice this means you can only pass them down the stack through arguments. These borrows are created using a borrow expression that takes a list of variables and exposes them as borrows. Through shadowing, the variable we're borrowing can't be moved:

let a = [10, 20]
let b = [30]

borrow a, mut b {
  a # => ref Array[Int]
  b # => mut Array[Int]
}

Such borrows can be created for any type, though borrowing Int, Float, String and other value types should just be a compile-time error as that's likely a mistake.

If we introduce structured concurrency, we can allow sharing ref T values with different processes, though this technically requires violating the "you can't store them" rule, as the borrow has to be stored in the process. However, due to the structured nature this should be sound.

For more complex cases you could use borrows that use runtime borrow counting, let's call those rc T and rc mut T. These behave exactly the same way as ref T and mut T currently do, i.e. creating and disposing of them mutates the borrow count. These RC borrows can only be created from owned values or other RC borrows. RC borrows can only be created for types stored in a stable location (= the heap). Compile-time borrows can't be created from RC borrows though, as this would allow you to drop the borrowed data while the compile-time borrow is still used.

This basic setup is interesting, but introduces a problem: if a ref T can't be created from an rc T, because we can't guarantee the borrowed data isn't dropped, then passing rc T values around becomes really annoying. What we could do is: if you borrow an rc T value using borrow, we still increment the reference count and decrement it when the borrow is done, but only once, at the start and end of the scope. Because this behaviour is transitive, if at any point inside the borrow we drop the borrowed data, the increment done at the start of the borrow causes the borrow check to panic at runtime.
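
To illustrate (the syntax for creating the rc borrow is made up):

let nums = [10, 20]
let r = rc nums   # hypothetical syntax for creating an rc borrow of `nums`

borrow r {        # the borrow count is incremented once, at the start
  r.size          # inside, `r` behaves like a compile-time ref
  r.size          # repeated uses don't touch the count
}                 # and decremented once, at the end

# If the borrowed data were dropped anywhere inside that scope, the pending
# increment would make the borrow check panic at runtime.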

In addition, we can make things easier by introducing implicit borrow expressions when passing non-compile-time borrows to arguments that expect them, similar to our current auto borrow logic. That is, foo(data) turns into borrow data { foo(data) } if data is expected to be a compile-time borrow.
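
For example, given a foo that expects a compile-time borrow:

fn foo(data: ref Array[Int]) {}

foo(nums)

# ...would be implicitly rewritten by the compiler into:

borrow nums {
  foo(nums)
}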

RC borrows can't be shared between processes, since they use regular (non-atomic) reference counting. We could still allow sharing them by exposing rc T values as ref T inside spawn expressions, provided the values are defined outside the async:

let nums = [10, 20, 30]

async {
  nums # => ref Array[Int]

  spawn { 
    nums.size # => OK, since ref Array[Int] is sendable and defined outside the async
  }
}

rc mut T values are never sendable, nor are ref T/mut T values defined inside the async.

yorickpeterse commented 1 week ago

A further enhancement of the above idea: not being able to store ref T and mut T is annoying, as you can't do things like this:

fn eq(a: ref Foo, b: ref Foo) -> Bool {
  # The tuple (a, b) stores the borrows in a value:
  match (a, b) {
    ...
  }
}

To make such patterns possible, we could allow storing borrows in data if and only if the borrow doesn't escape the surrounding scope/function. This likely requires more complicated escape analysis though, so a simplification could be this:

A borrow can be stored in another value, but only if the value is not assigned to a variable, and does not receive calls to fn mut and fn move methods.

The assignment restriction means we can keep the check local to the expression, while the call restriction prevents the borrow from escaping through instance methods that either mutate or move the receiver. This may be overly strict though.
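
Under that rule the dividing line would be something like this (Foo is made up):

fn ok(a: ref Foo, b: ref Foo) -> Bool {
  match (a, b) {      # fine: the tuple is never assigned to a variable
    ...
  }
}

fn not_ok(a: ref Foo, b: ref Foo) {
  let pair = (a, b)   # error: the value storing the borrows is assigned
}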

yorickpeterse commented 1 week ago

The store restriction for compile-time borrows would be enforced as follows:

Not being able to capture borrows in closures is annoying, but allowing that would require escape analysis on the closure to determine whether it outlives the borrow or not.

yorickpeterse commented 1 week ago

The borrowing proposal has a flaw: if a spawn expression can only capture variables defined outside the async scope, then a spawn can never capture the values yielded by an iterator. That is, code such as this would not compile:

let users = [User(...), User(...)]

async {
  users.iter.each(fn (u) {
    spawn { do_something(u) }
  })
}

Even if we did allow this somehow, there's the problem of borrowing: an iterator can't yield compile-time borrows because that would violate the "you can't store them" invariant. This means they'd have to produce RC borrows, which you can't capture in a spawn (and also wouldn't allow iterating over value types).

Of course we can still introduce structured concurrency and tackle the borrowing issue separately.

yorickpeterse commented 1 week ago

Here's another example of where structured concurrency is easier/requires less code:

class async Worker {
  fn async run(number: Int, out: Channel[Int]) {
    out.send(number * number)
  }
}

class async Main {
  fn async main {
    let numbers = [10, 20, 30]
    let chan = Channel.new(size: 3)

    numbers.into_iter.each(fn (num) { Worker().run(num, chan) })

    let mut total = 0
    let mut i = 0

    while i < 3 {
      total += chan.receive
      i += 1
    }
  }
}

This just illustrates a case where you have a bunch of values and want to compute something asynchronously, then collect the results in some way. This requires a bit of boilerplate (e.g. the class async definition), and manual fiddling with channels and loops. Of course you can abstract over that, but compare it to structured concurrency:

class async Main {
  fn async main {
    let numbers = [10, 20, 30]

    async {
      let handles = numbers
        .into_iter
        .map(fn (num) { spawn { num * num } })
        .to_array

      handles.into_iter.reduce(0, fn (acc, proc) { acc + proc.join })
    }
  }
}

This approach has several benefits:

yorickpeterse commented 1 week ago

Openflow uses class async a fair bit to expose shared state/mutation safely. At its core is a State type that's a class async, used for diffing and updating the state of the ventilation system. The classes used for the different types of inputs (e.g. a CO2 sensor or a motion sensor) hold references to this State process to apply their changes.

A heavily condensed version of the State class would be something like this:

class Room {}

class async State {
  fn pub async mut add_room(room: uni Room) {}

  fn pub async mut prepare {}

  fn pub async mut update(block: uni fn (mut Map[String, Room]) -> Bool) {}
}

If we throw class async out the window and use structured concurrency, State becomes a regular class initialized in a spawn somewhere, and all clients use a Channel. The resulting setup would be something like this:

class enum Message {
  case AddRoom(uni Room)
  case Prepare
  case Update(uni fn (mut Map[String, Room]) -> Bool)
}

class State {
  let @chan: Channel[uni Message]

  fn mut run {
    loop {
      match @chan.receive {
        case AddRoom(r) -> add_room(r)
        case Prepare -> prepare
        case Update(f) -> update(f)
      }
    }
  }

  fn mut add_room(room: uni Room) {}

  fn mut prepare {}

  fn mut update(block: uni fn (mut Map[String, Room]) -> Bool) {}
}

The main loop here is what I wanted to avoid by adding class async to the language, and it's a pattern also seen in e.g. Erlang where it's handled using gen_server.

With that said, the class async setup does start to break down a little the moment you need to expose the output to another process. In such a case, you need to pass a Channel around somehow so the result can be sent over the channel.
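
For example, something like this (a sketch; the out parameter is made up):

class async State {
  # The only way to expose the result is to send it over a channel supplied
  # by the caller:
  fn pub async mut prepare(out: Channel[Int]) {
    out.send(42)
  }
}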

yorickpeterse commented 1 week ago

To put it differently, there are some interesting opposites at play here:

Structured concurrency using the proposed async / spawn reduces boilerplate for fork-join style workloads, but requires a bit of extra boilerplate for long-lived processes. The class async setup suffers from the exact opposite: it requires more boilerplate for short-lived fork-join tasks, while for long-lived processes it more or less evens out.

yorickpeterse commented 1 week ago

Another thing worth noting: when looking at code, async/spawn makes it clear we're starting a bunch of processes and waiting for them to finish. With class async this isn't clear because the syntax for spawning processes is the same as creating instances of regular classes. It's also not clear when the process finishes running.

yorickpeterse commented 1 week ago

Another thought: structured concurrency is nice, but if the only benefit is that after an async all processes are joined, then I'm not sure that's that much of a benefit. Or to put it differently, runaway processes/threads aren't that common of a problem.

In addition, channels were introduced to make fork-join workflows easier, but this sort of violates the general idea of actors. An alternative would be to (re)introduce the Future type, but also introduce a Futures type that allows for efficient "polling" of multiple futures.

A fully async model would be nicer, but this won't work well for fork-join workflows as the capabilities/messages of the calling process aren't necessarily known, so there's no way for child processes to communicate back their results.

yorickpeterse commented 1 week ago

I'm currently leaning towards keeping class async, and perhaps reintroducing futures but allowing for more efficient polling. This would allow for bi-directional communication without the need for multi-producer, multi-consumer channels, though we may want to keep those to allow e.g. distribution of jobs (https://github.com/inko-lang/inko/issues/681).

yorickpeterse commented 1 week ago

Using futures and processes, we can in fact implement something like Channel efficiently:

import std.clone (Clone)
import std.stdio (Stdout)

class Future[T] {
  # This would of course use something else in reality.
  let @inner: Channel[T]

  fn static new -> Future[T] {
    Future(Channel.new(size: 1))
  }

  fn move set(value: uni T) {
    @inner.send(value)
  }

  fn move get -> uni T {
    @inner.receive
  }
}

impl Clone[Future[T]] for Future {
  fn pub clone -> Future[T] {
    Future(@inner.clone)
  }
}

# How one could implement a Channel-like construct but using futures.
#
# This implementation uses LIFO, but if we used a Deque we'd be able to turn
# it into a FIFO channel.
class async Chan[T] {
  let @values: Array[uni T]
  let @futures: Array[Future[uni T]]

  fn static new -> Chan[T] {
    Chan(values: recover [], futures: recover [])
  }

  fn async mut send(value: uni T) {
    match @futures.pop {
      case Some(f) -> f.set(value)
      case _ -> @values.push(value)
    }
  }

  fn async mut receive(future: uni Future[uni T]) {
    match @values.pop {
      case Some(v) -> future.set(v)
      case _ -> @futures.push(future)
    }
  }
}

class async Main {
  fn async main {
    let out = Stdout.new
    let chan = Chan.new

    chan.send(42)

    let fut = Future.new

    # This part is a little weird since we have to explicitly pass in the
    # futures.
    chan.receive(recover fut.clone)
    out.print(fut.get.to_string)
  }
}

The caveat is that making it a fixed-size channel is probably a little more tricky, and that there would be some extra overhead due to the message sending.

The way you'd use this for e.g. std.test is you'd create a Chan and pass it to each test runner. Then for each test you'd create a future and wait for it to be resolved. This works because we don't really care which test resolves what future, as long as we don't have to wait for all tests to finish before we can show anything.
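
Roughly like this (tests and report are made up):

let chan = Chan.new   # shared with every test runner
let mut pending = tests.size

while pending > 0 {
  let fut = Future.new

  # Ask for the next finished test and report it as soon as it resolves:
  chan.receive(recover fut.clone)
  report(fut.get)
  pending -= 1
}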

yorickpeterse commented 1 week ago

Also worth adding: futures would need shared internal state, which would require them to use atomic reference counting. That in turn means values can be written and received multiple times, making them technically promises instead of futures. It also means they're really just channels with a capacity of 1, though the single-slot restriction allows them to be a bit more efficient.

yorickpeterse commented 1 week ago

Futures/promises allowing multiple reads and writes might actually not be so bad: in the above example it means you can reuse the same future/promise for every test, instead of having to allocate one for each test.

yorickpeterse commented 6 days ago

With futures, using processes is a bit like using external iterators: every time you want a process to produce a result, you have to "advance" it by sending a message. Similar to external iterators, this can make certain implementations tricky. For example, if you have a process that walks through the files in a directory, you have two choices:

If we compare just futures with channels, channels are strictly superior in terms of flexibility because:

  1. They allow for multiple readers and writers
  2. You can send multiple values instead of just a single one
  3. You can use them for balancing workloads fairly across processes

The third point is worth highlighting: if you have a bunch of work that needs to be performed across processes, a channel allows that work to be balanced automatically (since it's just a shared FIFO queue). If all we had were futures, we'd need some sort of round-robin approach. That can result in worse performance when one process is performing a big job, because other processes can't steal its still-pending messages. See the sketch below.
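
For example, N workers pulling from one shared jobs channel (a sketch; do_work is made up):

class async Worker {
  fn async run(jobs: Channel[Int], out: Channel[Int]) {
    # Each worker grabs the next pending job as soon as it's free, so one
    # big job doesn't hold up the messages queued behind it:
    loop {
      out.send(do_work(jobs.receive))
    }
  }
}

let jobs = Channel.new(size: 100)
let out = Channel.new(size: 100)
let mut i = 0

# N = 4 workers, however many jobs end up in `jobs`:
while i < 4 {
  Worker().run(jobs, out)
  i += 1
}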

yorickpeterse commented 6 days ago

A benefit in favour of class async over anonymous processes and channels: using class async, each message can have its own generic type parameters. This is something I use in Openflow:

class async State {
  fn pub async mut update_with[T](
    data: uni T,
    block: uni fn (mut Map[String, Room], T) -> Bool,
  ) {
    ...
  }
}

Using channels we'd have to define a Message[T] type, then assign T some dummy type whenever we create other messages (e.g. Message[Nil]). This can get out of hand quickly when many messages need their own type parameters.
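
That is, something along these lines (sketch):

class enum Message[T] {
  case UpdateWith(uni T, uni fn (mut Map[String, Room], T) -> Bool)
  case Prepare
}

# The channel must commit to a single T, so messages that don't need it are
# stuck with a dummy type parameter:
let chan: Channel[uni Message[Nil]] = Channel.new(size: 10)

chan.send(recover Message.Prepare)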

yorickpeterse commented 6 days ago

Regarding load balancing: this can be achieved by spawning a process for each job, i.e. M processes for M jobs, instead of mapping M jobs onto N processes where N < M. In the case of std.test this would simplify the implementation a bit as well.

EDIT: actually, this isn't the case. The test suite has 1079 tests, and a bunch of these spawn Inko sub-processes. Each of those uses a bunch of threads, which seems to trigger some thread/process count limit on my machine. In other words, there are times where you do need to limit the amount of concurrency and use channels to balance the load.

yorickpeterse commented 5 days ago

I'm going to leave this be for the time being. At this stage I think it's premature to replace our concurrency setup with something else, and I can't really think of anything better either. We'll likely revisit this at some point in the future.