wyfo / pyo3-async

PyO3 bindings to various Python asynchronous frameworks
MIT License

How to use allow_threads with an existing stream to avoid deadlock? #4

Open npuichigo opened 10 months ago

npuichigo commented 10 months ago

Recently, I encountered the same deadlock issue mentioned here: https://github.com/PyO3/pyo3/pull/3540#issuecomment-1789623562

Here I use the par_stream crate for a parallel async stream implementation and wrap it with pyo3-async. par_map returns a flume RecvStream, which internally uses a lock to protect its queue.

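```rust
use futures::Stream;
use par_stream::prelude::*; // brings par_map into scope
use pyo3::prelude::*;

#[pymodule]
fn snake(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(parallel_stream, m)?)?;
    Ok(())
}

/// Shared Tokio runtime, created lazily on first use.
fn tokio() -> &'static tokio::runtime::Runtime {
    use std::sync::OnceLock;
    static RT: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
    RT.get_or_init(|| tokio::runtime::Runtime::new().unwrap())
}

/// Deliberately slow (exponential-time) Fibonacci, used as CPU-bound work.
fn fib(n: u64) -> u64 {
    match n {
        0 => 0,
        1 => 1,
        _ => fib(n - 1) + fib(n - 2),
    }
}

/// Maps `fib` over 0..100 in parallel; the resulting stream is backed by a flume channel.
fn map() -> impl Stream<Item = PyResult<u64>> + Send {
    // Enter the Tokio runtime while the parallel stream is constructed.
    let _guard = tokio().enter();
    futures::stream::iter(0..100).par_map(None, |i| move || Ok(fib(i)))
}

#[pyfunction]
fn parallel_stream() -> pyo3_async::asyncio::AsyncGenerator {
    pyo3_async::asyncio::AsyncGenerator::from_stream(map())
}
```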

Now it deadlocks and hangs in Python.

```python
import snake
import asyncio

async def fn():
    async for i in snake.parallel_stream():
        print(i)

asyncio.run(fn())
```

[Screenshots: stack traces of the hung process]

I have no idea where to add allow_threads here; just adding it to parallel_stream doesn't seem to work.

BTW, if I switch to a tokio mpsc channel like this:

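```rust
use async_stream::stream;
use futures::StreamExt; // for for_each
// ParParams comes from par_stream (its import path may depend on the crate version).

fn map() -> impl Stream<Item = PyResult<u64>> + Send {
    // Bounded tokio mpsc channel between the parallel workers and the Python-facing stream.
    let (tx, mut rx) = tokio::sync::mpsc::channel(100);
    // Drive the par_map pipeline in its own Tokio task, forwarding every result into `tx`.
    tokio().spawn(async move {
        futures::stream::iter(0..1000)
            .par_map(
                ParParams {
                    num_workers: 32,
                    buf_size: Some(10),
                },
                |_| move || Ok(fib(45)),
            )
            .for_each(|item| {
                let tx = tx.clone();
                async move {
                    let _ = tx.send(item).await;
                }
            })
            .await;
    });

    // The stream handed to Python only ever awaits the tokio mpsc receiver.
    stream! {
        while let Some(item) = rx.recv().await {
            yield item;
        }
    }
}
```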

It works well and no deadlock occurs. I'm not quite sure what the essential difference is.

wyfo commented 10 months ago

Have you tried pyo3_async::asyncio::AsyncGenerator::from_stream(AllowThreads(map()))?
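Presumably the placement matters because the GIL has to be released while the stream is *polled*, which happens on every `__anext__`, not only when `parallel_stream` builds it; calling `allow_threads` inside `parallel_stream` would cover construction only. The std-only sketch below simulates that difference with a thread-local flag standing in for the GIL. `GIL_HELD`, `allow_threads`, `Probe`, `AllowThreads` and `poll_once_with_gil` are hypothetical stand-ins, not the PyO3 or pyo3-async APIs, and the flag does not model other threads actually running while the GIL is released.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

thread_local! {
    // Hypothetical stand-in for "this thread holds the GIL" (simulation only).
    static GIL_HELD: Cell<bool> = Cell::new(false);
}

fn gil_held() -> bool {
    GIL_HELD.with(|h| h.get())
}

/// Restores the previous simulated GIL state when dropped (also on panic).
struct Restore(bool);
impl Drop for Restore {
    fn drop(&mut self) {
        GIL_HELD.with(|h| h.set(self.0));
    }
}

/// Runs `f` with the simulated GIL released, then restores the previous state.
fn allow_threads<R>(f: impl FnOnce() -> R) -> R {
    let _restore = Restore(GIL_HELD.with(|h| h.replace(false)));
    f()
}

/// A future that reports whether the simulated GIL was held when it was polled.
struct Probe;
impl Future for Probe {
    type Output = bool;
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<bool> {
        Poll::Ready(gil_held())
    }
}

/// Hypothetical AllowThreads-style wrapper: every poll of the inner future runs
/// with the simulated GIL released.
struct AllowThreads<F>(F);
impl<F: Future + Unpin> Future for AllowThreads<F> {
    type Output = F::Output;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = &mut self.0;
        allow_threads(|| Pin::new(inner).poll(cx))
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once while the simulated GIL is held, like Python awaiting `__anext__`.
fn poll_once_with_gil<F: Future + Unpin>(mut fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    GIL_HELD.with(|h| h.set(true));
    let out = Pin::new(&mut fut).poll(&mut cx);
    GIL_HELD.with(|h| h.set(false));
    out
}

fn main() {
    // Releasing the GIL only while constructing the future does not affect later polls...
    let built_without_gil = allow_threads(|| Probe);
    println!("{:?}", poll_once_with_gil(built_without_gil)); // Ready(true)
    // ...whereas wrapping the future releases the GIL around every poll.
    println!("{:?}", poll_once_with_gil(AllowThreads(Probe))); // Ready(false)
}
```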

I've just released version 0.3.2, because AllowThreads was buggy. Anyway, as you know, the crate is deprecated, so I invite you to use the PyO3 branches I mentioned in #2. I know Stream support isn't planned in PyO3 for now, but I should add it soon in a new PR.

> It works well and no deadlock occurs. I'm not quite sure what the essential difference is.

As the stack traces you showed indicate, flume uses a mutex, while tokio::sync::mpsc doesn't. Your deadlock seems to be the following:

I've looked at the flume implementation, and indeed, the channel lock is held during the wakeup (honestly, I don't understand why). By using an intermediate tokio task with an mpsc channel, the flume channel no longer wakes up the async generator's __next__ coroutine but the tokio task instead, so the wakeup doesn't need to acquire the GIL.
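A minimal std-only simulation of the lock-order inversion, assuming the deadlock works like this: a worker thread holds the channel mutex while waking a waker that needs the GIL, while the Python thread holds the GIL and polls the receiver, which needs the same channel mutex. Both locks are plain `std::sync::Mutex` stand-ins, not flume's or CPython's actual internals, and `try_lock` is used so the demo reports the cycle instead of hanging. The second function shows one common way to break the cycle: release the channel lock before taking the GIL to wake the consumer.

```rust
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

/// Both sides hold their first lock, then check whether their second lock is free.
/// Returns (worker would block on the GIL, Python thread would block on the channel).
fn inverted_order() -> (bool, bool) {
    let gil = Arc::new(Mutex::new(())); // stand-in for the GIL
    let chan = Arc::new(Mutex::new(())); // stand-in for the channel's internal mutex
    let barrier = Arc::new(Barrier::new(2));

    let worker = {
        let (gil, chan, barrier) = (Arc::clone(&gil), Arc::clone(&chan), Arc::clone(&barrier));
        thread::spawn(move || {
            let _chan_guard = chan.lock().unwrap(); // sender pushes under the channel lock
            barrier.wait();
            // ...and wakes the Python-side waker while still holding it: needs the GIL.
            let blocked = gil.try_lock().is_err();
            barrier.wait(); // keep holding until the other side has checked too
            blocked
        })
    };

    let gil_guard = gil.lock().unwrap(); // Python thread holds the GIL during __anext__
    barrier.wait();
    let python_blocked = chan.try_lock().is_err(); // polling the receiver needs the channel lock
    barrier.wait();
    drop(gil_guard);
    (worker.join().unwrap(), python_blocked)
}

/// Same scenario, but the sender drops the channel lock *before* taking the GIL,
/// so no thread ever waits for the GIL while holding the channel lock.
fn wake_outside_lock() -> bool {
    let gil = Arc::new(Mutex::new(()));
    let chan = Arc::new(Mutex::new(Vec::<u64>::new()));
    let barrier = Arc::new(Barrier::new(2));

    let worker = {
        let (gil, chan, barrier) = (Arc::clone(&gil), Arc::clone(&chan), Arc::clone(&barrier));
        thread::spawn(move || {
            let mut queue = chan.lock().unwrap();
            queue.push(42);
            barrier.wait();
            drop(queue); // release the channel lock first...
            let _gil = gil.lock().unwrap(); // ...then take the GIL to wake the consumer
        })
    };

    let gil_guard = gil.lock().unwrap();
    barrier.wait();
    // Blocks only until the sender drops its lock, which it does without needing the GIL.
    let item = chan.lock().unwrap().pop();
    drop(gil_guard);
    worker.join().unwrap();
    item == Some(42)
}

fn main() {
    let (worker_blocked, python_blocked) = inverted_order();
    println!("inverted: worker blocked = {worker_blocked}, python blocked = {python_blocked}");
    println!("wake outside lock completed: {}", wake_outside_lock());
}
```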

Your use case is interesting, though, because you're the second person to give me feedback on async support in PyO3, and both of you encountered a deadlock. This may indicate that the issue is more common than I expected and deserves dedicated documentation. I will talk with the PyO3 maintainers about that.

npuichigo commented 10 months ago

Thanks for your detailed reply. I will give PyO3 master a try. One question: how does performance compare? I don't know which is better:

  1. Use AllowThreads with the stream, which may deadlock with the GIL.
  2. Add one level of indirection, like my use of an mpsc channel.

Since I would like to let Python users freely combine the exposed stream algorithms, I need to make sure the final result always avoids deadlock.
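Option 2 could be packaged as one reusable helper, so that every stream exposed to Python goes through the same indirection. The sketch below is a std-only analogue of that pattern, assuming a blocking iterator stands in for the async stream and a thread plus `std::sync::mpsc::sync_channel` stand in for the tokio task and `tokio::sync::mpsc` used earlier in the thread; `relay` is a hypothetical name.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical helper: drive `source` on its own thread and forward every item into
/// a bounded channel. The consumer only ever talks to this channel, never to whatever
/// channels or locks `source` uses internally.
fn relay<I>(source: I, buffer: usize) -> Receiver<I::Item>
where
    I: Iterator + Send + 'static,
    I::Item: Send + 'static,
{
    let (tx, rx) = sync_channel(buffer);
    thread::spawn(move || {
        for item in source {
            // Stop early if the consumer has gone away (receiver dropped).
            if tx.send(item).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which ends the receiver's iteration.
    });
    rx
}

fn fib(n: u64) -> u64 {
    match n {
        0 => 0,
        1 => 1,
        _ => fib(n - 1) + fib(n - 2),
    }
}

fn main() {
    let out: Vec<u64> = relay((0..10u64).map(fib), 4).into_iter().collect();
    println!("{out:?}"); // prints [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
}
```

The extra hop costs one additional channel send and receive per item plus one task (here, a thread) per exposed stream; whether that matters next to CPU-bound work like `fib(45)` is something to measure rather than assume.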