JackKelly / hypergrib

Lazily open petabyte-scale GRIB datasets consisting of trillions of GRIB messages from xarray
MIT License

DESIGN DISCUSSION: How to execute a DAG of IO and compute tasks? 2 Tokio threadpools? Or Tokio + Rayon? #10

Open JackKelly opened 2 months ago

JackKelly commented 2 months ago

Andrew Lamb at InfluxData wrote a blog post (in 2022) making compelling arguments for scheduling CPU-bound tasks using Tokio. The essential "trick" is to use two Tokio threadpools: one for IO, and another for CPU-bound tasks (so that CPU-bound tasks don't block IO tasks).

For hypergrib, it might be nice to be able to use Rust's async API to naturally express a directed acyclic graph (DAG) of tasks. For example:

graph TD
    L1[Load GRIB message 1] --> D1[Decode msg 1] --> M[Merge into final array]
    L2[Load GRIB message 2] --> D2[Decode msg 2] --> M[Merge into final array]

Andrew Lamb's blog post suggests using two Tokio threadpools. Andrew's implementation involves ~750 lines of custom Rust code (including tests).

If we really wanted to avoid using Rayon (and use two Tokio threadpools) then I think we could do it by "just" creating two Tokio threadpools. Something like:

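use tokio::runtime::Runtime;

// Create a dedicated runtime for CPU-bound tasks.
let cpu_runtime = Runtime::new().unwrap();

// Spawn the future onto the CPU runtime. `spawn` returns a JoinHandle
// immediately; it does not block the current thread.
let cpu_handle = cpu_runtime.spawn(cpu_main());

// Create a second runtime for IO tasks.
let io_runtime = Runtime::new().unwrap();
let io_handle = io_runtime.spawn(io_main());

// Assumes `cpu_main` and `io_main` are async fns returning a Result, and that
// we are in an async context: the first `?` handles the JoinError, the second
// handles the task's own error.
cpu_handle.await??;
io_handle.await??;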

(Although I'm really not sure if that'll work! And I'm not sure how to pass Futures between the two runtimes?)

On balance, I think I prefer Alice Ryhl's recommendation of using Tokio with Rayon, and using a tokio::sync::oneshot::channel to pass things between Tokio and Rayon. I'm 99% sure this will still allow us to construct a DAG of tasks, and it feels like it will result in less code in hypergrib. Crucially, we may have tasks that run for a long time (seconds?), whereas Andrew Lamb suggests that, even when using two Tokio threadpools, tasks in the CPU threadpool still shouldn't block for more than something like 100 ms. The downside is that it adds a pretty heavyweight dependency (Rayon).

Further reading

JackKelly commented 2 months ago

Actually, on second thoughts, maybe it's easy: just have a struct which holds the two Runtime instances, pass that struct around, and call something like runtimes.cpu.spawn(...). Or we don't pass the struct around, and instead have a single function which sends tasks to the appropriate Runtime.

Although it's possible that Tokio doesn't allow multiple Runtimes to be installed on the same thread, hence the complexity in Andrew Lamb's solution.

JackKelly commented 2 months ago

This post seems to suggest that it's fine to start multiple Runtimes from the same thread: https://matthewtejo.substack.com/p/building-robust-server-with-async

JackKelly commented 2 months ago

There's still the concern that some of our tasks might take a long time (multiple seconds). I should write a little test to check that the Tokio scheduler can still make progress on short tasks while a long task is running. For example, start a runtime with 2 worker threads, submit one 1-second task and ten 100 ms tasks, and check that Tokio schedules the 100 ms tasks on the second thread. The whole thing should execute in about 1 second.

JackKelly commented 2 months ago

I've written a little test, where I submit some 1s tasks, and some 100ms tasks to the same tokio scheduler and it "does the right thing"! Code here: https://github.com/JackKelly/learning_rust/blob/main/tokio_long_tasks/src/main.rs

JackKelly commented 2 months ago

OK! I think I've got it working nicely with two runtimes, and without much additional complexity.

Here's the code: https://github.com/JackKelly/learning_rust/blob/e06d97a409bfe6afea4e1a5ce5409a78ba7128c0/tokio_two_runtimes/src/main.rs

And here's an example output:

jack@jack-NUC:~/dev/rust/learning_rust/tokio_two_runtimes$ time cargo run --release
   Compiling tokio_two_runtimes v0.1.0 (/home/jack/dev/rust/learning_rust/tokio_two_runtimes)
    Finished `release` profile [optimized] target(s) in 0.94s
     Running `target/release/tokio_two_runtimes`

ThreadId(1) RuntimeId(1) Starting main_rt!
tokio::time::sleep on main_rt
Finished tokio_time::sleep on main_rt
ThreadId(3) RuntimeId(1) Spawning tasks on main_rt!
ThreadId(3) RuntimeId(1) Ending tasks on main_rt!
ThreadId(2) RuntimeId(1) ** Starting 100ms task 3 ^^!
ThreadId(3) RuntimeId(1) ** Starting 100ms task 9 ^^!
ThreadId(5) RuntimeId(2) Starting tasks on cpu_rt_handle!
ThreadId(5) RuntimeId(2) Ending tasks on cpu_rt_handle!
ThreadId(5) RuntimeId(2) ** Starting 1s task 1 ^^!
ThreadId(4) RuntimeId(2) ** Starting 1s task 0 ^^!
ThreadId(1) RuntimeId(1) Ending main_rt!
ThreadId(2) RuntimeId(1) ** Finished 100ms task 3 ##!
ThreadId(2) RuntimeId(1) ** Starting 1s task 0 ^^!
ThreadId(3) RuntimeId(1) ** Finished 100ms task 9 ##!
ThreadId(3) RuntimeId(1) ** Starting 100ms task 4 ^^!
ThreadId(3) RuntimeId(1) ** Finished 100ms task 4 ##!
ThreadId(3) RuntimeId(1) ** Starting 100ms task 5 ^^!
ThreadId(3) RuntimeId(1) ** Finished 100ms task 5 ##!
ThreadId(3) RuntimeId(1) ** Starting 100ms task 6 ^^!
ThreadId(3) RuntimeId(1) ** Finished 100ms task 6 ##!
ThreadId(3) RuntimeId(1) ** Starting 100ms task 7 ^^!
ThreadId(3) RuntimeId(1) ** Finished 100ms task 7 ##!
ThreadId(3) RuntimeId(1) ** Starting 100ms task 8 ^^!
ThreadId(3) RuntimeId(1) ** Finished 100ms task 8 ##!
ThreadId(3) RuntimeId(1) ** Starting 100ms task 1 ^^!
ThreadId(3) RuntimeId(1) ** Finished 100ms task 1 ##!
ThreadId(3) RuntimeId(1) ** Starting 100ms task 0 ^^!
ThreadId(3) RuntimeId(1) ** Finished 100ms task 0 ##!
ThreadId(3) RuntimeId(1) ** Starting 100ms task 2 ^^!
ThreadId(3) RuntimeId(1) ** Finished 100ms task 2 ##!
ThreadId(5) RuntimeId(2) ** Finished 1s task 1 ##!
ThreadId(4) RuntimeId(2) ** Finished 1s task 0 ##!
ThreadId(2) RuntimeId(1) ** Finished 1s task 0 ##!

real    0m3.093s
user    0m3.151s
sys     0m0.228s

JackKelly commented 2 months ago

Conclusion

Let's not use Rayon. Instead, let's use two Tokio runtimes (using the method outlined in the comment immediately above).

This has several advantages: