awestlake87 / pyo3-asyncio


Await `!Send` Rust futures in Python #59

Open · Kobzol opened this issue 2 years ago

Kobzol commented 2 years ago

Hi! I want to use pyo3-asyncio to convert Rust futures into Python awaitables that can then be awaited in Python. For Send futures, this is quite easy, e.g.:

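```rust
use pyo3::prelude::*;

#[pyfunction]
fn make_awaitable(py: Python) -> PyResult<&PyAny> {
    // `something_async()` stands in for any `Send` future from the library.
    pyo3_asyncio::tokio::future_into_py(py, async {
        something_async().await;
        Ok(())
    })
}
```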

However, when the Rust `Future` is not `Send`, this gets problematic. I know that there is the `local_future_into_py` function, which is demonstrated here. However, to actually await that future, the caller needs to be running inside a `LocalSet`.

Is it possible to create a Python awaitable from a Rust `!Send` future and then await it in Python (rather than in Rust using a `LocalSet`)? My idea would be to use a global single-threaded tokio runtime and run everything inside a global `LocalSet`, but I'm not sure whether that's currently possible.

Thank you for a great library btw! :) Even if it wouldn't support this use case, it's very useful.

awestlake87 commented 2 years ago

> Is it possible to create a Python awaitable from a Rust !Send future and then await it in Python (and not in Rust using LocalSet)? My idea would be to use a global single-threaded tokio runtime and run everything inside a global LocalSet, but I'm not sure if that's possible currently

Unfortunately, I don't think it's possible without a `LocalSet`. There are two kinds of threads in PyO3 Asyncio: threads controlled by Python's event loop and threads controlled by Rust's event loop. To run a `!Send` future, you have to call `local_future_into_py` on a thread controlled by Rust's event loop, because the future cannot be 'sent' to a Rust thread later. To await the future on a thread controlled by Python, you would need some kind of unified event loop like the one discussed here.

Maybe we can get something figured out for your use-case though. If you provide some more info about what you're trying to do I might be able to give you some pointers on how to get around it. Sometimes you can circumvent this issue by first spawning a Send future that creates the !Send future, but it varies from problem to problem.
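The "spawn something `Send` that creates the `!Send` value" idea can be illustrated without tokio or PyO3. The sketch below is a std-only analogue in which a plain thread stands in for the runtime worker: only the closure, which must be `Send`, crosses the thread boundary, while the `Rc` it builds never leaves the worker. The names `run_pinned` and `sum_with_rc` are hypothetical and not part of pyo3-asyncio.

```rust
use std::rc::Rc;
use std::thread;

/// Runs `make_and_run` on a freshly spawned thread and returns its result.
/// Only the closure (which must be `Send`) crosses the thread boundary; any
/// `!Send` values it creates stay on the worker thread.
pub fn run_pinned<F, R>(make_and_run: F) -> R
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    thread::spawn(make_and_run)
        .join()
        .expect("worker thread panicked")
}

/// Hypothetical workload: the `Rc` (which is `!Send`) is created inside the
/// closure, so it is never moved between threads.
pub fn sum_with_rc(input: u32) -> u32 {
    run_pinned(move || {
        let shared = Rc::new(input);
        let alias = Rc::clone(&shared);
        *shared + *alias
    })
}
```

Moving the `Rc::new` call outside the closure would fail to compile, because the closure would then capture a `!Send` value.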

> Thank you for a great library btw! :) Even if it wouldn't support this use case, it's very useful.

Thanks, I appreciate it!

Kobzol commented 2 years ago

Basically I want to do exactly what I showed here:

```rust
#[pyfunction]
fn make_awaitable(py: Python) -> PyResult<&PyAny> {
    pyo3_asyncio::tokio::future_into_py(py, async {
        something_async().await;
        Ok(())
    })
}
```

just with a `!Send` future (my library uses a single-threaded runtime and `!Send` futures for everything).

> Maybe we can get something figured out for your use-case though. If you provide some more info about what you're trying to do I might be able to give you some pointers on how to get around it. Sometimes you can circumvent this issue by first spawning a Send future that creates the !Send future, but it varies from problem to problem.

I was trying to spawn a !Send future within a Send future, but couldn't get it to work without creating a nested runtime:

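```rust
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};

use pyo3::prelude::*;
use pyo3_asyncio::tokio::future_into_py;
use tokio::runtime::Builder;
use tokio::task::LocalSet;

// Placeholder `!Send` future: the `Rc` field makes it `!Send`.
struct Fut(Rc<u32>);

impl Future for Fut {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        todo!()
    }
}

#[pyfunction]
fn nonsend_fut(py: Python) -> PyResult<&PyAny> {
    future_into_py(py, async move {
        // Attempt: build a second, nested current-thread runtime inside a task
        // that is already running on pyo3-asyncio's tokio runtime.
        let rt = Builder::new_current_thread().enable_all().build().unwrap();

        let set = LocalSet::new();

        let fut = async move {
            let fut = Fut(Rc::new(0));
            Ok(fut.await)
        };

        // Blocks the current worker thread until the LocalSet finishes.
        rt.block_on(set.run_until(fut))
    })
}
```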

However, this seems to deadlock.

It seems to me that creating a `LocalSet` within a `Send` future is futile. Maybe https://github.com/tokio-rs/tokio/pull/3370 could help.

awestlake87 commented 2 years ago

How is your project set up? Are you using pyo3_asyncio::tokio::main or is it a PyO3 native extension?

Kobzol commented 2 years ago

I'm not using the main macro, I'm writing a native extension. So I have a Rust library from which I want to expose several functions to Python, some of them should return awaitables.

awestlake87 commented 2 years ago

Ok, if you're using the current thread scheduler, does that mean that you're initializing tokio and spawning a thread for it somewhere in your native extension?

Kobzol commented 2 years ago

Well, I also have a binary that uses the library, and I do that in the binary.

But I hoped that for the extension I could just use the global tokio runtime from pyo3-asyncio, so I haven't initialized tokio explicitly in my extension so far.

I suppose that spawning a thread containing a single-threaded runtime, and then communicating with it via e.g. mpsc queues from `Send` futures running inside pyo3-asyncio's runtime, could work (would it be safe to create this thread in the `#[pymodule]` initialization code?). Of course, I was wondering whether something like this could be avoided.
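A minimal std-only sketch of that dedicated-thread design, assuming simple synchronous request/reply messages: callers hold a cloneable `Send` handle, and the `!Send` state (an `Rc<RefCell<_>>`) lives only on the worker thread. In a real extension the worker loop would presumably run a current-thread tokio runtime with a `LocalSet` and use async channels (for example `tokio::sync::mpsc` with `oneshot` replies), so that the `Send` futures handed to Python could await the reply instead of blocking. `WorkerHandle` and `Request` are hypothetical names.

```rust
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::mpsc;
use std::thread;

/// Hypothetical request type: each request carries its own reply channel.
enum Request {
    Add { amount: u64, reply: mpsc::Sender<u64> },
    Shutdown,
}

/// `Send + Clone` handle held by callers; the state lives on the worker thread.
#[derive(Clone)]
pub struct WorkerHandle {
    tx: mpsc::Sender<Request>,
}

impl WorkerHandle {
    pub fn spawn() -> (WorkerHandle, thread::JoinHandle<()>) {
        let (tx, rx) = mpsc::channel::<Request>();
        let join = thread::spawn(move || {
            // `!Send` state: created and used only on this thread.
            let counter = Rc::new(RefCell::new(0u64));
            // Requests are handled one at a time, so borrows never overlap.
            for req in rx {
                match req {
                    Request::Add { amount, reply } => {
                        *counter.borrow_mut() += amount;
                        let _ = reply.send(*counter.borrow());
                    }
                    Request::Shutdown => break,
                }
            }
        });
        (WorkerHandle { tx }, join)
    }

    /// Sends a request and blocks until the worker replies.
    pub fn add(&self, amount: u64) -> u64 {
        let (reply_tx, reply_rx) = mpsc::channel();
        self.tx
            .send(Request::Add { amount, reply: reply_tx })
            .expect("worker thread has stopped");
        reply_rx.recv().expect("worker dropped the reply channel")
    }

    pub fn shutdown(&self) {
        let _ = self.tx.send(Request::Shutdown);
    }
}
```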

awestlake87 commented 2 years ago

I had an idea about runtime initialization, but then I remembered I had done something very similar in one of my tests:

Try something like this:


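```rust
use std::rc::Rc;

use pyo3::prelude::*;

#[pyfunction]
fn return_3(py: Python) -> PyResult<&PyAny> {
    pyo3_asyncio::tokio::future_into_py(py, async move {
        // Run the !Send work on a blocking-pool thread, which is allowed to block.
        let result = tokio::task::spawn_blocking(|| {
            // `Rc` is !Send; it is created and used only on this blocking thread.
            let data = Rc::new(3i32);

            // Drive the future inside a LocalSet on this thread, using
            // pyo3-asyncio's global runtime for timers and I/O.
            tokio::task::LocalSet::new().block_on(pyo3_asyncio::tokio::get_runtime(), async move {
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                println!("using local data {}", *data);
                *data
            })
        });

        // Panics if the blocking task panicked or was cancelled.
        Ok(result.await.unwrap())
    })
}
```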

Instead of creating a new runtime, you create a blocking task on the tokio runtime to run the `LocalSet`.

It's not a workaround I've tested very well, so I don't know how many local tasks you can run concurrently like this. Let me know if this works for you!

Kobzol commented 2 years ago

Right, this is conceptually similar to the "separate thread + single threaded runtime + queue" approach, but it's better if it's handled by tokio, of course.

Originally, I hoped to avoid something like this, but now I realize that it can actually be quite useful. If you use a single-threaded runtime and `!Send` futures, you will probably need to access some shared state via `RefCell`/`PyCell` (at least that's how I have set it up, but it's probably unavoidable). You would need to protect this state with a mutex or something similar, to prevent concurrent Python calls into Rust functions that access the state and create futures from crashing with `BorrowMutError`s. Using `spawn_blocking` and setting the number of tokio blocking threads to 1 could solve this issue automatically.
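The `BorrowMutError` failure mode, and why serializing access avoids it, can be shown with `RefCell` alone. In this hypothetical sketch, the first function holds a mutable borrow (as a future might across an `.await`) while a second caller tries to borrow; `try_borrow_mut` reports the conflict that `borrow_mut` would turn into a panic. The second function runs jobs strictly one after another, so each borrow ends before the next begins, which is roughly the ordering a single blocking thread would enforce. Both function names are illustrative.

```rust
use std::cell::RefCell;

/// Returns whether a second mutable borrow succeeds while a first one is held.
pub fn overlapping_borrow_succeeds(state: &RefCell<Vec<u32>>) -> bool {
    // First borrow, kept alive until the end of this function
    // (e.g. a future holding it across an `.await`).
    let _first = state.borrow_mut();
    // A second, overlapping access; `borrow_mut()` here would panic.
    let second_ok = state.try_borrow_mut().is_ok();
    second_ok
}

/// Serialized access: each job borrows, mutates, and releases before the next.
pub fn run_serially(state: &RefCell<Vec<u32>>, jobs: &[u32]) {
    for &job in jobs {
        // The temporary `RefMut` is dropped at the end of this statement.
        state.borrow_mut().push(job);
    }
}
```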

Of course, an even better solution would be if the Python and Rust event loops knew about each other, so that awaits in Rust and Python could cooperate, but that's probably a utopia for now :)

I can't test it right now, but I'll experiment with it over the weekend and let you know if it works. Thank you very much for the suggestion!

Kobzol commented 2 years ago

Great, this solution seems to work! For some reason it deadlocks with a single-threaded runtime; maybe that runtime does not support `spawn_blocking`, or there's some other problem. But it's not a big deal for me at the moment.

Maybe this approach could be put into the documentation, because currently, if someone has a `!Send` future, the docs immediately redirect them to `local_future_into_py`, which is basically unusable from the Python side.

awestlake87 commented 2 years ago

> Great, this solution seems to work! For some reason it deadlocks with a single-threaded runtime, maybe it does not support spawn_blocking or there's some other problem. But it's not a big deal for me at this moment.

That's strange. I didn't see any deadlocks when I tried it with the current thread scheduler. I didn't restrict the number of blocking threads in tokio though, so maybe that's why?

It does make me wonder why tokio doesn't support `LocalSet` in tasks. It would make things a lot easier if it were more like async-std, where `spawn_local` ensures that the task you spawn can only run on the worker thread that spawned it (and panics if it wasn't spawned on a worker thread). If it worked like that, you wouldn't need the additional blocking threads that tokio requires.

> Maybe this approach could be put into the documentation, because currently if someone has a !Send future, the docs immediately redirect him to local_future_into_py, which is however basically unusable from the Python side.

Yeah, it probably should. Honestly, this is making me rethink having those conversions to begin with, because, as you say, they can really only be used from the Rust side, which pretty much defeats the purpose. Maybe just documenting the `spawn_blocking` workaround is good enough.

awestlake87 commented 2 years ago

@TkTech opened the original issue that led to those conversions being created. I'd be curious to know if they still have a use-case for these conversions that wasn't mentioned here or if they have similar issues with these conversions.

If they're of the same opinion, I might just deprecate these conversions.

TkTech commented 2 years ago

Ultimately, I ended up moving away from pyo3-asyncio. I created my own event loop shims using `std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}` and `futures::channel::oneshot`, and I drive the event loop directly from asyncio, which works well for my simple use case.
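TkTech's shim was not published in this thread, so the following is not their code: it is a hypothetical, std-only sketch of the same ingredients. A no-op `Waker` is built from a `RawWakerVTable`, and a `!Send` future (it holds an `Rc`) is polled repeatedly on the current thread until it completes. In the setup described above, each poll would be triggered from the asyncio loop rather than from a busy `loop`, and `futures::channel::oneshot` is not used here. `CountDown` and `drive` are illustrative names.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// A waker whose operations do nothing: the driver below polls unconditionally,
/// so wake-ups do not need to be recorded.
fn noop_raw_waker() -> RawWaker {
    unsafe fn clone(_: *const ()) -> RawWaker {
        noop_raw_waker()
    }
    unsafe fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    RawWaker::new(std::ptr::null(), &VTABLE)
}

/// Hypothetical `!Send` future: returns `Pending` `remaining` times, bumping a
/// shared counter each time, then resolves to the counter's value.
pub struct CountDown {
    pub remaining: u32,
    pub data: Rc<Cell<u32>>,
}

impl Future for CountDown {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.remaining == 0 {
            return Poll::Ready(self.data.get());
        }
        self.remaining -= 1;
        self.data.set(self.data.get() + 1);
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending
    }
}

/// Polls `fut` on the current thread until it completes; returns the output
/// and how many polls it took. Each iteration is one "tick" of a host loop.
pub fn drive<F: Future>(fut: F) -> (F::Output, u32) {
    // SAFETY: the vtable functions ignore the (null) data pointer.
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
    }
}
```

With `remaining: 3` and a counter starting at 10, the future is pending three times and completes on the fourth poll with 13.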

Kobzol commented 2 years ago

@TkTech Is your solution open-sourced? I think that it might be useful for other people (myself included!).

awestlake87 commented 2 years ago

Sounds very similar to what @ThibaultLemaire was working on in #6. `!Send` futures were a use case I hadn't really considered for an asyncio-driven event loop, but it makes sense now that I think about it.

Might be worth taking a second look at it!

ThibaultLemaire commented 2 years ago

> if Python and Rust event loops knew about each other and awaits in Rust and Python would cooperate

That sounds a bit like what I implemented, yes. Here's the code for your convenience (to spare you the long github discussion).

In a nutshell, the idea is to write a wrapper that behaves like a Python awaitable/task but is able to drive a Rust future.
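One way to picture such a wrapper, as a hypothetical std-only sketch rather than ThibaultLemaire's code: `LocalTask` owns a boxed future with no `Send` bound and advances it by one `poll` per `step()`, the way a host event loop steps a task. Its waker only sets a flag, which the host could check via `needs_poll()` to decide whether to schedule another step. A `Waker` must be `Send + Sync` even when the future is not, hence the `Arc<AtomicBool>`-based waker. The Python-facing part (for example, exposing `step` through PyO3) is omitted.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that only records "poll me again"; the host loop reads the flag.
struct FlagWaker(AtomicBool);

impl Wake for FlagWaker {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
    fn wake_by_ref(self: &Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical task wrapper around a (possibly `!Send`) future.
pub struct LocalTask<T: 'static> {
    fut: Option<Pin<Box<dyn Future<Output = T>>>>,
    flag: Arc<FlagWaker>,
}

impl<T: 'static> LocalTask<T> {
    pub fn new(fut: impl Future<Output = T> + 'static) -> Self {
        LocalTask {
            fut: Some(Box::pin(fut)),
            // A new task should be polled at least once.
            flag: Arc::new(FlagWaker(AtomicBool::new(true))),
        }
    }

    /// True if the task has not been polled yet or its waker fired since.
    pub fn needs_poll(&self) -> bool {
        self.flag.0.load(Ordering::SeqCst)
    }

    /// Polls once; returns `Some(output)` when the future completes.
    pub fn step(&mut self) -> Option<T> {
        // Clear the flag first; the future sets it again via the waker.
        self.flag.0.store(false, Ordering::SeqCst);
        let waker = Waker::from(Arc::clone(&self.flag));
        let mut cx = Context::from_waker(&waker);
        let fut = self.fut.as_mut().expect("task already completed");
        let poll = fut.as_mut().poll(&mut cx);
        match poll {
            Poll::Ready(out) => {
                self.fut = None;
                Some(out)
            }
            Poll::Pending => None,
        }
    }
}
```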

I eventually dropped the project because staying off the Python thread was much faster and I couldn't think of a situation where !Send futures would be required.

I cannot guarantee `!Send` support, though, as I seem to recall some issues I had with such futures (and indeed, looking at my code, I'm taking `Future<Output = TOutput> + Send + 'static`). It's possible they were just related to issues I had when trying to combine it with tokio, but it could be something deeper. I don't remember, so you're welcome to take a look at the code and figure out something cleverer for yourself 🙂

TkTech commented 2 years ago

> @TkTech Is your solution open-sourced? I think that it might be useful for other people (myself included!).

Not yet, but it will be. It's part of a tool that allows users to provide their own scripts to interact with events on IRC (a rewrite of https://github.com/TkTech/notifico). It's very simplistic, and it relies on the fact that `v8::Isolate`s are `!Send`, can be driven by a polling loop (in this case, driven by asyncio), and are single-threaded themselves, which allows quite a few simplifying assumptions in the implementation.

AzureMarker commented 2 years ago

I don't have any context on this thread, but https://github.com/tokio-rs/tokio/pull/3370 was merged and released in 1.16.0 (the PR was referenced earlier as something that might help).

awestlake87 commented 2 years ago

@AzureMarker thanks! Looks like that would be a cleaner replacement for the spawn_blocking workaround.

AzureMarker commented 2 years ago

Oh, I just realized the `spawn_pinned` changes weren't released with 1.16.0, since they're in `tokio_util`, which isn't included in the main tokio crate. But they were recently released in `tokio_util` 0.7.0: https://github.com/tokio-rs/tokio/pull/4486 https://lib.rs/crates/tokio-util/versions