awestlake87 / pyo3-asyncio

Other
300 stars 45 forks source link

Can't run async function from Rust in Python #123

Open cj-zhukov opened 5 months ago

cj-zhukov commented 5 months ago

🐛 Bug Reports

Calling an async Rust function from Python fails with the following error:

RuntimeError: Cannot run the event loop while another loop is running
sys:1: RuntimeWarning: coroutine 'BaseEventLoop.shutdown_asyncgens' was never awaited

What am I doing wrong?

🌍 Environment

💥 Reproducing

Cargo.toml

[package]
name = "bar"
version = "0.1.0"
edition = "2021"

[lib]
name = "bar"
crate-type = ["cdylib"]

[dependencies]
pyo3 = { version = "0.20", features = ["extension-module"] }
pyo3-asyncio = { version = "0.20", features = [
    "tokio-runtime",
    "attributes",
    "pyo3-asyncio-macros",
] }
tokio = { version = "1", features = ["full"] }
datafusion = { version = "36", features = ["pyarrow"] }

lib.rs

use std::sync::Arc;

use pyo3::prelude::*;
use datafusion::arrow::array::{Int32Array, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::prelude::*;

#[pyfunction]
fn run(py: Python<'_>) -> PyResult<PyArrowType<Vec<RecordBatch>>> {
    pyo3::prepare_freethreaded_python();
    pyo3_asyncio::tokio::run(py, async move {
        let batches = dev().await;
        Ok(batches.into())
    })
}

// Module initializer for the `bar` extension: registers `run` as its only function.
#[pymodule]
fn bar(_py: Python, m: &PyModule) -> PyResult<()> {
    let run_fn = wrap_pyfunction!(run, m)?;
    m.add_function(run_fn)
}

/// Builds a three-row `(id, name)` record batch, reads it through a fresh
/// `SessionContext`, and returns the collected batches.
///
/// # Panics
///
/// Panics if building the batch, reading it, or collecting the result fails
/// (every fallible step is unwrapped).
pub async fn dev() -> Vec<RecordBatch> {
    let ctx = SessionContext::new();

    // `id` is non-nullable, `name` is nullable.
    let fields = vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, true),
    ];
    let schema = Arc::new(Schema::new(fields));

    let ids = Int32Array::from(vec![1, 2, 3]);
    let names = StringArray::from(vec!["foo", "bar", "baz"]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(names)]).unwrap();

    ctx.read_batch(batch).unwrap().collect().await.unwrap()
}

main.py

import asyncio
import bar

async def main():
    # Imported locally so this snippet needs no extra top-level import;
    # pandas is already required by pyarrow's `to_pandas()`.
    import pandas as pd

    # `bar.run()` yields a *list* of pyarrow RecordBatch objects (the Rust side
    # returns `Vec<RecordBatch>`), and a list has no `to_pandas()` method.
    # Convert every batch and concatenate them into one DataFrame.
    batches = await bar.run()
    df = pd.concat([batch.to_pandas() for batch in batches], ignore_index=True)
    print(df)

asyncio.run(main())

Here is an equivalent example without async that works:

Cargo.toml

[package]
name = "foo"
version = "0.1.0"
edition = "2021"

[lib]
name = "foo"
crate-type = ["cdylib"]

[dependencies]
pyo3 = "0.20.0"
arrow = { version = "50", features = ["prettyprint", "pyarrow"] }

lib.rs

use std::sync::Arc;

use arrow::array::{RecordBatch, StringArray, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::pyarrow;
use pyo3::prelude::*;

// Python-facing entry point: returns the batch built by `dev()` wrapped in
// `PyArrowType`.
#[pyfunction]
fn run() -> PyResult<pyarrow::PyArrowType<RecordBatch>> {
    Ok(dev().into())
}

// Module initializer for the `foo` extension: registers `run` as its only function.
#[pymodule]
fn foo(_py: Python, m: &PyModule) -> PyResult<()> {
    let run_fn = wrap_pyfunction!(run, m)?;
    m.add_function(run_fn)
}

/// Builds a three-row record batch with a non-nullable `id` (Int32) column
/// and a non-nullable `name` (Utf8) column.
///
/// # Panics
///
/// Panics if `RecordBatch::try_new` rejects the schema/column combination.
pub fn dev() -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["foo", "bar", "baz"])),
        ],
    )
    .unwrap()
}

main.py

import foo

def main():
    # Fetch the record batch from the `foo` extension, convert it to a pandas
    # DataFrame, and print it.
    print(foo.run().to_pandas())

main()