awestlake87 / pyo3-asyncio


[Discussion] Use rust async functions but expose sync functions to Python #89

Closed: twitu closed this 1 year ago

twitu commented 1 year ago

I've read the documentation, but I still want to check whether this library can be used in the way described below. It's part of an ongoing migration from Cython to Rust, as tracked in https://github.com/nautechsystems/nautilus_trader/issues/991. The idea is to use https://github.com/apache/arrow-datafusion as a backend to query Parquet files and expose the data to Python as an iterator.

To narrow down the problem: there's an async Rust library that uses tokio and returns a stream of data. I want to expose that data to Python as an iterator, i.e. a sync function that Python can call.


I've tried implementing this here: https://github.com/nautechsystems/nautilus_trader/pull/997/commits/86b549239bd5564de9d80edd78e0e331b7dc8882. It compiles, but the tokio::spawn call fails because it cannot detect the self-managed runtime that's calling block_on.

So is there any way to do this? Do you suggest any alternatives?

awestlake87 commented 1 year ago

I think the main problem you're seeing is that you are trying to use the tokio context from a Python thread. Your Python code isn't running on a tokio worker thread, so it can't find the runtime. PyO3 Asyncio has a helper function that will allow you to get a reference to the tokio runtime from any thread, so you can try using that in your block_on call like this:

use pyo3::prelude::*;

#[pyfunction]
fn rust_sleep(_py: Python) -> PyResult<()> {
    // Block this (Python-owned) thread on the library-wide tokio runtime.
    pyo3_asyncio::tokio::get_runtime().block_on(async move {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        Ok(())
    })
}

To improve it a bit, you should release the GIL with py.allow_threads while you're blocking, since you're most likely blocking on a Rust operation. Releasing the GIL allows other Python threads to run in the meantime.

use pyo3::prelude::*;

#[pyfunction]
fn rust_sleep(py: Python) -> PyResult<()> {
    // Release the GIL so other Python threads can run while we block.
    py.allow_threads(|| {
        pyo3_asyncio::tokio::get_runtime().block_on(async move {
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            Ok(())
        })
    })
}

Edit: This is probably not actually the main issue; see below.

awestlake87 commented 1 year ago

Oh OK, I might have misunderstood. If your spawn calls are failing, you might need to call rt.enter().

awestlake87 commented 1 year ago

Something else that caught my eye is that you're creating a new Runtime per iterator. You probably don't need to do this: you can reuse one tokio runtime for multiple iterators, basically making it global to your library or app.

It's also not really clear that you need pyo3-asyncio involved here at all, since you want a sync interface. pyo3-asyncio is mainly for interacting with async Python, specifically the asyncio library. Sync Python code can go between PyO3 and tokio directly, without pyo3-asyncio as a middleman.

If you are using pyo3-asyncio elsewhere in your app (or plan to), you can use the library-wide runtime provided by pyo3_asyncio::tokio::get_runtime for your iterators as well. Otherwise, you might want to create a similar function that stores the runtime in a static OnceCell.

twitu commented 1 year ago

Thanks for sharing this approach; it fits this use case well because the Python side doesn't need to know anything about async. Moreover, now that once_cell is in the standard library (https://github.com/rust-lang/rust/issues/74465), this can be implemented directly without any additional crates.