mratsim / weave

A state-of-the-art multithreading runtime: message-passing based, fast, scalable, ultra-low overhead

[RFC] Composability with async frameworks and other threading solutions. #132

Open mratsim opened 4 years ago

mratsim commented 4 years ago

This RFC supersedes:

Context

Weave is currently well-tuned to handle CPU-bound tasks:

However it needs better interoperability with:

Current limitations

1. Submitting jobs from dedicated services

Weave requires a task to be spawned on one of the worker threads, including the root thread (the one that called init(Weave)). This is problematic because applications and libraries may want to use dedicated threads and submit tasks from them to Weave, using it as a common threadpool that handles the whole workload.

From the Intel presentation on parallelizing the game Conqueror's Blade: https://www.slideshare.net/IntelSoftware/01conquerorsbladegdc100pctaftertmnb

image

We can see the following dedicated threads:

This would also allow Weave to collaborate with other runtimes: async frameworks, other threadpools, or callers from another language or an external library with its own threadpool.

2. Latency, Fairness and Priority.

Weave tasks are processed in a LIFO order. This optimizes for throughput as the latest enqueued task is:

However, this is not fair. In a server context, we might submit multiple cryptographic tasks; if the worker comes back from being busy and pops the latest request first, the very first client will wait much longer than the last one to arrive. Similarly, in a game engine context, jobs are submitted frame by frame and we want the first frame's jobs to be dealt with first.

Secondly, a soft real-time system might want to submit tasks with various priorities.
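To make the fairness concern concrete, here is a minimal single-threaded sketch. It does not use Weave; `drainOrder` and the request IDs are hypothetical names introduced only for illustration. It shows the order in which one worker would serve requests that arrived as 1, 2, 3, ... under LIFO and under FIFO popping.

```nim
import std/deques

proc drainOrder(requests: openArray[int], lifo: bool): seq[int] =
  ## Returns the order in which a single worker would process `requests`,
  ## given that they were enqueued in the order they appear.
  var queue = initDeque[int]()
  for r in requests:
    queue.addLast(r)
  while queue.len > 0:
    if lifo:
      result.add queue.popLast()   # newest first: the earliest client waits longest
    else:
      result.add queue.popFirst()  # oldest first: clients are served in arrival order

when isMainModule:
  echo drainOrder([1, 2, 3, 4, 5], lifo = true)
  echo drainOrder([1, 2, 3, 4, 5], lifo = false)
```

With LIFO, request 1 is served last even though it arrived first, which is the latency problem described above.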

Proposed solution

On the dedicated thread side

We introduce a hidden lightweight JobEmitter context variable and a public setupDedicatedThread(Weave) procedure to initialize it. They are used on threads created by createThread, raw threads, pthreads, or library-provided threading solutions.

type
  JobEmitter = ptr object
    mempool: TLPoolAllocator # Thread-safe pool allocator, thread-local instance
    rng: RngState # Random number generator

var emitterCtx {.threadvar.}: JobEmitter

An emitter interacts with Weave and its workers via submit(myJob(a, b, c)) for normal tasks and submitDelayed(pledge, myJob(a, b, c)) for continuations, callbacks, and dataflow parallelism. This selects a Weave worker at random (via the RNG) and pushes the task to it. These calls mirror spawn and spawnDelayed. The result is still returned via a Flowvar, and a loop such as while not myCompute.isReady(): poll(); doSomething() can be used to compose with the emitter's own work without blocking.

The memory pool is used to allocate the Flowvar (Future/ResultChannel) to await the result. If in an async context, isReady can be used to check if this would block.

Similar to how asyncdispatch and Chronos distinguish await (in an async proc) from waitFor (in the main thread), we might want a name separate from sync (used inside the Weave threadpool); maybe reuse waitFor? Note: we don't need to call loadBalance() in a loop, as this is not used on a thread participating in the Weave runtime. However, assuming we agree that all async libraries implement a poll() function, we can do when declared(poll): poll().

A teardownDedicatedThread(Weave) will also be provided to deallocate the mempool (which should be empty if all tasks were awaited).

The rngState can be seeded with the thread PID.
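The emitter side described above can be sketched as a single-threaded mock. This is not Weave code: `Job`, `MockPool`, `MockEmitter`, and `initMockPool` are hypothetical stand-ins, the per-worker inboxes are plain deques rather than MPSC channels, and the memory pool and Flowvar are left out. It only illustrates the "pick a worker at random and push the task to its inbox" step.

```nim
import std/[deques, random]

type
  Job = object          # hypothetical stand-in for Weave's Task
    id: int
  MockPool = object     # hypothetical stand-in for the Weave runtime
    tasksSubmitted: seq[Deque[Job]]  # one inbox per worker
  MockEmitter = object  # mirrors the proposed JobEmitter, minus the mempool
    rng: Rand

proc initMockPool(numWorkers: int): MockPool =
  ## Creates one empty inbox per worker (assumes numWorkers >= 1).
  for _ in 0 ..< numWorkers:
    result.tasksSubmitted.add initDeque[Job]()

proc submit(e: var MockEmitter, pool: var MockPool, job: Job): int =
  ## Picks a worker uniformly at random and appends the job to its inbox.
  ## Returns the index of the chosen worker.
  result = e.rng.rand(pool.tasksSubmitted.high)
  pool.tasksSubmitted[result].addLast(job)

when isMainModule:
  var pool = initMockPool(4)
  # The RFC suggests seeding from the thread's ID; a constant is used here.
  var emitter = MockEmitter(rng: initRand(42))
  for i in 0 ..< 8:
    let chosen = emitter.submit(pool, Job(id: i))
    echo "job ", i, " -> worker ", chosen
```

Because each emitter owns its RNG, no shared state is touched when choosing a worker; only the push to the chosen inbox needs to be thread-safe in a real implementation.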

On Weave side

We add a new field to the global context which holds the communication channels: https://github.com/mratsim/weave/blob/46cf3232d6b05e225dce81f4d92facf85cfd6293/weave/datatypes/context_global.nim#L22-L49

type
  ComChannels* = object
    ## Communication channels
    ## This is a global object and so must be stored
    ## at a global place.
    # - Nim seq uses thread-local heaps
    #   and are not recommended.
    # - ptr UncheckedArray would work and would
    #   be useful if Channels are packed in the same
    #   heap (with padding to avoid cache conflicts)
    # - A global uninitialized array
    #   so they are stored on the BSS segment
    #   (no storage used just size + fixed memory offset)
    #   would work but then it requires a pointer indirection
    #   per channel and a known max number of workers

    # Theft channels are bounded to "NumWorkers * WV_MaxConcurrentStealPerWorker"
    thefts*: ptr UncheckedArray[ChannelMpscUnboundedBatch[StealRequest]]
    tasksInFlight*: ptr UncheckedArray[Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]]]
    tasksSubmitted*: ptr UncheckedArray[ChannelMpscUnboundedBatch[Task]]
    when static(WV_Backoff):
      parking*: ptr UncheckedArray[EventNotifier]

The previous tasks field is renamed tasksInFlight, and the new field, tasksSubmitted, is a multi-producer single-consumer (MPSC) queue.

The previous event loop of workers was https://github.com/mratsim/weave/blob/46cf3232d6b05e225dce81f4d92facf85cfd6293/weave/state_machines/event_loop.png

image

Zoom on "OutOfTask" state.

https://github.com/mratsim/weave/blob/46cf3232d6b05e225dce81f4d92facf85cfd6293/weave/state_machines/event_loop.nim#L99-L120

As before, workers first check their local task queue; if they run out of tasks, they try to steal new ones. The theft is done via message passing, and a worker detects that there are no tasks left and that it should go to sleep when it receives its own steal request back. In the proposal, instead of immediately going to sleep, a worker would pop the first task from its tasksSubmitted queue and run it.
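The priority order described above can be sketched as a small decision function. This is a single-threaded model, not Weave's state machine: `MockWorker`, `NextAction`, and `nextTask` are hypothetical names, and stealing is simplified to taking from one victim queue instead of exchanging steal requests over channels.

```nim
import std/deques

type
  Task = object            # hypothetical stand-in for Weave's Task
    id: int
  MockWorker = object
    localTasks: Deque[Task]      # the worker's own queue, popped LIFO
    tasksSubmitted: Deque[Task]  # inbox fed by foreign threads, popped FIFO
  NextAction = enum
    RunLocal, RunStolen, RunSubmitted, Park

proc nextTask(w: var MockWorker, victim: var Deque[Task]): (NextAction, Task) =
  ## Local work first, then stolen work, then submitted jobs, else sleep.
  if w.localTasks.len > 0:
    result = (RunLocal, w.localTasks.popLast())
  elif victim.len > 0:
    result = (RunStolen, victim.popFirst())
  elif w.tasksSubmitted.len > 0:
    result = (RunSubmitted, w.tasksSubmitted.popFirst())
  else:
    result = (Park, Task(id: -1))  # nothing left: the worker would go to sleep

when isMainModule:
  var worker = MockWorker(localTasks: initDeque[Task](),
                          tasksSubmitted: initDeque[Task]())
  var victim = initDeque[Task]()
  worker.localTasks.addLast Task(id: 1)
  victim.addLast Task(id: 2)
  worker.tasksSubmitted.addLast Task(id: 3)
  while true:
    let (action, task) = worker.nextTask(victim)
    if action == Park: break
    echo action, " task ", task.id
```

In this model, submitted jobs are only looked at once both the local queue and stealing come up empty, and they are taken oldest first.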

Analysis of the solution

We will use the following terminology:

This clearly solves (1): submitting jobs to Weave from a "foreign" thread.

Interestingly this also solves (2):

This means that:

Limitations

Alternative

Instead of randomly pushing tasks to one of the workers, we could have a global task queue for submitted tasks but:

Note on blocking tasks

In some cases, we might have to make a blocking call. As with an async IO runtime, a CPU-bound runtime like Weave does not like blocking calls. In both cases it removes a thread from scheduling and will halt handling tasks/futures/events. The way to handle that is to createThread and pass the blocking call to it.

However, for a call like readline, this means manually handling channel creation to get the result. Moreover, if it is called repeatedly in an interactive CLI application, this causes many thread creations and destructions, hence kernel work and context switching, and therefore slowness or blocking of the creating thread.

Weave could also create a blocking threadpool, initially with no threads. With a dedicated spawnBlocking or submitBlocking, blocking tasks can be created on this threadpool. It would be much simpler: no load balancing and no message passing (blocking tasks are not parallelized across a cluster). To ensure proper "garbage collection" in long-running applications, if there was a burst of blocking threads at some point and then none, the threads could be made to sleep on a futex or a condition variable with a timed wait. On wakeup, a thread services the new request if there is one; otherwise it exits.

Note on Weave root thread

In all the tests, the root thread and root task were the program's main thread and main function. We need to make sure that Weave can also be started from a dedicated thread, to allow architectures such as the game engine mentioned before.

Additionally, the init should be configurable with the number of threads (overriding the WEAVE_NUM_THREADS environment variable).

Lastly, Weave prevents creating more threads than the number of cores; we might want to relax that to allow up to N+1 threads. The rationale is that the root thread might be just an event loop that initializes a program and then runs while true: poll(); loadBalance() but never calls sync() or syncRoot(), so we would lose one core for CPU-bound tasks.

Note on async poll()

Since Weave would compose seamlessly with async runtimes, which can run on dedicated event-loop threads and use submit, there is no need to add a per-thread hook in Weave to call poll() regularly or, conversely, a hook in the async framework to call Weave's loadBalance().

olliNiinivaara commented 4 years ago

I'm not qualified to comment on technical implementation, but here are some general thoughts.

Nim stdlib's threadpool, spawn and parallel are still unstable/broken. We need a replacement for these basic primitives. With the ability to set up dedicated threads and submit jobs from them, Weave seems to deliver this.

Often submitted jobs are not only isolated, but they do not even return anything to the caller, because their output goes to devices or other processes. This situation should allow some nice optimizations like bypassing the memory pool.

I understand that optimizing for high throughput (LIFO) is ok internally when tasks are cooperative. But in server context incoming requests from clients (the jobs) are more like competitors and therefore optimizing for low latency (FIFO) is there the better choice. No need to go for the last mile because server load is random anyway, the proposed randomized fairness definitely prevents pathological latencies.

Support for existing async solutions is a nice addition but other qualities should not be traded for it. Weave is more fundamental - existing libraries can be adapted afterwards or new ones written that play well with Weave. (Async/await is concurrency concept for single-threaded programming languages anyway, we may not need one.)

For example, it does not seem a good idea to run Nim's asyncdispatch on top of Weave. It basically calls selector.selectInto with a hard-coded timeout of 500 ms in an endless loop. Any thread where you start this will not be able to do any work on Weave tasks...

Well, my Guildenstern does basically the same, but as soon as there are tasks in flight, the timeout will be dropped to zero and it will call weave synchronization operations in the loop to play nice with it.

I'm speculating that the following operation could make my main event loop play even nicer: syncRoot(m, n), where m is the minimum number of tasks that must be in flight before syncing starts and n is the maximum number of tasks in flight after it stops. For example, syncRoot(3, 2) would run syncRoot only if at least 3 tasks are in flight, and stop syncing as soon as no more than 2 tasks are in flight.
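One possible reading of these semantics, as a single-threaded sketch: `syncRootBounded` is a hypothetical name, the in-flight tasks are modelled as a deque of task IDs, and "running" a task just means removing it from the deque. Weave has no such API today.

```nim
import std/deques

proc syncRootBounded(inFlight: var Deque[int], m, n: int): seq[int] =
  ## Hypothetical semantics for the suggested syncRoot(m, n):
  ## do nothing unless at least `m` tasks are in flight, then complete
  ## tasks until no more than `n` remain.
  ## Returns the IDs of the tasks that were completed.
  if inFlight.len < m:
    return
  while inFlight.len > n:
    result.add inFlight.popFirst()

when isMainModule:
  var inFlight = initDeque[int]()
  for id in 1 .. 4:
    inFlight.addLast(id)
  echo syncRootBounded(inFlight, 3, 2)  # 4 in flight >= 3, so sync down to 2
  echo syncRootBounded(inFlight, 3, 2)  # 2 in flight < 3, so nothing happens
```

The gap between m and n acts as hysteresis: the event loop is not pulled into syncing for every single task, and it returns to polling before the queue is fully drained.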

Another idea: loadBalance(t), where t is the (approximate) time in milliseconds (or even nanoseconds) that other tasks may run. This would be like calling the OS sleep(t), but instead of letting the OS scheduler select the next process, Weave would select other tasks to run, possibly on the same thread. One use case would be calling this in a loop while waiting for I/O.
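A time-budgeted variant could look roughly like the sketch below. It is a single-threaded model with hypothetical names (`Work`, `loadBalanceFor`); pending tasks are plain closures in a deque, and the budget is only checked between tasks, so a long task can overrun it, which matches the "approximate" wording above.

```nim
import std/[deques, monotimes, times]

type Work = proc () {.closure.}  # hypothetical stand-in for a runnable task

proc loadBalanceFor(tasks: var Deque[Work], budget: Duration): int =
  ## Runs pending tasks until the queue is empty or the time budget
  ## is used up. Returns the number of tasks that were run.
  let deadline = getMonoTime() + budget
  while tasks.len > 0 and getMonoTime() < deadline:
    let task = tasks.popFirst()
    task()
    inc result

when isMainModule:
  var ran = 0
  let bump: Work = proc () = inc ran
  var tasks = initDeque[Work]()
  for _ in 0 ..< 5:
    tasks.addLast(bump)
  # While waiting for I/O, give other tasks up to 2 ms of this thread's time.
  let done = loadBalanceFor(tasks, initDuration(milliseconds = 2))
  echo "ran ", done, " tasks, ", tasks.len, " left"
```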

Ok, my mental model of how Weave works might not completely align with reality, but I hope these comments may give you some inspiration anyway.

olliNiinivaara commented 4 years ago

File upload/download between hard drive and network socket should be a fruitful practical application area to experiment with parallel device I/O: file device <-> SPSC channel <-> network device

In async/await terms, the SPSC channel seems to be called FutureStream: https://github.com/nim-lang/Nim/blob/version-1-2/lib/pure/asyncstreams.nim#L17

Setting: