awestlake87 / pyo3-asyncio

Creating awaitable methods #50

Open 1frag opened 2 years ago

1frag commented 2 years ago

Hi, I haven't found any information on how to write asynchronous methods. For example, I'd like to use a module like this:

await RustSleeper(42).sleep()  # sleep for 42 secs

I wrote the following Rust code:

use std::time::Duration;
use pyo3::prelude::*;

#[pyclass]
struct RustSleeper(u64);

#[pymethods]
impl RustSleeper {
    fn sleep<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
        pyo3_asyncio::tokio::future_into_py(py, async move {
            tokio::time::sleep(Duration::from_secs(self.0)).await;
            Python::with_gil(|py| Ok(py.None()))
        })
    }
}

#[pymodule]
fn sleeper_rs(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
    m.add_class::<RustSleeper>()?;
    Ok(())
}

It doesn't compile. The compiler output:

error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
  --> src/lib.rs:10:60
   |
9  |       fn sleep<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
   |                    ----- this data with an anonymous lifetime `'_`...
10 |           pyo3_asyncio::tokio::future_into_py(py, async move {
   |  ____________________________________________________________^
11 | |             tokio::time::sleep(Duration::from_secs(self.0)).await;
12 | |             Python::with_gil(|py| Ok(py.None()))
13 | |         })
   | |_________^ ...is captured here...
   |
note: ...and is required to live as long as `'static` here
  --> src/lib.rs:10:9
   |
10 |         pyo3_asyncio::tokio::future_into_py(py, async move {
   |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

That's understandable, but I can't make `self` `'static` because of how `#[pymethods]` wraps the method:

Error when setting `'static` for `self`:

```
error[E0495]: cannot infer an appropriate lifetime for lifetime parameter `'p` due to conflicting requirements
 --> src/lib.rs:7:1
  |
7 | #[pymethods]
  | ^^^^^^^^^^^^
  |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the body at 7:1...
 --> src/lib.rs:7:1
  |
7 | #[pymethods]
  | ^^^^^^^^^^^^
note: ...so that the types are compatible
 --> src/lib.rs:7:1
  |
7 | #[pymethods]
  | ^^^^^^^^^^^^
  = note: expected `pyo3::Python<'_>`
             found `pyo3::Python<'_>`
  = note: but, the lifetime must be valid for the static lifetime...
note: ...so that reference does not outlive borrowed content
 --> src/lib.rs:7:1
  |
7 | #[pymethods]
  | ^^^^^^^^^^^^
  = note: this error originates in the attribute macro `pymethods` (in Nightly builds, run with -Z macro-backtrace for more info)
```
or (with `-Z macro-backtrace`):

```
error[E0495]: cannot infer an appropriate lifetime for lifetime parameter `'p` due to conflicting requirements
   --> src/lib.rs:7:1
    |
7   | #[pymethods]
    | ^^^^^^^^^^^^ in this procedural macro expansion
    |
   ::: ~/.cargo/registry/src/github.com-1ecc6299db9ec823/pyo3-macros-0.14.5/src/lib.rs:190:1
    |
190 | pub fn pymethods(_: TokenStream, input: TokenStream) -> TokenStream {
    | ------------------------------------------------------------------- in this expansion of `#[pymethods]`
    |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the body at 7:1...
   --> src/lib.rs:7:1
    |
7   | #[pymethods]
    | ^^^^^^^^^^^^ in this procedural macro expansion
    |
   ::: ~/.cargo/registry/src/github.com-1ecc6299db9ec823/pyo3-macros-0.14.5/src/lib.rs:190:1
    |
190 | pub fn pymethods(_: TokenStream, input: TokenStream) -> TokenStream {
    | ------------------------------------------------------------------- in this expansion of `#[pymethods]`
note: ...so that the types are compatible
   --> src/lib.rs:7:1
    |
7   | #[pymethods]
    | ^^^^^^^^^^^^ in this procedural macro expansion
    |
   ::: ~/.cargo/registry/src/github.com-1ecc6299db9ec823/pyo3-macros-0.14.5/src/lib.rs:190:1
    |
190 | pub fn pymethods(_: TokenStream, input: TokenStream) -> TokenStream {
    | ------------------------------------------------------------------- in this expansion of `#[pymethods]`
    = note: expected `pyo3::Python<'_>`
               found `pyo3::Python<'_>`
    = note: but, the lifetime must be valid for the static lifetime...
note: ...so that reference does not outlive borrowed content
   --> src/lib.rs:7:1
    |
7   | #[pymethods]
    | ^^^^^^^^^^^^ in this procedural macro expansion
    |
   ::: ~/.cargo/registry/src/github.com-1ecc6299db9ec823/pyo3-macros-0.14.5/src/lib.rs:190:1
    |
190 | pub fn pymethods(_: TokenStream, input: TokenStream) -> TokenStream {
    | ------------------------------------------------------------------- in this expansion of `#[pymethods]`
```
Dependencies:

```toml
[dependencies]
tokio = "1.12.0"

[dependencies.pyo3-asyncio]
version = "0.14.0"
features = ["tokio-runtime"]

[dependencies.pyo3]
version = "0.14.5"
features = ["extension-module", "serde"]
```

So, is it possible to write an async method? If so, could the documentation or examples for this library be extended to cover it?

awestlake87 commented 2 years ago

Yeah, I've gotten this question a few times, so it's probably worth adding to the docs. Essentially, the problem you're facing is that `&self` can only live as long as a GIL borrow, because a `#[pyclass]` struct is owned by the Python interpreter. Meanwhile, the future passed to `future_into_py` must be `'static`. You have a couple of options in this case:

  1. Make a copy (or clone) of the internal data that you want to reference in your async method. This is easy in your example, since `self.0` is a `u64`, which implements `Copy` (in more complex situations, wrapping the struct's data in an `Arc<T>` would work as well). All you need to do is read the value before the `async move` block so that you aren't referencing `self` inside it:
#[pymethods]
impl RustSleeper {
    fn sleep<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
        let secs = self.0;
        pyo3_asyncio::tokio::future_into_py(py, async move {
            tokio::time::sleep(Duration::from_secs(secs)).await;
            Python::with_gil(|py| Ok(py.None()))
        })
    }
}
  2. Take the receiver as a `Py<Self>` so that it's not tied to the lifetime of the GIL. Whenever you need a value out of it, acquire the GIL and borrow it:
#[pymethods]
impl RustSleeper {
    fn sleep<'p>(this: Py<Self>, py: Python<'p>) -> PyResult<&'p PyAny> {
        pyo3_asyncio::tokio::future_into_py(py, async move {
            // Hold the GIL only long enough to copy the field out.
            let secs = Python::with_gil(|py| this.borrow(py).0);
            tokio::time::sleep(Duration::from_secs(secs)).await;
            Python::with_gil(|py| Ok(py.None()))
        })
    }
}

Full disclosure: I haven't actually tried option 2, but I've been told that it should work. I got a compiler error when I declared the receiver as `self: Py<Self>` instead of `this: Py<Self>`, so I'm not certain my example will work the way I intended. It might be worth a try, though.
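Both options come down to the `'static` bound on the future handed to `future_into_py`. The task may outlive the method call, so it must own everything it touches. The following is a rough std-only analogy, not PyO3 code. It makes three substitutions: a thread replaces the Tokio task, a hypothetical `run_detached` helper (which also requires `Send + 'static`) replaces `future_into_py`, and `Arc<Self>` replaces `Py<Self>`:

```rust
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for `future_into_py`: it only accepts work that is
// `Send + 'static`, so the closure cannot borrow from `&self`.
fn run_detached<T, F>(work: F) -> thread::JoinHandle<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    thread::spawn(work)
}

struct RustSleeper(u64);

impl RustSleeper {
    // Option 1: copy the field before the `move`, so the closure owns a `u64`
    // instead of borrowing `self`.
    fn sleep_copy(&self) -> thread::JoinHandle<u64> {
        let millis = self.0;
        run_detached(move || {
            thread::sleep(Duration::from_millis(millis));
            millis
        })
    }

    // Option 2 analogy: take a reference-counted handle (`Arc<Self>` here,
    // `Py<Self>` in PyO3) and read through it inside the task.
    fn sleep_shared(this: Arc<Self>) -> thread::JoinHandle<u64> {
        run_detached(move || {
            let millis = this.0;
            thread::sleep(Duration::from_millis(millis));
            millis
        })
    }
}
```

If the closure in `sleep_copy` read `self.0` directly instead of the copied `millis`, the compiler would reject it with a lifetime error similar to the E0759 above.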

1frag commented 2 years ago

Thanks, it's working now. This explanation is very helpful!

relsunkaev commented 2 years ago

Would it be possible to mutate self inside the async move block? I need to set one of the fields to a value that has to be awaited first.

1frag commented 2 years ago

In a simple case, it can be done with Py<Self>:

use pyo3::prelude::*;
use tokio::fs::File;
use tokio::io::AsyncReadExt;

#[pyclass]
struct RustSleeper {
    #[pyo3(get, set)]
    n: u64,
}

async fn get_random_u64() -> u64 {
    let mut file = File::open("/dev/urandom").await.unwrap();
    file.read_u64().await.unwrap() % 10
}

#[pymethods]
impl RustSleeper {
    #[new]
    fn new(n: u64) -> Self {
        RustSleeper { n }
    }

    fn change_n(this: Py<Self>, py: Python) -> PyResult<&PyAny> {
        pyo3_asyncio::tokio::future_into_py(py, async move {
            let n = get_random_u64().await;
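            Python::with_gil(|py| -> PyResult<PyObject> {
                let cell: &PyCell<RustSleeper> = this.as_ref(py);
                // Raise a Python exception instead of panicking if the
                // object is already borrowed elsewhere.
                let mut slf = cell.try_borrow_mut()?;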
                slf.n = n;
                Ok(py.None())
            })
        })
    }
}

#[pymodule]
fn sleeper_rs(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
    m.add_class::<RustSleeper>()?;
    Ok(())
}

And use it like this:

import asyncio
import sleeper_rs

sl = sleeper_rs.RustSleeper(2)

async def main():
    await sl.change_n()

print(sl.n)  # 2
asyncio.run(main())
print(sl.n)  # from 0 to 9

But I couldn't find a way to call a method like this:

impl RustSleeper {
    async fn async_set_n(&mut self, n: u64) {
        self.n = n;
    }
}

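inside the async move block. py, and therefore a mutable borrow of self, is only available inside synchronous Python::with_gil closures, so the borrow can't be held across an .await.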
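The `Py<Self>` example above works because nothing is borrowed while the task is suspended. The new value is computed first, and the GIL is taken only for the brief synchronous write. The following std-only analogy shows that ordering. It makes three substitutions: `Arc<Mutex<_>>` replaces `Py<Self>` plus the GIL, a thread replaces the Tokio task, and a hypothetical `slow_value` replaces `get_random_u64().await`:

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

struct RustSleeper {
    n: u64,
}

// Hypothetical slow computation standing in for `get_random_u64().await`.
fn slow_value() -> u64 {
    thread::sleep(Duration::from_millis(10));
    7
}

// `Arc<Mutex<_>>` stands in for `Py<RustSleeper>`; locking it stands in for
// `Python::with_gil` plus `try_borrow_mut`.
fn change_n(this: Arc<Mutex<RustSleeper>>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        // Do the slow part first, without holding the lock.
        let n = slow_value();
        // Then lock only long enough to write the result back.
        this.lock().unwrap().n = n;
    })
}
```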

awestlake87 commented 2 years ago

Yeah, the example that @1frag provided should work.

Another alternative that I've used in my projects is to create an inner structure protected with an `Arc<Mutex<Inner>>`. The `Arc` can be cloned and passed into an `async move { }` block, and any async methods on the `Inner` struct can be safely awaited. You can also use an async-capable `Mutex` for the lock. This would be a workaround for the problem mentioned by @1frag.

I'm away from my computer right now, but I can provide some snippets to elaborate later if needed.

relsunkaev commented 2 years ago

Thanks, that worked great!

RoastVeg commented 2 years ago

> Another alternative that I've used in my projects is to create an inner structure protected with an `Arc<Mutex<Inner>>`. The `Arc` can be cloned and passed into an `async move { }` block, and any async methods on the `Inner` struct can be safely awaited. You can also use an async-capable `Mutex` for the lock. This would be a workaround for the problem mentioned by @1frag.

Here's an example for those (like me) that came here to find one:

use pyo3::prelude::*;
use std::sync::Arc;
use tokio::sync::Mutex;

struct MyType {
    value: bool
}

impl MyType {
    pub async fn my_async_fn(&self, or: bool) -> bool {
        self.value | or
    }
}

#[pyclass(name = "MyType")]
struct PyMyType(Arc<Mutex<MyType>>);

#[pymethods]
impl PyMyType {
    #[new]
    pub fn new() -> Self {
        Self(Arc::new(Mutex::new(MyType { value: false })))
    }

    pub fn my_async_fn<'a>(&self, py: Python<'a>, or: bool) -> PyResult<&'a PyAny> {
        let inner = self.0.clone();
        pyo3_asyncio::tokio::future_into_py(py, async move {
            Ok(inner.lock().await.my_async_fn(or).await)
        })
    }
}
igiloh-pinecone commented 1 year ago

@RoastVeg thank you very much for posting this detailed solution!

I tried it in my case, which is similar (though slightly more complex). If I don't declare an explicit 'a lifetime, the compiler says:

associated function was supposed to return data with lifetime `'2` but it is returning data with lifetime `'1`

where '2 refers to self and '1 to py.

If I do declare an explicit lifetime, the compiler says:

`py` escapes the associated function body here
argument requires that `'a` must outlive `'static`

Maybe it's because the return type of the inner async function is not a simple type that implements Copy (like the bool you've shown here), but a Vec<pyclass>.

Any ideas?

awestlake87 commented 1 year ago

@igiloh-pinecone can you share your function? From what you're saying, I think you're capturing a value with the lifetime of the GIL in your async block (which requires 'static), but without more context I can't really tell you where the problem is or how to get around the error.

igiloh-pinecone commented 1 year ago

@awestlake87 thank you, and apologies for the delayed response!
I'm afraid this code isn't public yet (it will be soon), so I can't refer you to it directly.

The code itself is slightly convoluted, involving a cascade of wrapped struct method calls.
I tried to create a minimal gist reproducing this code; please see here.
As I mentioned, the error I get is:

|        pub fn upsert_async<'a>(&mut self, py: Python<'a>, vectors: Vec<pinecone_core::Vector>, namespace: &'a str) -> PyResult<&'a PyAny>{
|                            --             -- `py` is a reference that is only valid in the associated function body
|                            |
|                            lifetime `'a` defined here
|            let mut inner_index = self.inner.clone();
| /              pyo3_asyncio::tokio::future_into_py(py, async move {
| |                  let res = inner_index.lock().await.upsert(&namespace, &vectors, None).await?;
| |                  Ok(res)
| |                  // Ok(Python::with_gil(|py| res.into_py(py)))
| |
| |              })
| |               ^
| |               |
| |_______________`py` escapes the associated function body here
|                 argument requires that `'a` must outlive `'static`

I then suspected that py "escapes" because Vector is a pyclass, so it somehow requires the GIL.
I changed the code structure a bit, making Vector a plain Rust struct and wrapping it in a pyclass that is stripped before the async move. You can see this slightly different version here (this is the diff).
But sadly that still gave the exact same error.

Thank you again for taking the time and effort to look into this problem!
I've been trying to wrap my head around this for days now, but I still can't figure out what the lifetime problem is.

awestlake87 commented 1 year ago

I think your issue is actually `namespace`. It's a `&'a str`, which means it may be borrowed from a Python string and can't outlive the GIL-bound lifetime `'a`. What you need to do is take ownership of it before it's captured by the `async move { }` block.

You can do this by converting it to a Rust `String`, or by creating an owned Python reference such as `PyObject` or `Py<PyString>`, outside the `async move { }` block, so that the block only holds `'static` data. Converting it to a `String` is pretty easy, so I'd give that a try first.
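The following std-only sketch shows the same fix: the borrowed `&str` is copied into an owned `String` before the `move` closure captures it. It assumes a thread in place of the Tokio task, a hypothetical `upsert_detached` function, and `Vec<u32>` as a stand-in for the real vector type. In the PyO3 method itself, declaring the parameter as `namespace: String` should have the same effect, since PyO3 can extract a Python `str` into an owned `String`.

```rust
use std::thread;

// Hypothetical stand-in for the method body: the spawned closure must be
// `'static`, just like the future passed to `future_into_py`.
fn upsert_detached(namespace: &str, vectors: Vec<u32>) -> thread::JoinHandle<String> {
    // `namespace` is only borrowed, so copy it into an owned `String`
    // before the `move` closure captures it.
    let namespace = namespace.to_string();
    thread::spawn(move || {
        // A real implementation would perform the upsert here.
        format!("upserted {} vectors into {}", vectors.len(), namespace)
    })
}
```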

igiloh-pinecone commented 1 year ago

@awestlake87 thank you very much, you're absolutely right!
I totally missed that 🤦.

However, I only now realized there's a deeper problem with this approach.
In my code, the whole point was to call the same my_async_fn() from Python multiple times with different inputs and have these calls run concurrently. So adding a Mutex sort of misses the point.

I ended up simply cloning self.inner itself, which is "cheap" in my particular use case. But I'm not sure that will be a viable solution for other users who encounter the same problem.

awestlake87 commented 1 year ago

From what I can see, your mutex is locked within the async block, so the bigger question IMO is whether your struct can perform upserts concurrently.

Your function can be called multiple times from Python without blocking, because it essentially just schedules the task on Tokio. The lock is taken inside the async block, so these scheduled tasks run at the same time, but they wait on each other so that only one is doing the upsert at any given time.
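The following std-only sketch illustrates that behaviour. It assumes threads in place of Tokio tasks and a hypothetical `Inner::upsert` that records how many calls overlap. Every call is started immediately, but with the lock taken inside the task, the upserts never overlap. With an `Arc<Inner>` whose method takes `&self`, they can.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical client whose `upsert` records how many calls overlap.
struct Inner {
    active: AtomicUsize,
    peak: AtomicUsize,
}

impl Inner {
    fn new() -> Self {
        Inner { active: AtomicUsize::new(0), peak: AtomicUsize::new(0) }
    }

    fn upsert(&self) {
        let now = self.active.fetch_add(1, Ordering::SeqCst) + 1;
        self.peak.fetch_max(now, Ordering::SeqCst);
        thread::sleep(Duration::from_millis(20)); // simulated network round trip
        self.active.fetch_sub(1, Ordering::SeqCst);
    }

    fn peak(&self) -> usize {
        self.peak.load(Ordering::SeqCst)
    }
}

// Like the `async move` version: every call starts right away, but each task
// takes the lock before upserting, so the upserts run one at a time.
fn peak_with_mutex(calls: usize) -> usize {
    let shared = Arc::new(Mutex::new(Inner::new()));
    let handles: Vec<_> = (0..calls)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || shared.lock().unwrap().upsert())
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // Bind first so the guard is dropped before `shared` goes out of scope.
    let peak = shared.lock().unwrap().peak();
    peak
}

// Without the mutex (`Arc<Inner>` with a `&self` method), upserts can overlap.
fn peak_without_mutex(calls: usize) -> usize {
    let shared = Arc::new(Inner::new());
    let handles: Vec<_> = (0..calls)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || shared.upsert())
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    shared.peak()
}
```

`peak_with_mutex(4)` always returns 1. `peak_without_mutex(4)` will usually return more than 1, although the exact number depends on scheduling.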

If your struct can perform upserts concurrently, there's no need for the mutex, and you might be able to get away with just an Arc. If it can't, you may need to find a different solution with your library; for example, you may be able to queue the items and perform a batch of upserts. The solution to this problem can be pretty library-specific, though.
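The following is one way to sketch the queue-and-batch idea with only the standard library. It assumes a thread and `std::sync::mpsc` in place of a Tokio task and channel (`tokio::sync::mpsc` would be the async counterpart). It also uses a hypothetical `run_batched` that records batches instead of calling a real `upsert`. Callers push items into the channel without waiting, and a single worker that owns the non-concurrent client drains whatever has queued up into batches.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical batched upsert: one worker owns the (non-concurrent) client
// and groups queued items into batches of at most `max_batch`.
fn run_batched(items: Vec<u32>, max_batch: usize) -> Vec<Vec<u32>> {
    let (tx, rx) = mpsc::channel::<u32>();
    let worker = thread::spawn(move || {
        let mut batches = Vec::new();
        // Block for the first item of each batch, then take any others that
        // are already waiting, up to `max_batch`.
        while let Ok(first) = rx.recv() {
            let mut batch = vec![first];
            while batch.len() < max_batch {
                match rx.try_recv() {
                    Ok(item) => batch.push(item),
                    Err(_) => break,
                }
            }
            batches.push(batch); // a real worker would call `upsert(&batch)` here
        }
        batches
    });
    for item in items {
        tx.send(item).unwrap();
    }
    drop(tx); // closing the channel lets the worker loop finish
    worker.join().unwrap()
}
```

In a real binding, each Python-facing method would hold a clone of the `Sender` and only send. How items split into batches depends on timing, but with a single sender their order is preserved.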