symmetryinvestments / concurrency

Concurrency primitives
MIT License

Structured Concurrency

   Provides various primitives useful for structured concurrency and async tasks.

Senders/Receivers

A Sender is a lazy Task (in the general sense of the word). It needs to be connected to a Receiver and then started before it will (eventually) call one of the three receiver methods exactly once: setValue, setDone, setError.

It can be used to model many asynchronous operations: Futures, Fibers, Coroutines, Threads, etc. It enforces structured concurrency because a Sender cannot start without being awaited.

setValue is the only one of the three that is allowed to throw; if it does, setError is called with the Throwable. setDone is called when the operation has been cancelled.
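As a minimal sketch of that rule, assuming a receiver with the three methods described above, a Sender can route a throwing setValue into setError so the receiver still sees exactly one terminal call. deliverValue and RecordingReceiver are hypothetical names for illustration, not part of the library.

```d
// Hypothetical helper, not part of the library: deliver a value and
// route anything thrown by setValue into setError.
void deliverValue(Receiver, T)(ref Receiver receiver, T value) nothrow {
    try {
        receiver.setValue(value);
    } catch (Throwable t) {
        receiver.setError(t);   // setError itself must be nothrow
    }
}

// Hypothetical receiver used to observe which terminal method ran.
struct RecordingReceiver {
    string log;
    int received;
    bool throwOnValue;

    void setValue(int v) {
        if (throwOnValue)
            throw new Exception("boom");
        received = v;
        log ~= "value";
    }
    void setDone() nothrow { log ~= "done"; }
    void setError(Throwable t) nothrow { log ~= "error:" ~ t.msg; }
}

unittest {
    RecordingReceiver ok;
    deliverValue(ok, 42);
    assert(ok.log == "value" && ok.received == 42);

    RecordingReceiver failing;
    failing.throwOnValue = true;
    deliverValue(failing, 1);
    assert(failing.log == "error:boom");   // the exception became setError
}
```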

See https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2022/p2300r5.html for the C++ proposal for introducing Senders/Receivers.

Currently we have the following Senders:

Writing your own Sender

Most of the asynchronous tasks you will do involve writing your own Sender.

Here is the implementation of the ValueSender.

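```d
/// A Sender that sends a single value of type T
struct ValueSender(T) {
  alias Value = T;
  T value;
  static struct Op(Receiver) {
    Receiver receiver;
    T value;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    void start() nothrow {
      // setValue may throw; the Sender contract then requires
      // passing the Throwable on to setError
      try {
        receiver.setValue(value);
      } catch (Throwable t) {
        receiver.setError(t);
      }
    }
  }
  Op!Receiver connect(Receiver)(Receiver receiver) {
    // ensure NRVO: Op is not copyable, so it must be constructed in place
    auto op = Op!(Receiver)(receiver, value);
    return op;
  }
}
```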

A ValueSender!int is nothing more than an int wrapped in a struct with a connect method. It can be constructed and passed around, but it won't produce a value until it is connected and started. The Op object (operational state) returned by connect represents the state of a connected Sender/Receiver pair, which in the case of the ValueSender includes the value to be sent. After connecting, the operational state still needs its start method called before it actually produces a value.
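To see that laziness in isolation, here is a stripped-down sketch with the same shape as the ValueSender above. JustSender and LogReceiver are hypothetical names, and the disabled copy operations are left out to keep it short.

```d
import std.conv : to;

// Hypothetical sender mirroring the shape of ValueSender.
struct JustSender(T) {
    T value;

    static struct Op(Receiver) {
        Receiver receiver;
        T value;
        // The receiver is only touched here, never in connect().
        void start() nothrow {
            try {
                receiver.setValue(value);
            } catch (Exception e) {
                receiver.setError(e);
            }
        }
    }

    Op!Receiver connect(Receiver)(Receiver receiver) {
        return Op!Receiver(receiver, value);
    }
}

// Hypothetical receiver. It writes through a pointer because the
// operational state holds its own copy of the receiver.
struct LogReceiver {
    string* log;
    void setValue(int v) { *log ~= "value:" ~ v.to!string; }
    void setDone() nothrow { *log ~= "done"; }
    void setError(Throwable) nothrow { *log ~= "error"; }
}

unittest {
    string log;
    auto op = JustSender!int(42).connect(LogReceiver(&log));
    assert(log == "");          // connected, but nothing produced yet
    op.start();
    assert(log == "value:42");  // start() delivers the value
}
```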

A Receiver needs to implement setValue, setError and setDone. A Sender is required to call exactly one of the three functions, exactly once. Both setError and setDone are required to be nothrow. If setValue is not nothrow, the Sender must call setError if setValue throws.
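One way to check these Receiver requirements at compile time is a trait built from function literals: the literal marked nothrow only compiles if setDone and setError are nothrow. isReceiverOf and both receivers below are hypothetical illustrations, not checks exported by the library.

```d
// Hypothetical trait: R is a receiver of T if it accepts setValue(T)
// and offers setDone() and setError(Throwable) callable from nothrow code.
enum bool isReceiverOf(R, T) =
    is(typeof((ref R r, T v) { r.setValue(v); })) &&
    is(typeof((ref R r, Throwable t) nothrow {
        r.setDone();
        r.setError(t);
    }));

// Satisfies the contract.
struct GoodReceiver {
    int last;
    void setValue(int v) { last = v; }
    void setDone() nothrow {}
    void setError(Throwable) nothrow {}
}

// Violates it: setDone is not nothrow.
struct ThrowingDoneReceiver {
    void setValue(int) {}
    void setDone() {}
    void setError(Throwable) nothrow {}
}

static assert(isReceiverOf!(GoodReceiver, int));
static assert(!isReceiverOf!(GoodReceiver, string));   // wrong value type
static assert(!isReceiverOf!(ThrowingDoneReceiver, int));
```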

Most Senders should call receiver.getStopToken to retrieve a StopToken through which they can be notified of (or poll for) cancellation. See the Cancellation section for how this works.

Operations

Senders enjoy the following operations.

Streams

A Stream has a .collect function that accepts a shared callable and returns a Sender. Once that Sender is connected and started, the Stream will call the callable zero or more times before one of the three terminal functions of the Receiver is called.

An exception thrown in the callable will cancel the stream and complete the Sender with that exception.

Streams can be cancelled by triggering the StopToken supplied via the Receiver.

The callable supplied to the Stream has to be annotated with shared because the execution context from which the callable is called is undefined.
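A minimal, single-threaded sketch of that contract, assuming a receiver that exposes a plain isStopRequested flag in place of a real StopToken. ArrayStream, CollectSender and StreamReceiver are hypothetical names, and the callable is an ordinary delegate rather than a shared one.

```d
// Hypothetical stream over an array; collect() only captures the callable.
struct ArrayStream(T) {
    T[] items;
    auto collect(void delegate(T) callable) {
        return CollectSender!T(items, callable);
    }
}

struct CollectSender(T) {
    T[] items;
    void delegate(T) callable;

    auto connect(Receiver)(Receiver receiver) {
        return Op!Receiver(items, callable, receiver);
    }

    static struct Op(Receiver) {
        T[] items;
        void delegate(T) callable;
        Receiver receiver;

        void start() nothrow {
            foreach (item; items) {
                // Stand-in for polling the StopToken from getStopToken.
                if (receiver.isStopRequested()) {
                    receiver.setDone();
                    return;
                }
                try {
                    callable(item);           // zero or more calls
                } catch (Exception e) {
                    receiver.setError(e);     // a throwing callable ends the stream
                    return;
                }
            }
            try {
                receiver.setValue();          // every item was delivered
            } catch (Exception e) {
                receiver.setError(e);
            }
        }
    }
}

// Hypothetical receiver that records the outcome through pointers.
struct StreamReceiver {
    string* outcome;
    bool* stop;
    bool isStopRequested() nothrow { return *stop; }
    void setValue() { *outcome = "value"; }
    void setDone() nothrow { *outcome = "done"; }
    void setError(Throwable t) nothrow { *outcome = "error:" ~ t.msg; }
}

unittest {
    int sum;
    string outcome;
    bool stop;
    auto op = ArrayStream!int([1, 2, 3])
        .collect((int x) { sum += x; })
        .connect(StreamReceiver(&outcome, &stop));
    op.start();
    assert(sum == 6 && outcome == "value");
}
```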

Currently there are the following Streams:

With the following operations:

Most of the time, however, you will need to write your own Stream. The following helpers can speed that up:

Scheduler

Schedulers create Senders that run on specific execution contexts. A Sender can query its Receiver with .getScheduler() to get a Scheduler, and from there schedule additional tasks to run immediately or after a certain Duration.

syncWait automatically inserts a LocalThreadScheduler with a timingwheels implementation to fulfill the Scheduler contract. This means that by default any Sender can schedule timers that run on the thread that awaits the whole chain.

For testing purposes there is a ManualTimeScheduler which can be used to advance the timingwheels manually.
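The idea of manually advanced time can be sketched as a virtual clock that only fires timers when told to. ManualClock and scheduleAfter are hypothetical names, and a plain stably sorted array stands in for a real timing wheel.

```d
import core.time : Duration, msecs;
import std.algorithm : sort, SwapStrategy;

// Hypothetical virtual clock: time only moves when advance() is called.
struct ManualClock {
    private static struct Timer {
        Duration deadline;
        void delegate() callback;
    }

    private Duration now;
    private Timer[] timers;

    // Run callback once `delay` of virtual time has passed.
    void scheduleAfter(Duration delay, void delegate() callback) {
        timers ~= Timer(now + delay, callback);
    }

    // Move virtual time forward and fire every timer that became due,
    // in deadline order.
    void advance(Duration step) {
        now += step;
        // stable sort keeps registration order for equal deadlines
        timers.sort!((a, b) => a.deadline < b.deadline, SwapStrategy.stable);
        size_t due;
        while (due < timers.length && timers[due].deadline <= now)
            ++due;
        auto ready = timers[0 .. due].dup;
        timers = timers[due .. $];
        foreach (timer; ready)
            timer.callback();   // callbacks may schedule new timers
    }
}

unittest {
    ManualClock clock;
    int[] fired;
    clock.scheduleAfter(20.msecs, { fired ~= 2; });
    clock.scheduleAfter(10.msecs, { fired ~= 1; });
    clock.advance(15.msecs);
    assert(fired == [1]);
    clock.advance(5.msecs);
    assert(fired == [1, 2]);
}
```

Because nothing fires until advance is called, a test can check state between steps without sleeping.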

ThreadPool

stdTaskPool creates a RAII thread pool on which Senders can be scheduled using the .on scheduling operator. Both the scheduled Sender itself and any additional Senders it schedules via getScheduler run in the thread pool. It uses std.parallelism's TaskPool implementation underneath.
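To illustrate what a RAII pool over std.parallelism means, the sketch below uses Phobos directly. ScopedPool is a hypothetical wrapper, not stdTaskPool itself; TaskPool, task, put, yieldForce and finish are std.parallelism APIs.

```d
import std.parallelism : TaskPool, task;

// Hypothetical RAII wrapper: owns a TaskPool and shuts it down
// when it goes out of scope.
struct ScopedPool {
    TaskPool pool;

    this(size_t workers) { pool = new TaskPool(workers); }
    @disable this(this);            // exactly one owner

    ~this() {
        if (pool !is null)
            pool.finish(true);      // wait for queued work, then stop workers
    }
}

int square(int x) { return x * x; }

unittest {
    auto scoped = ScopedPool(2);
    auto t = task!square(7);
    scoped.pool.put(t);             // runs on one of the worker threads
    assert(t.yieldForce == 49);     // blocks until the result is ready
}
```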

Nursery

A Nursery is a place where Senders can be awaited. Senders placed in the Nursery are started only when the Nursery is started.

In many ways it is like whenAll, except that it is an object. That allows it to be passed around and lets work be registered into it dynamically.
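A synchronous sketch of those two properties, deferred start and dynamic registration, might look like this. WorkGroup is a hypothetical name; unlike the real Nursery it runs plain delegates one after another on the calling thread and ignores errors and cancellation.

```d
// Hypothetical, synchronous stand-in for the Nursery idea.
struct WorkGroup {
    private void delegate()[] pending;
    private bool started;
    private bool draining;

    void add(void delegate() work) {
        pending ~= work;
        if (started && !draining)
            drain();                // already started: run new work now
    }

    void start() {
        started = true;
        drain();
    }

    private void drain() {
        draining = true;
        scope (exit) draining = false;
        while (pending.length > 0) {
            auto work = pending[0];
            pending = pending[1 .. $];
            work();                 // may call add() to register more work
        }
    }
}

unittest {
    WorkGroup group;
    int[] order;
    group.add({ order ~= 1; });
    assert(order.length == 0);      // nothing runs before start()
    group.add({ order ~= 2; group.add({ order ~= 3; }); });
    group.start();
    assert(order == [1, 2, 3]);
}
```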

Cancellation

Cancellation is a very important aspect of asynchronous work, so much so that it is baked into the Receiver's API. It is a cooperative mechanism, however, and it requires each Sender to respond to stop requests. In response to a stop request a Sender should call the setDone function, so that any downstream work is cancelled as well. However, in case of a race, it is perfectly fine to terminate with any of the other termination functions.

See http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2020/p2175r0.html for a thorough explanation of why we need stop tokens in particular and cancellation in general.

Cancellation requests happen through the use of a StopSource and a StopToken. Each Sender should request a StopToken from its Receiver by calling getStopToken. Senders should respond to cancellation by polling the StopToken's isStopRequested method or by calling the onStop method to attach a callback. Note that the callback might be called immediately if a stop has already been requested.

By default the syncWait call will create a StopSource and a StopToken. The StopSource will be connected to any enclosing syncWait operation or otherwise to the globalStopSource. You can supply a StopSource to the syncWait function explicitly, but note that it won't be connected to any enclosing StopSource; connecting it is your own responsibility.
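The stop semantics described above can be sketched with a single-threaded stand-in: callbacks run once on stop, onStop fires immediately after a stop, and a nested source follows its parent the way a nested syncWait follows its enclosing one. SimpleStopSource and link are hypothetical; the library keeps StopSource and StopToken separate and is thread-safe, which this sketch is not.

```d
// Hypothetical, non-thread-safe stand-in for a StopSource/StopToken pair.
class SimpleStopSource {
    private bool stopped;
    private void delegate()[] callbacks;

    bool isStopRequested() const { return stopped; }

    void onStop(void delegate() callback) {
        if (stopped)
            callback();             // already stopped: run right away
        else
            callbacks ~= callback;
    }

    void stop() {
        if (stopped)
            return;                 // callbacks run at most once
        stopped = true;
        auto toRun = callbacks;
        callbacks = null;
        foreach (cb; toRun)
            cb();
    }

    // Make this source stop whenever `parent` stops.
    void link(SimpleStopSource parent) {
        parent.onStop(&stop);
    }
}

unittest {
    auto global = new SimpleStopSource;
    auto nested = new SimpleStopSource;
    nested.link(global);
    bool cancelled;
    nested.onStop({ cancelled = true; });
    global.stop();                  // e.g. triggered by SIGINT
    assert(nested.isStopRequested && cancelled);
}
```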

Signals and termination

By default the library sets up a signal handler on the first use of syncWait. It spins up a dedicated thread to listen for both SIGINT and SIGTERM. Either signal causes the globalStopSource's stop method to be called on that dedicated thread.

All calls to syncWait that do not explicitly supply a StopSource will create one, which is connected to the globalStopSource or, in the case of nested syncWaits, to the parent StopSource.

This ensures that by default both SIGINT and SIGTERM will cancel all work.

This behavior can be overridden by calling setGlobalStopSource before any call to syncWait. If you do so you are responsible for setting up signal handlers yourself. See the functions in signal.d, specifically setupCtrlCHandler.

Additional methods of termination

In certain scenarios you want to have additional ways to terminate outstanding work.

Simply calling globalStopSource().stop() will cause any work to be cancelled. It will use the current thread to run all stop callbacks.

If you want the termination to happen asynchronously, for instance because the current thread is not async-safe, you can call SignalHandler.notify(SIGINT). Note that this relies on SignalHandler.launchHandlerThread having been called at some point. This happens by default unless you call setGlobalStopSource and it returns true; in that case you need to call SignalHandler.launchHandlerThread yourself as well. See the functions in signal.d.

Dynamic libraries

The concurrency library is designed to work with dynamic libraries. It exports two functions that it uses to load important globals and thread-locals from the host process.

On Posix, the only requirement is that your linker supports --export-dynamic-symbol (at least gold and lld do).

On Windows, you need to export the two symbols explicitly from the executable so that the whole process (i.e., DLLs with statically linked concurrency) uses them. This can be achieved by adding the corresponding linker flags for the executable, e.g., in dub.sdl:

lflags "/EXPORT:concurrency_getLocalThreadExecutor" "/EXPORT:concurrency_globalStopSourcePointer" platform="windows"

DSemver

This package uses dsemver to calculate the next semantic version.

Run dub run dsemver@1.1.0 -- -p $(pwd) -c to calculate the next version.