compio-rs / compio

A thread-per-core Rust runtime with IOCP/io_uring/polling.
MIT License

Make futures channel work across thread #108

Closed: nazar-pc closed this issue 1 year ago

nazar-pc commented 1 year ago

As you can imagine, it will not be uncommon for a typical application to use async code other than compio's own APIs:

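```rust
fn main() {
    let (tx, rx) = futures::channel::oneshot::channel::<()>();

    // Complete the channel from a plain OS thread after a short delay.
    std::thread::spawn(move || {
        std::thread::sleep(std::time::Duration::from_millis(100));
        tx.send(()).unwrap();
    });

    // Wait for the value on compio's runtime; no compio I/O is in flight.
    compio::runtime::block_on(async move {
        rx.await.unwrap();
    });
}
```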

However, with compio, unless something io_uring-related is happening, it just hangs:

```
#0  syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
#1  0x000055555559f5e7 in io_uring::sys::io_uring_enter (fd=3, to_submit=0, min_complete=1, flags=1, arg=0x0, size=128) at /home/nazar-pc/.cargo/registry/src/index.crates.io-6f17d22bba15001f/io-uring-0.6.2/src/sys/mod.rs:114
#2  0x000055555559f273 in io_uring::submit::Submitter::enter<libc::unix::linux_like::linux::gnu::b64::sigset_t> (self=0x7ffffffeba90, to_submit=0, min_complete=1, flag=1, arg=...) at /home/nazar-pc/.cargo/registry/src/index.crates.io-6f17d22bba15001f/io-uring-0.6.2/src/submit.rs:95
#3  0x000055555559f3b0 in io_uring::submit::Submitter::submit_and_wait (self=0x7ffffffeba90, want=1) at /home/nazar-pc/.cargo/registry/src/index.crates.io-6f17d22bba15001f/io-uring-0.6.2/src/submit.rs:137
#4  0x000055555559d81d in io_uring::IoUring<io_uring::squeue::Entry, io_uring::cqueue::Entry>::submit_and_wait<io_uring::squeue::Entry, io_uring::cqueue::Entry> (self=0x7ffff7f7b730, want=1) at /home/nazar-pc/.cargo/registry/src/index.crates.io-6f17d22bba15001f/io-uring-0.6.2/src/lib.rs:194
#5  0x000055555559bd47 in compio_driver::sys::Driver::submit_auto (self=0x7ffff7f7b730, timeout=..., wait=true) at /home/nazar-pc/.cargo/registry/src/index.crates.io-6f17d22bba15001f/compio-driver-0.1.0/src/iour/mod.rs:54
#6  0x00005555555885e1 in compio_driver::sys::Driver::poll<smallvec::SmallVec<[compio_driver::Entry; 1024]>> (self=0x7ffff7f7b730, timeout=..., entries=0x7ffffffebd68, registry=0x7ffff7f7b880) at /home/nazar-pc/.cargo/registry/src/index.crates.io-6f17d22bba15001f/compio-driver-0.1.0/src/iour/mod.rs:136
#7  0x0000555555592247 in compio_driver::Proactor::poll<smallvec::SmallVec<[compio_driver::Entry; 1024]>> (self=0x7ffff7f7b730, timeout=..., entries=0x7ffffffebd68) at /home/nazar-pc/.cargo/registry/src/index.crates.io-6f17d22bba15001f/compio-driver-0.1.0/src/lib.rs:216
#8  0x0000555555596443 in compio_runtime::runtime::Runtime::poll (self=0x7ffff7f7b728) at /home/nazar-pc/.cargo/registry/src/index.crates.io-6f17d22bba15001f/compio-runtime-0.1.0/src/runtime/mod.rs:151
#9  0x000055555557b75c in compio_runtime::runtime::Runtime::block_on<compio_hang::main::{async_block_env#1}> (self=0x7ffff7f7b728, future=...) at /home/nazar-pc/.cargo/registry/src/index.crates.io-6f17d22bba15001f/compio-runtime-0.1.0/src/runtime/mod.rs:67
#10 0x0000555555585958 in compio_runtime::block_on::{closure#0}<compio_hang::main::{async_block_env#1}> (runtime=0x7ffff7f7b728) at /home/nazar-pc/.cargo/registry/src/index.crates.io-6f17d22bba15001f/compio-runtime-0.1.0/src/lib.rs:40
#11 0x00005555555801f0 in std::thread::local::LocalKey<compio_runtime::runtime::Runtime>::try_with<compio_runtime::runtime::Runtime, compio_runtime::block_on::{closure_env#0}<compio_hang::main::{async_block_env#1}>, ()> (self=0x5555555ed208, f=...) at /rustc/42b1224e9eb37177f608d3f6a6f2be2ee13902e4/library/std/src/thread/local.rs:270
#12 0x00005555555800aa in std::thread::local::LocalKey<compio_runtime::runtime::Runtime>::with<compio_runtime::runtime::Runtime, compio_runtime::block_on::{closure_env#0}<compio_hang::main::{async_block_env#1}>, ()> (self=0x5555555ed208, f=<error reading variable: Cannot access memory at address 0x10>) at /rustc/42b1224e9eb37177f608d3f6a6f2be2ee13902e4/library/std/src/thread/local.rs:246
#13 0x000055555558590c in compio_runtime::block_on<compio_hang::main::{async_block_env#1}> (future=...) at /home/nazar-pc/.cargo/registry/src/index.crates.io-6f17d22bba15001f/compio-runtime-0.1.0/src/lib.rs:40
#14 0x0000555555584ebe in compio_hang::main () at crates/subspace-farmer/examples/compio-hang.rs:9
```

This is certainly unexpected and, I think, should be fixed; otherwise it is painful and less performant to combine compio with anything async at all, since io_uring completions are not the only reason an async task could be polled. It looks like waker handling is broken or missing right now.

nazar-pc commented 1 year ago

Even something as simple as using the FuturesUnordered combinator with a bunch of async reads doesn't seem to work.

Berrysoft commented 1 year ago

futures channels should be used inside the same runtime. You're using the sender and receiver in different threads. That does seem to be a bug, because the waker won't wake up the driver.

And what do you mean about FuturesUnordered?

nazar-pc commented 1 year ago

> futures channels should be used inside the same runtime

Not at all. Channels can, and often do, span different threads and even different runtimes, as long as the channel implementation is Send. Try replacing compio::runtime::block_on with futures::executor::block_on, or virtually any other executor.
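For comparison, here is roughly why a parking executor copes with this case: its waker is `Send + Sync` and simply unparks the thread that is blocked waiting on the future. The sketch below uses only `std` and assumes a thread-parking design in the spirit of `futures::executor::block_on` (it is not that crate's actual code). `block_on`, `ThreadWaker`, `Slot`, `OneshotReceiver`, `send`, and `demo` are hypothetical names, and the hand-rolled oneshot stands in for `futures::channel::oneshot`.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// Waker that unparks the thread blocked in `block_on`. It is `Send + Sync`,
/// so any thread may call it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal thread-parking executor (sketch only).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until some thread calls `wake`; a spurious wakeup just re-polls.
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical oneshot state: the value and the receiver's waker.
struct Slot<T> {
    value: Option<T>,
    waker: Option<Waker>,
}

struct OneshotReceiver<T>(Arc<Mutex<Slot<T>>>);

impl<T> Future for OneshotReceiver<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut slot = self.0.lock().unwrap();
        match slot.value.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Remember whom to wake once the sender delivers a value.
                slot.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

fn send<T>(shared: &Arc<Mutex<Slot<T>>>, value: T) {
    let waker = {
        let mut slot = shared.lock().unwrap();
        slot.value = Some(value);
        slot.waker.take()
    };
    // Runs on the sender's thread: this is the cross-thread wake.
    if let Some(waker) = waker {
        waker.wake();
    }
}

fn demo() -> u32 {
    let shared: Arc<Mutex<Slot<u32>>> = Arc::new(Mutex::new(Slot { value: None, waker: None }));
    let tx = Arc::clone(&shared);
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        send(&tx, 42);
    });
    block_on(OneshotReceiver(shared))
}

fn main() {
    assert_eq!(demo(), 42);
}
```

The design point is that `wake` does something useful on any thread: nothing in it assumes the caller is the executor's own thread.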

> And what do you mean about FuturesUnordered?

I was creating a FuturesUnordered with a bunch of tasks that all call file.read_exact_at, so that they all run concurrently, and then driving it with compio. I believe the root issue is the same.

Berrysoft commented 1 year ago

The root issue may not be the same. Could you provide a minimal example of the FuturesUnordered issue?

nazar-pc commented 1 year ago

It happened in a bigger app and I can't reproduce it with a smaller example, so ignore it for now.

Berrysoft commented 1 year ago

OK, so this issue tracks the futures channel problem. I think there are 3 solutions:

nazar-pc commented 1 year ago

Futures channels should work with any runtime. I tried a simple example to test the waker, and it does seem to work properly:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

fn main() {
    struct Yield(bool);

    impl Yield {
        fn new() -> Self {
            Self(false)
        }
    }

    impl Future for Yield {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            if !self.0 {
                self.0 = true;
                cx.waker().wake_by_ref();
                Poll::Pending
            } else {
                Poll::Ready(())
            }
        }
    }

    compio::runtime::block_on(async {
        Yield::new().await;
    });
}
```

So I'm not sure what is wrong with channel usage specifically :thinking:

Berrysoft commented 1 year ago

The answer is simple: the waker doesn't wake up the driver (e.g. io_uring). The driver is still waiting for an operation to complete.
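A toy model of what "waking the driver" means, under loud assumptions: a `std::sync::mpsc` receive stands in for the blocking `io_uring_enter` call seen in the backtrace, and a hypothetical `Event::Wake` message stands in for whatever notification a real driver would use. `Event`, `DriverWaker`, `Signal`, `set`, and `demo` are illustrative names only, not compio APIs.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// What the toy driver can be woken by.
enum Event {
    #[allow(dead_code)] // no real I/O is submitted in this sketch
    Completion(u64),
    Wake,
}

/// Waker that notifies the driver itself, so a blocked wait returns.
struct DriverWaker(Mutex<Sender<Event>>);

impl Wake for DriverWaker {
    fn wake(self: Arc<Self>) {
        // Ignore the error: the driver may already have shut down.
        let _ = self.0.lock().unwrap().send(Event::Wake);
    }
}

/// Single-task toy runtime; returns the output and how many wake events arrived.
fn block_on<F: Future>(fut: F) -> (F::Output, usize) {
    let (tx, rx): (Sender<Event>, Receiver<Event>) = channel();
    let waker = Waker::from(Arc::new(DriverWaker(Mutex::new(tx))));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let mut wakeups = 0;
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, wakeups);
        }
        // Stand-in for a blocking `io_uring_enter(min_complete = 1)`.
        match rx.recv().expect("`waker` keeps a sender alive") {
            Event::Completion(_) => { /* a real driver would finish the op here */ }
            Event::Wake => wakeups += 1,
        }
    }
}

type Shared = (AtomicBool, Mutex<Option<Waker>>);

/// Hypothetical future that becomes ready once another thread calls `set`.
struct Signal {
    state: Arc<Shared>,
}

impl Future for Signal {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let (done, slot) = &*self.state;
        *slot.lock().unwrap() = Some(cx.waker().clone());
        if done.load(Ordering::Acquire) { Poll::Ready(()) } else { Poll::Pending }
    }
}

fn set(state: &Shared) {
    state.0.store(true, Ordering::Release);
    let waker = state.1.lock().unwrap().take();
    if let Some(waker) = waker {
        waker.wake(); // cross-thread wake that reaches the driver
    }
}

fn demo() -> usize {
    let state: Arc<Shared> = Arc::new((AtomicBool::new(false), Mutex::new(None)));
    let remote = Arc::clone(&state);
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        set(&remote);
    });
    let ((), wakeups) = block_on(Signal { state });
    wakeups
}

fn main() {
    // The wake from the other thread reached the driver at least once.
    assert!(demo() >= 1);
}
```

Removing the `send` in `DriverWaker::wake` reproduces the reported symptom: `set` still stores the flag, but `rx.recv()` never returns, so the future is never polled again.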

Berrysoft commented 1 year ago

Another problem is that our Waker is not safe to send to another thread. We may need to redesign some part to support your multi-threading case.

If you just want to notify one thread from another, EventHandle is what you want; if you would like a multi-threaded pool, compio-dispatch provides a producer-consumer model.

This issue is far from easy to solve. I'll mark it with the help-wanted label.

nazar-pc commented 1 year ago

> Another problem is that our Waker is not safe to send to another thread. We may need to redesign some part to support your multi-threading case.

Why not? If it weren't, it would not implement Send, and safe Rust wouldn't compile, right? :thinking:

Berrysoft commented 1 year ago

Waker implements Send in std. If it were designed by us, we wouldn't implement it :)
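For reference, `std` makes `Waker` unconditionally `Send + Sync`, and the `RawWakerVTable` documentation requires its functions to be thread-safe for exactly that reason. A compile-time check (the helper name `assert_send_sync` is our own):

```rust
use std::task::Waker;

/// Compiles only if `T` may be sent to, and shared between, threads.
fn assert_send_sync<T: Send + Sync>() {}

fn main() {
    // Holds for every `Waker`, whichever runtime built it, so the
    // runtime's vtable functions may be invoked from any thread.
    assert_send_sync::<Waker>();
}
```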

nazar-pc commented 1 year ago

If you implement std::future::Future, you must use std::task::Waker; I do not understand what you mean by "we won't implement it".

Berrysoft commented 1 year ago

The runtime assumes a single thread everywhere. It won't be easy to make the waker Send, especially since it would then have to wake one thread's driver from another thread.

I'll open a PR to try to make the waker Send later. Then we can see how to notify the driver.

nazar-pc commented 1 year ago

Well, if it is supposed to be Send, but you decided to not do that, then implementation must be unsound. That is kind of the point of Rust: to make unsound things fail to compile. But I'm not very knowledgeable at such a low level, so I'm only commenting at an abstract level.

George-Miao commented 1 year ago

> Well, if it is supposed to be Send, but you decided to not do that, then implementation must be unsound.

Not necessarily. It just means that without careful handling, it's most likely unsound. But if we do provide the guarantee ourselves, then it's safe to work with, and that's kind of the point of unsafe. For now we're thinking of an unsafe impl Send, combined with a check of the thread ID at runtime each time the waker is woken, thus forbidding cross-thread access.
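A minimal sketch of the approach described above, assuming a hypothetical `ThreadBound` wrapper: it is `unsafe impl Send + Sync`, checks the thread ID on every access, and leaks rather than drops its contents on a foreign thread (an extra detail we add, because dropping an `Rc` on another thread would itself be unsound). `RunQueueWaker` and its `Rc<RefCell<VecDeque<usize>>>` run queue are illustrative stand-ins, not compio's real types.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::mem::ManuallyDrop;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Wake, Waker};
use std::thread::{self, ThreadId};

/// Hypothetical wrapper: a `!Send` value that may travel between threads
/// but may only be touched (or dropped) on the thread that created it.
struct ThreadBound<T> {
    owner: ThreadId,
    value: ManuallyDrop<T>,
}

// SAFETY (sketch): `value` is only read in `get` and only dropped in `Drop`,
// and both check that the current thread is `owner`.
unsafe impl<T> Send for ThreadBound<T> {}
unsafe impl<T> Sync for ThreadBound<T> {}

impl<T> ThreadBound<T> {
    fn new(value: T) -> Self {
        Self { owner: thread::current().id(), value: ManuallyDrop::new(value) }
    }

    fn get(&self) -> &T {
        // The runtime thread check: panic rather than touch the value elsewhere.
        assert_eq!(thread::current().id(), self.owner, "cross-thread access forbidden");
        &self.value
    }
}

impl<T> Drop for ThreadBound<T> {
    fn drop(&mut self) {
        if thread::current().id() == self.owner {
            // SAFETY: on the owning thread, and `value` is never used again.
            unsafe { ManuallyDrop::drop(&mut self.value) };
        }
        // On any other thread the value is leaked instead of dropped.
    }
}

/// Waker whose state is a single-threaded run queue of task ids.
struct RunQueueWaker {
    run_queue: ThreadBound<Rc<RefCell<VecDeque<usize>>>>,
    task_id: usize,
}

impl Wake for RunQueueWaker {
    fn wake(self: Arc<Self>) {
        Self::wake_by_ref(&self);
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.run_queue.get().borrow_mut().push_back(self.task_id);
    }
}

/// Returns (queue after a same-thread wake, whether the cross-thread wake
/// panicked, queue length afterwards).
fn demo() -> (Vec<usize>, bool, usize) {
    let run_queue = Rc::new(RefCell::new(VecDeque::new()));
    let waker = Waker::from(Arc::new(RunQueueWaker {
        run_queue: ThreadBound::new(Rc::clone(&run_queue)),
        task_id: 7,
    }));

    // Same-thread wake: the task id lands on the run queue.
    waker.wake_by_ref();
    let after_local: Vec<usize> = run_queue.borrow().iter().copied().collect();

    // Cross-thread wake: the thread check panics instead of touching the `Rc`.
    let remote = waker.clone();
    let panicked = thread::spawn(move || remote.wake()).join().is_err();

    let len = run_queue.borrow().len();
    (after_local, panicked, len)
}

fn main() {
    let (after_local, panicked, len) = demo();
    assert_eq!(after_local, vec![7]);
    assert!(panicked);
    assert_eq!(len, 1);
}
```

Panicking is the blunt option; a runtime could instead forward foreign-thread wakes through a thread-safe channel to the owning driver, which is closer to what this issue asks for.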