nim-lang / RFCs


Executors, task parallelism, channels APIs #347

Open mratsim opened 3 years ago

mratsim commented 3 years ago

This is an API specification for:

The RFC is also available in Weave: https://github.com/mratsim/weave/blob/ba99dce/rfcs/multithreading_apis.md, it might be easier to discuss it in a PR.

I can split the RFC into 3.

---
rfc: -1
title: Executors, task parallelism, channels APIs
author: Mamy André-Ratsimbazafy (@mratsim)
type: Library interoperability standard
category: Multithreading
status: Draft
created: 2020-03-04
license: CC0 1.0 Universal
---

Executors, task parallelism, channels APIs

Abstract

This document:

Table of Contents

Introduction

The Nim programming language has significantly evolved since its threadpool and channels modules were introduced in the standard library.

Since then, few projects actually used channels or threadpools in part due to:

With the progress of core low-level primitives and compiler guarantees, Nim foundations are becoming solid enough to build an ergonomic and safe multithreading ecosystem.

This document specifies the multithreading interfaces related to channels and threadpools so that they can be evolved or replaced with principled foundations.
This document is interested in the public user API and does not specify the underlying implementation.
This document does not require compiler support. Primitives can be implemented as a library.
This document also defines related multithreaded concepts that may be implemented in some libraries without specifying them. The interface is left open until more feedback is gathered.
This document is written under the assumptions that there is no one-size-fits-all and that projects may want to use multiple threadpools, executors or schedulers within the same application or library. Furthermore, libraries may offer to support multiple parallelism backends and, as a project evolves, dedicated specialized threadpools may be written that are tuned to specific workloads.

The core primitives mentioned that facilitate multithreading are:

This blog post explains why it's important to embrace multiple schedulers:

Multithreading is a very large subject that cannot be covered in a single specification. Nonetheless to facilitate future RFCs, this document will define terms that are likely to come up in future specifications in a non-goals section.

While channels and task parallelism API are presented in the specification document, they MAY be implemented in different libraries.

Requirements notation

The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT", "SHOULD", "SHOULD NOT", "RECOMMENDED", "NOT RECOMMENDED", "MAY", and "OPTIONAL" in this document are to be interpreted as described in the Internet Engineering Task Force (IETF) BCP14, RFC2119, RFC8174 when, and only when, they appear in all capitals, as shown here.

Definitions

Definitions are ordered in lexicographical order. Definitions may include specification or implementation recommendation.

Channels

A channel is a cross-thread communication queue or ring-buffer.

A channel can come in many flavors:

Note: not using locks does not imply that a channel is lockfree. Lockfree is a progress guarantee, and an implementation can block even without locks. We call such channels lockless. It is worth noting that usually the stronger the progress guarantees, the less throughput a channel has. Strong progress guarantees are needed for low-latency or real-time processing.

For reference types, a channel transfers ownership of the reference. A channel MUST NOT copy; this is required for both correctness and performance. In terms of distributed systems, we require that shared-memory channels achieve exactly-once delivery. This ensures that producer and consumer can rely on the message being sent and received. It also prevents side effects from being executed twice if a message is duplicated, for example when sending "launch missiles!".

A channel is shared by at least 2 threads. Shared ownership requires ensuring that the shared object is not freed twice and not freed before its last users.

Closures

A closure is a data structure representing a procedure and its parameters (environment).

For multithreading purposes, it MUST use a threadsafe memory allocator and/or garbage collection. Nim closures with Nim's default refc GC are not compatible with multithreading. Closures SHOULD be non-copyable, only movable. A closure might free a temporary resource at the end of a computation (CPU or GPU memory, a socket, a database handle); that resource might be manually managed or managed by destructors.

Concurrency

Concurrency is the ability to make progress on more than one task at a time. Parallelism implies concurrency but concurrency does not imply parallelism. A program may interleave execution of task A, then task B, then task A on a single thread and be concurrent and not parallel.

Since IO workloads require waiting to make progress and CPU workloads require working to make progress, concurrency is often associated with IO-bound tasks and parallelism with CPU-bound tasks.
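The interleaving of task A and task B on a single thread can be sketched with closure iterators. This is only an illustration of concurrency without parallelism; `makeTask` and the task names are hypothetical and not part of this RFC.

```nim
# Two tasks make progress in turns on ONE thread: concurrent, not parallel.
proc makeTask(name: string, steps: int): iterator (): string =
  ## Hypothetical helper: each call to the returned iterator runs one step.
  result = iterator (): string =
    for i in 1 .. steps:
      yield name & $i

let taskA = makeTask("A", 2)
let taskB = makeTask("B", 2)
var trace: seq[string]

while true:
  let stepA = taskA()          # run task A until its next yield
  if finished(taskA): break
  trace.add stepA
  let stepB = taskB()          # then switch to task B
  if finished(taskB): break
  trace.add stepB

doAssert trace == @["A1", "B1", "A2", "B2"]
```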

Continuations

A continuation represents the state of execution of a program at a certain point.

Running a continuation would restore that state and run the program from that point.

Concretely, a continuation is a data structure with a procedure that represents the following statements (instructions) to run and all the parameters required to run them (environment).

At a data structure level, a closure and a continuation have the same fields.
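As a rough sketch of that shared layout, a hand-rolled "procedure plus environment" pair could look like the following. The names `Env`, `ManualClosure` and `addEnv` are hypothetical; this is not how the Nim compiler represents closures internally.

```nim
type
  Env = object              # hypothetical captured parameters
    a, b: int
  ManualClosure = object    # hypothetical: a procedure and its environment
    fn: proc (env: Env): int {.nimcall.}
    env: Env

proc addEnv(env: Env): int =
  ## The statements to run, reading everything they need from `env`.
  env.a + env.b

let packed = ManualClosure(fn: addEnv, env: Env(a: 2, b: 3))

# Running the closure (or resuming the continuation) is calling `fn` on `env`.
let entry = packed.fn
doAssert entry(packed.env) == 5
```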

Executor & execution context

An executor is a handle to an execution context. An execution context represents:

Parallelism

Parallelism is the ability to physically run part of a program on multiple hardware resources at the same time.

Fork-join model

The fork-join model allows execution of a program to branch off ('fork') on two threads at designated points in the program. The forks can be 'joined' at designated points.

Fork points are often called spawn including in Nim threadpool. Join points are often called sync. They are called ^ in Nim threadpool.
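For illustration, a minimal fork and join with the existing std/threadpool module might look like this. It assumes a compiler with threads enabled (`--threads:on` on older Nim versions); `square` is a hypothetical example procedure.

```nim
import std/threadpool

proc square(x: int): int = x * x

let fut = spawn square(7)   # fork: the call may run on another thread
let forked = ^fut           # join: blocks until the result is available
doAssert forked == 49
```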

The fork-join model can be implemented on top of:

At a fork point, multiple dispatching strategies can be used, for example on parallel merge sort:

proc mergeSort[T](a, b: var openarray[T]; left, right: int) =
  if right - left <= 1: return

  let middle = (left + right) div 2
  let fut = spawn mergeSort(a, b, left, middle) # <-- Fork child task
  # ------------------------------------------------- Everything below is the continuation
  mergeSort(a, b, middle, right)
  sync(fut)                                     # <-- Join
  merge(a, b, left, middle, right)

At the spawn points:

A high-level overview of continuation-stealing vs child-stealing is given in

Task parallelism

Task parallelism dispatches heterogeneous (procedure, data) pairs on execution resources.

The fork-join model is the usual way to split a program into tasks.

Task

A task is a data structure representing a procedure, its parameters (environment) and its execution context.

Closures MAY be packaged in a task. The task data structure is left unspecified.

Threadpool

A threadpool is an executor with a naive scheduler that has no advanced load balancing technique.

Threads & Fibers

A thread is a collection of execution resources:

With OS (or kernel) threads, the stack is managed by the kernel.

Userspace threads where the library allocates, switches and manages its own stack are called fibers (and also stackful coroutines). std/coro is an implementation of fibers.

GPUs also have threads, their stack is managed by the GPU driver.

Task Parallelism API

Task parallelism is implemented via the fork join model. This is the model used in:

The API has a REQUIRED and OPTIONAL interface. Some APIs are unspecified.

The API intentionally mirrors async/await without reusing the names to differentiate between concurrency (async/await) and parallelism (spawn/sync).

Creating a task

spawn: scheduling left to the executor [REQUIRED]

A task-parallel executor MUST implement either the local executor or the global executor API. It MAY implement both.

spawn eagerly adds the task to the executor for execution as soon as an execution resource is available. Scheduling is not delayed until a graph of tasks is built (see dataflow parallelism in Non-goals).

Local executor API

A new task MUST be created on a specific executor with a template or macro with the following signature

macro spawn(ex: MyExecutor, procCall: typed{nkCall | nkCommand}): FlowVar or void
  ## Executor-local spawn

For example

# var pool: MyExecutor
# ...
let fut = pool.spawn myFunc(a, b, c)

Global executor API

A new task MUST be created on a global executor with a template or macro with the following signature

macro spawn(procCall: typed{nkCall | nkCommand}): FlowVar or void
  ## Global spawn

For example

let fut = spawn myFunc(a, b, c)

Disambiguation between global executors can be done by prefixing the module name.

let fut = threadpool.spawn myFunc(a, b, c)

Global executors MAY use a dummy type parameter to refer to their global executor.

macro spawn(_: type MyExecutor, procCall: typed{nkCall | nkCommand}): FlowVar or void
  ## Global spawn

let fut = MyExecutor.spawn myFunc(a, b, c)

Future handle

spawn should return a future handle under the type FlowVar[T] or void. T is the type returned by the procedure being forked. If it returns void, spawn returns void.

If the future of a void procedure is needed, the end user SHOULD wrap that function in a function with a return value, for example a function that returns bool or refactor their code.
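A sketch of that wrapping, using std/threadpool as a stand-in executor (threads must be enabled); `report` and `reportDone` are hypothetical names:

```nim
import std/threadpool

proc report(x: int) =
  ## A void procedure: spawning it directly yields no future.
  echo "processed ", x

proc reportDone(x: int): bool =
  ## Wrapper with a return value so that a FlowVar[bool] is available.
  report(x)
  true

let pending = spawn reportDone(3)
let completed = ^pending    # blocks until the wrapped call has finished
doAssert completed
```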

Scheduling

spawn is a hint to the executor that processing MAY happen in parallel. The executor is free to schedule the task on a different thread or not depending on hardware resources, current load and other factors.

At a spawn statement, the threadpool implementation may choose to have the current thread execute the child task (continuation-stealing) or the continuation (child-stealing).

Scheduling is done eagerly, there is no abstract computation graph being built that is launched at a later point in time.

Awaiting a future [REQUIRED]

The operation to await a task-parallel future is called sync. This leaves await open for async libraries and framework. It is also the usual name used in multithreading framework, going back to Cilk (1995).

Awaiting a single future [REQUIRED]

Program execution can be suspended until a Flowvar is completed by calling sync on a Flowvar.

proc sync[T](fv: sink FlowVar[T]): T
  ## Blocks until the FlowVar is completed, then moves out its result

A Flowvar can only be synced once, hence the return value within the FlowVar[T] can be moved. Assuming the task completes, this ensures exactly-once delivery for Flowvars.

A Flowvar MUST NOT be copied, only moved. This SHOULD be enforced by overloading =copy.

proc `=copy`[T](dst: var FlowVar[T], src: FlowVar[T]) {.error: "A FlowVar cannot be copied".}
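The effect of such an overload can be sketched on a stand-in type. `Token` below is hypothetical and only mimics the move-only behaviour; it is not the FlowVar of any library.

```nim
type
  Token[T] = object     # hypothetical stand-in for a move-only handle
    value: T

proc `=copy`[T](dst: var Token[T], src: Token[T]) {.error: "A Token cannot be copied".}

var first = Token[int](value: 42)
let second = move(first)    # explicit move: ownership transfers, no copy
doAssert second.value == 42

# The following would be rejected at compile time because it needs a copy:
#   let third = second
#   echo second.value
```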

If a Flowvar is created and awaited within the same procedure, the Flowvar MAY use heap allocation elision as an optimization to reduce heap allocation.

At a sync statement, the current thread SHOULD participate in running the executor tasks. The current thread MAY choose to only process tasks that the awaited Flowvar depends on to optimize latency.

Awaiting multiple futures [UNSPECIFIED]

Awaiting multiple futures ("await until any" or "await until all") is unspecified.

Structured parallelism [OPTIONAL]

Structured parallelism ensures that all tasks and their descendants created within a scope are completed before exiting that scope.

template syncScope(ex: MyExecutor, scope: untyped): untyped
template syncScope(scope: untyped): untyped
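A rough approximation of the global variant can be written on top of std/threadpool, whose argument-less `sync()` is a barrier over all spawned tasks. Note the assumption: this waits for every task of the pool, not only those created in the scope, so it is stronger (and coarser) than what is specified here. `finishedCount` and `work` are hypothetical.

```nim
import std/[threadpool, atomics]

var finishedCount: Atomic[int]    # hypothetical shared counter

proc work(i: int) =
  discard finishedCount.fetchAdd(1)

template syncScope(body: untyped): untyped =
  ## Sketch only: run `body`, then wait for ALL tasks of the global pool.
  body
  sync()

syncScope:
  for i in 0 ..< 4:
    spawn work(i)

# Every task spawned in the scope has completed once the scope is exited.
doAssert finishedCount.load == 4
```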

References:

More recently, concurrency frameworks also converged to similar "structured concurrency" (https://en.wikipedia.org/wiki/Structured_concurrency).

Check if a task was spawned [REQUIRED]

A user can call isSpawned() to check if a Flowvar is associated with a spawned task.

proc isSpawned(fv: Flowvar): bool

This allows users to build speculative algorithms that may or may not spawn tasks (unbalanced task tree), for example nqueens with backtracking.

Check if a result has been computed [REQUIRED]

A user can call isReady on a Flowvar to check if the result is present.

proc isReady(fv: Flowvar): bool

The Flowvar MUST be associated with a spawned task.

This allows users to know if the current thread would block or not when calling sync on a Flowvar.
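As an illustration, std/threadpool already exposes an `isReady` with this meaning, so a polling loop might be sketched as follows (threads must be enabled; `compute` and the 1 ms sleep are placeholders):

```nim
import std/[threadpool, os]

proc compute(x: int): int = x * 2

let fv = spawn compute(21)
var polls = 0
while not fv.isReady:   # awaiting now would block, so do something else
  inc polls             # placeholder for useful work
  sleep(1)

let answer = ^fv        # the result is present: this does not wait
doAssert answer == 42
```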

Error handling

Procs that are spawned MUST NOT throw an exception. They MAY throw Defect which would end the program. Exceptions MUST be trapped by try/except blocks and converted to another form of error handling such as status codes, error codes or Result[T, E] type.

Threadpool and task parallelism libraries SHOULD document that constraint to end users and SHOULD enforce that constraint with the effect system.
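One way to satisfy both points is to trap the exception inside the procedure and let the `raises` pragma check it. A minimal sketch, where `ParseOutcome` and `parseChecked` are hypothetical names:

```nim
import std/strutils

type
  ParseOutcome = object   # hypothetical status-plus-value result
    ok: bool
    value: int

proc parseChecked(s: string): ParseOutcome {.raises: [].} =
  ## Safe to spawn: the effect system verifies no exception can escape.
  try:
    result = ParseOutcome(ok: true, value: parseInt(s))
  except ValueError:
    result = ParseOutcome(ok: false, value: 0)

doAssert parseChecked("12").ok
doAssert parseChecked("12").value == 12
doAssert not parseChecked("abc").ok
```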

Note: even assuming C++ exceptions, or exceptions that don't use the heap or a GC without thread-local heap, exceptions work by unwinding the stack. As each thread has its own stack, you cannot catch exceptions thrown in a thread in another.

Thread-local variables

Procs that are spawned MUST NOT use thread-local storage unless they are internal to the executor library. Executors make no guarantees about the thread of execution.

Threadpool and task parallelism libraries SHOULD document that constraint to end users.

Cancellation

Caller -> Callee

This RFC does not require cancellation primitives so that the caller can cancel the callee.

Rationales:

Alternative:

Callee -> Caller

The callee can notify the caller that it was cancelled with boolean, enums or Result[T, E]

Buffered channels

Channels are a basic inter-thread communication primitive. This specifies buffered channels, i.e. channels that can hold at least one item.

Unbuffered channels, also called rendez-vous channels, are unspecified.

The channel flavors should be communicated clearly, in particular:

Non-blocking send [REQUIRED]

func trySend*[T](chan: var Chan, src: sink Isolated[T]): bool =
  ## Try sending an item into the channel
  ## Returns true if successful (channel had enough free slots)
  ##
  ## ⚠ Use only in the producer thread that writes to the channel.

Non-blocking receive [REQUIRED]

proc tryRecv[T](chan: var Chan, dst: var Isolated[T]): bool =
  ## Try receiving the next item buffered in the channel
  ## returns true if an item was found and moved to `dst`
  ##
  ## ⚠ Use only in a consumer thread that reads from the channel.

Blocking send [OPTIONAL]

func send*[T](chan: var Chan, src: sink Isolated[T]): bool =
  ## Send an item into the channel
  ## (Blocks/Overwrites oldest) if channel is full
  ## Returns true if the channel was full and mitigation strategy was needed.
  ##
  ## ⚠ Use only in the producer thread that writes to the channel.

Blocking send still returns a bool for backpressure management. If blocking or overwriting the oldest is chosen, sending is always successful if the function returns.

Blocking receive [OPTIONAL]

proc recv[T](chan: var Chan, dst: var Isolated[T]): bool =
  ## Receive the next item buffered in the channel
  ## Blocks and returns true if no item is present
  ## Returns the item immediately and returns false if no blocking was needed.
  ##
  ## ⚠ Use only in a consumer thread that reads from the channel.

Blocking receive still returns a bool for backpressure management.

Batched operations [OPTIONAL]

We define batch operations for list-based channels.

proc trySendBatch[T](chan: var Chan, bFirst, bLast: sink Isolated[T], count: SomeInteger): bool =
  ## Send a linked list of items to the back of the channel
  ## They should be linked together by their next field.
  ## `count` refers to the number of items in the list.
  ## Returns true if successful (channel had enough free slots)

proc tryRecvBatch[T](chan: var Chan, bFirst, bLast: var Isolated[T]): int =
  ## Try receiving all items buffered in the channel
  ## The channel may not return all items due to in-progress competition
  ## between the consumer and producer(s)
  ##
  ## Items are returned as a linked list
  ## Returns the number of items received
  ##
  ## If no items are returned bFirst and bLast are undefined
  ## and should not be used.
  ##
  ## The `next` field in `bLast` is undefined.
  ## nil or overwrite it for further use in linked lists

Working with integers in a synchronization primitive like channels MUST NOT throw an exception.

Elements count [OPTIONAL]

Channels MAY keep track of the elements enqueued or dequeued. In that case they MAY provide an approximate count of items with peek.

Working with integers in a synchronization primitive like channel MUST NOT throw an exception.

peek MUST NOT block the caller. Due to the non-deterministic nature of multithreading, even if a channel is locked to get the exact count, it would become an approximation as soon as the channel is unlocked.

If called on a channel with a single consumer, from the consumer thread, the approximation is a lower bound as producers can enqueue items concurrently. If called on a channel with a single producer, from the producer thread, the approximation is an upper bound as consumers can dequeue items concurrently. If called on a channel with multiple producers, from a producer thread, no conclusion is possible as other producers enqueue items and the consumer thread(s) dequeue them concurrently. The same holds on a channel with multiple consumers, from a consumer thread.

API + documentation on a MPSC channel.

func peek*(chan: var Chan): int =
  ## Estimates the number of items pending in the channel
  ## - If called by the consumer the true number might be more
  ##   due to producers adding items concurrently.
  ## - If called by a producer the true number is undefined
  ##   as other producers also add items concurrently and
  ##   the consumer removes them concurrently.
  ##
  ## This is a non-locking operation.

Non-goals

Experimental non-blocking Task Parallelism API

In the proposed API, the scheduler may have the spawning thread participate in clearing the work queue(s) at sync statements. If the spawning thread is handling network or UI events, this would block network or UI handling; in that case it is desirable to ensure that the work cannot happen in the spawning thread. This proposes submit, Job, Pending that mirror spawn, Task, Flowvar.

It is left unspecified for now as given the competing latency (IO schedulers) vs throughput (CPU schedulers) requirements, there might be no use case for submit that isn't better covered by allowing multiple specialized schedulers in a program.

Alternatively, the application could be separated into a UI/networking thread and a heavy processing thread, with the heavy processing thread managing a threadpool.

Creating a task

submit: scheduling MUST NOT block the submitter thread [Experimental]

submit is a tentative alternative to spawn that guarantees execution on a different thread. Similar to spawn the API would be:

macro submit(ex: MyExecutor, procCall: typed{nkCall | nkCommand}): Pending or void
  ## Executor-local submit
macro submit(procCall: typed{nkCall | nkCommand}): Pending or void
  ## Global submit

submit, Job, runInBackground(Weave) and setupSubmitterThread(Weave) were added to Weave following

Definitions

This section gives a definition of other terms related to multithreading and async so that there is common vocabulary within the Nim community to talk about those concepts.

However it does not specify them.

Non-goals not covered in this specification

Araq commented 3 years ago

proc trySendBatch[T](chan: var Chan, bFirst, bLast: sink Isolated[T], count: SomeInteger): bool is a most alien construct. Instead we should use array based channels and indexing. Are there any good reasons for list based channels?

mratsim commented 3 years ago

> proc trySendBatch[T](chan: var Chan, bFirst, bLast: sink Isolated[T], count: SomeInteger): bool is a most alien construct. Instead we should use array based channels and indexing. Are there any good reasons for list based channels?

Good point we might want another overload for array-based channels.

List-based channels are necessary for:

That doesn't mean we need to provide them in the standard library, but this would make channels interface consistent across all future libraries.

mratsim commented 3 years ago

For array-based channels we can have the following

proc trySendBatch[T](chan: var Chan, src: var openArray[Isolated[T]]): bool =
  ## Send an array of items to the back of the channel
  ## Returns true if successful (channel had enough free slots)
  ##
  ## This destroys `src` on success

proc tryRecvBatch[T](chan: var Chan, dst: var openArray[Isolated[T]]): int =
  ## Try receiving all items buffered in the channel
  ##
  ## The channel may not return all items due to in-progress competition
  ## between the consumer and producer(s)
  ##
  ## Returns the number of items received

Now I realized an issue with sink, for bounded channels, sending can fail, meaning a signature like this:

func trySend*[T](chan: var Chan, src: sink Isolated[T]): bool

is incorrect because in case of failure the sender will likely want to resend later, so sink would introduce a copy, which should not happen (and which Isolated forbids). So we can only use sink for unbounded channels, while channels that can fail to send have to use

func trySend*[T](chan: var Chan, src: var Isolated[T]): bool

thoughts?

Araq commented 3 years ago

> thoughts?

In the new channel implementation (based on yours) I use a bounded channel with a condition variable so that there is a blocking send that doesn't fail and can take a sink. For a failable trySend you probably need to take the message as var Isolated[T]. It can be emptied on success, otherwise the location is not mutated and can be sent again. I think (it's late here).

saem commented 3 years ago

If the return type is not simply an int, but instead an opaque type that can be queried about the number of items, via accessor of some variety. Then could said opaque type also return the non-consumed part of the array or the entire array wholesale? If so would that not allow for a "free" sink?

Apologies, still rather green with ARC/destructors.

mratsim commented 3 years ago

> If the return type is not simply an int, but instead an opaque type that can be queried about the number of items, via accessor of some variety. Then could said opaque type also return the non-consumed part of the array or the entire array wholesale? If so would that not allow for a "free" sink?
>
> Apologies, still rather green with ARC/destructors.

In the future it makes sense to allow returning iterators so that you can parallelize infinite streams. Requiring the number of items is probably too restrictive.

saem commented 3 years ago

> > If the return type is not simply an int, but instead an opaque type that can be queried about the number of items, via accessor of some variety. Then could said opaque type also return the non-consumed part of the array or the entire array wholesale? If so would that not allow for a "free" sink? Apologies, still rather green with ARC/destructors.
>
> In the future it makes sense to allow returning iterators so that you can parallelize infinite streams. Requiring the number of items is probably too restrictive.

It was late, I muddled the received count of tryRecvBatch with the bool of trySendBatch. My suggestion with a dedicated return type, besides being more intention revealing, was to allow for easier migrations come API changes. Wherein the type name is likely to not change:

So beyond this particular call, I meant it as a stylistic thing for the API design for the calls as they return status. This by no means will mitigate all circumstances.

saem commented 3 years ago

> spawn should return a future handle under the type FlowVar[T] or void.

Any reason to support void? Between composition and knowledge of completion being a desirable thing even in the void case, strictly void seems more exceptional.

If void is truly required, I would explicitly not call it spawn, and that should be more akin to fireAndForget (not an actual name suggestion).


> Check if a result has been computed [REQUIRED]
>
> An user can call isReady on a Flowvar to check if the result is present.
>
> proc isReady(fv: Flowvar): bool
>
> The Flowvar MUST be associated with a spawned task.

This is mostly to test my own understanding of the intention here, am I on the right track with:

This reads to me as if the spawner is slow to consume, then the executor cannot free resources until that happens. This is also preferred as the fork-join model is typically used in cases where the issuer/consumer are one and the same and the issuer should be "in charge".


Overall API Thoughts:

I really like the clarity of this, it's awesome.

A potential issue with the API overall, especially with only having isReady on FlowVar, is that there is no lightweight mechanism for a set of consumers to have task load/commit information signaled by the scheduler to a consumer. In most cases wherein one controls all the code this is fine, but especially with libraries involved that's not ideal.

Can we disallow Defect in any spawn'd proc? Using it needs to be discouraged, and it is a bug outside of a main module in most cases. The API here should inform better practices, and the gentle pressure on library code is likely a better direction.

planetis-m commented 3 years ago

Heads up, the definition of trySend plus the isolated concept has flaws that are independent of an implementation. So let's look at the first example, with code adapted from https://github.com/nim-lang/RFCs/issues/244

proc producer =
  for i in 0 ..< numIters:
    var p = isolate(Foo(id: $(i + seed))) # crashes
    while not chan.trySend(move p): cpuRelax()

Here is just the while not trySend, it generates:

while (1) {
  tyObject_Isolated__rEIebJMJoptZEKusFoG28w T7_;
  NIM_BOOL T8_;
  nimZeroMem((void*)(&T7_), sizeof(tyObject_Isolated__rEIebJMJoptZEKusFoG28w));
  T7_.value = p.value;
  nimZeroMem((void*)(&p), sizeof(tyObject_Isolated__rEIebJMJoptZEKusFoG28w));
  T8_ = (NIM_BOOL)0;
  T8_ = push_tring_236((&rng_tring_12), T7_);
  if (!!(T8_)) goto LA6;
  cpuRelax_system_2802();
} LA6: ;

As you can see, if it fails the first time, p would be moved and empty, causing it to send empty data. It can be worked around with:

while not chan.trySend(isolate(Foo(id: $(i + seed)))): cpuRelax()

But here is another fault, inside the implementation of trySend, pseudocode with expandarc:

  if channelBufferIsFull:
    result = false
  else:
    `=sink`(this.data[head], extract value)
    result = true
  `=destroy`(value)

As you can see value is destroyed inside trySend. It can also be worked-around with .nodestroy.

On the other hand with these workarounds if you only trySend once, your data may be leaked.

Upd: So the correct function signature is func trySend*[T](chan: var Chan, src: var Isolated[T]): bool
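To illustrate why the `var Isolated[T]` signature permits retrying, here is a single-threaded sketch. `BoundedQueue` is a hypothetical, NOT thread-safe stand-in for a bounded channel; only `isolate`, `extract` and `Isolated` come from std/isolation.

```nim
import std/isolation

type
  BoundedQueue[T] = object    # hypothetical, single-threaded illustration
    slots: seq[T]
    capacity: int

proc trySend[T](q: var BoundedQueue[T], src: var Isolated[T]): bool =
  ## On failure `src` is left untouched, so the caller can retry later.
  if q.slots.len >= q.capacity:
    return false
  q.slots.add extract(src)    # success: the value is moved out of `src`
  true

var q = BoundedQueue[string](capacity: 1)
var firstMsg = isolate("first")
var secondMsg = isolate("second")

doAssert q.trySend(firstMsg)          # the only slot is taken
doAssert not q.trySend(secondMsg)     # queue full: send fails
doAssert extract(secondMsg) == "second"   # payload still owned by the sender
```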

planetis-m commented 3 years ago

...also the templates that hide Isolated[T] need warnings about multiple evaluations when used in a loop.

planetis-m commented 1 year ago

I found a new issue with the trySend template https://github.com/nim-lang/threading/issues/20 Any ideas?