awestlake87 / pyo3-asyncio

Other
312 stars 48 forks source link

Is it possible to await on PyModule::from_code #35

Closed glennpierce closed 3 years ago

glennpierce commented 3 years ago

I have embedded the Python interpreter into my rust application and exposed a couple on test methods defined in rust. One of them is an async function that I wish to await from the python code that I will interpret using PyModule::from_code. (The function is just a test delay for now)

The trouble is I don't know how to make the PyModule::from_code function await until the co-routines are finished.

Running the code gives

sys:1: RuntimeWarning: coroutine 'test' was never awaited RuntimeWarning: Enable tracemalloc to get the object allocation traceback

My example code is as follows. How can I get the python code to await ? Thanks

fn run_python() -> Result<(), ()> {

    let gil = Python::acquire_gil();
    let py = gil.python();

    let bmos = PyModule::new(py, "bmos").unwrap();
    bmos.add_function(pyo3::wrap_pyfunction!(sum_as_string, bmos).unwrap()).unwrap();
    bmos.add_function(pyo3::wrap_pyfunction!(rust_sleep, bmos).unwrap()).unwrap();

    add_module(py, bmos);

    run_python_script(py).map_err(|e| {
        // We can't display Python exceptions via std::fmt::Display,
        // so print the error here manually.
        e.print_and_set_sys_last_vars(py);
    })
}

fn add_module(py: Python, module: &PyModule) {
    py.import("sys")
        .expect("failed to import python sys module")
        .dict()
        .get_item("modules")
        .expect("failed to get python modules dictionary")
        .downcast::<PyDict>()
        .expect("failed to turn sys.modules into a PyDict")
        .set_item(module.name().expect("module missing name"), module)
        .expect("failed to inject module");
}

fn run_python_script(py: Python) -> PyResult<()> {

    let sys = py.import("sys")?;
    let version: String = sys.get("version")?.extract()?;
    let locals = [("os", py.import("os")?)].into_py_dict(py);
    let code = "os.getenv('USER') or os.getenv('USERNAME') or 'Unknown'";
    let user: String = py.eval(code, None, Some(&locals))?.extract()?;

    let activators = PyModule::from_code(py, r#"
def relu(x):
    return max(0.0, x)

async def test():
    import bmos
    print(bmos.sum_as_string(8, 5))
    await bmos.rust_sleep()
    print("done")

def leaky_relu(x, slope=0.01):
    return x if x >= 0 else x * slope
    "#, "activators.py", "activators")?;

    let relu_result = activators.getattr("test")?.call0()?;

    let relu_result: f64 = activators.getattr("relu")?.call1((-1.0,))?.extract()?;
    assert_eq!(relu_result, 0.0);
    println!("Hello {}, I'm Python {}", user, version);
    Ok(())
}

#[pyfunction]
fn sum_as_string(a: usize, b: usize) -> PyResult<String> {
    Ok((a + b).to_string())
}

#[pyfunction]
fn rust_sleep(py: Python) -> PyResult<PyObject> {
    pyo3_asyncio::tokio::into_coroutine(py, async {
        tokio::time::sleep(std::time::Duration::from_secs(10)).await;
        Ok(Python::with_gil(|py| py.None()))
    })
}

#[pyo3_asyncio::tokio::main]
async fn main() -> PyResult<()> {

    run_python();

    loop {
        .// Would like todo other async things as well
         // tick_async();
         std::thread::sleep(std::time::Duration::from_millis(500));
    }

    Ok(())
}
glennpierce commented 3 years ago

Note I have tried adding

pyo3_asyncio::try_init(py).unwrap();
pyo3_asyncio::tokio::init_multi_thread_once();

and the following to the Python parts.

asyncio.get_event_loop().run_until_complete(rust_sleep())

but still have the same issue.

awestlake87 commented 3 years ago

You were almost there. All you needed was to await the test coroutine by converting it into a future with pyo3_asyncio::into_future. When I ran it unmodified, I got this error:

sys:1: RuntimeWarning: coroutine 'test' was never awaited

This means that you need to await the result of test() somewhere either in Python or in Rust. I went ahead and awaited it with pyo3_asyncio::into_future.

There was also a bit of GIL juggling to be done since the GIL cannot be held across await boundaries, so I had to wrap some sections of code with Python::with_gil and change some functions to async:

# Cargo.toml

[package]
name = "glennpierce"
version = "0.1.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures = "0.3"
pyo3 = "0.13"
pyo3-asyncio = { version = "0.13", features = ["attributes", "tokio-runtime"] }
tokio = "1.4"
//! main.rs

use pyo3::{
    prelude::*,
    types::{IntoPyDict, PyDict},
};

async fn run_python() -> Result<(), ()> {
    Python::with_gil(|py| {
        let bmos = PyModule::new(py, "bmos").unwrap();
        bmos.add_function(pyo3::wrap_pyfunction!(sum_as_string, bmos).unwrap())
            .unwrap();
        bmos.add_function(pyo3::wrap_pyfunction!(rust_sleep, bmos).unwrap())
            .unwrap();
        add_module(py, bmos);
    });

    run_python_script().await.map_err(|e| {
        Python::with_gil(|py| {
            // We can't display Python exceptions via std::fmt::Display,
            // so print the error here manually.
            e.print_and_set_sys_last_vars(py);
        })
    })
}

fn add_module(py: Python, module: &PyModule) {
    py.import("sys")
        .expect("failed to import python sys module")
        .dict()
        .get_item("modules")
        .expect("failed to get python modules dictionary")
        .downcast::<PyDict>()
        .expect("failed to turn sys.modules into a PyDict")
        .set_item(module.name().expect("module missing name"), module)
        .expect("failed to inject module");
}

async fn run_python_script() -> PyResult<()> {
    let (activators, user, version) =
        Python::with_gil(|py| -> PyResult<(PyObject, String, String)> {
            let sys = py.import("sys")?;
            let version: String = sys.get("version")?.extract()?;
            let locals = [("os", py.import("os")?)].into_py_dict(py);
            let code = "os.getenv('USER') or os.getenv('USERNAME') or 'Unknown'";
            let user: String = py.eval(code, None, Some(&locals))?.extract()?;

            let activators = PyModule::from_code(
                py,
                r#"
def relu(x):
    return max(0.0, x)

async def test():
    import bmos
    print(bmos.sum_as_string(8, 5))
    await bmos.rust_sleep()
    print("done")

def leaky_relu(x, slope=0.01):
    return x if x >= 0 else x * slope
    "#,
                "activators.py",
                "activators",
            )?;

            Ok((activators.into(), user, version))
        })?;

    // Main fix here!
    let relu_result = Python::with_gil(|py| -> PyResult<_> {
        Ok(pyo3_asyncio::into_future(
            activators.as_ref(py).getattr("test")?.call0()?,
        )?)
    })?
    .await?;

    let relu_result: f64 = Python::with_gil(|py| {
        activators
            .as_ref(py)
            .getattr("relu")?
            .call1((-1.0,))? // should this be called with relu_result from before?
            .extract()
    })?;
    assert_eq!(relu_result, 0.0);
    println!("Hello {}, I'm Python {}", user, version);
    Ok(())
}

#[pyfunction]
fn sum_as_string(a: usize, b: usize) -> PyResult<String> {
    Ok((a + b).to_string())
}

#[pyfunction]
fn rust_sleep(py: Python) -> PyResult<PyObject> {
    pyo3_asyncio::tokio::into_coroutine(py, async {
        tokio::time::sleep(std::time::Duration::from_secs(10)).await;
        Ok(Python::with_gil(|py| py.None()))
    })
}

#[pyo3_asyncio::tokio::main]
async fn main() -> PyResult<()> {
    // if you need a second coroutine to do stuff in the background try something like this:
    futures::future::join(async { run_python().await.unwrap() }, async {
        // other async stuff here
    })
    .await;

    // if not, this will do it:
    // run_python().await.unwrap();

    Ok(())
}
glennpierce commented 3 years ago

Thank you that's great.