MagicStack / uvloop

Ultra fast asyncio event loop.
Apache License 2.0

"RuntimeError: Racing with another loop to spawn a process" when spawning many processes from multiple threads #508

Open tyilo opened 2 years ago

tyilo commented 2 years ago

We have a multi-threaded application using uvloop that sometimes needs to spawn a lot of processes simultaneously. This results in a lot of "RuntimeError: Racing with another loop to spawn a process" exceptions being raised.

The following is a minimal reproducible example that shows the problem:

import asyncio
from threading import Thread

import uvloop

def create_processes(i):
    async def inner():
        processes = []
        for _ in range(100):
            p = await asyncio.create_subprocess_exec("true")
            processes.append(p)

        for p in processes:
            await p.wait()

    try:
        asyncio.run(inner())
        print(f"[{i}] Success.")
    except Exception as e:
        print(f"[{i}] Fail: {repr(e)}")

def main():
    threads = []
    for i in range(10):
        # Pass i via args so each thread gets its own value (a lambda binds i late).
        t = Thread(target=create_processes, args=(i,))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

uvloop.install()
main()

When running this with PYTHONASYNCIODEBUG=1 I get something like:

Executing <Task pending name='Task-3' coro=<create_processes.<locals>.inner() running at /home/tyilo/repos/uvloop-race/test.py:11> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/lib/python3.10/asyncio/subprocess.py:218> cb=[run_until_complete.<locals>.done_cb()] created at /usr/lib/python3.10/asyncio/tasks.py:636> took 0.132 seconds
Executing <Handle UVProcessTransport._call_connection_made created at /usr/lib/python3.10/asyncio/subprocess.py:218> took 0.133 seconds
Executing <Task pending name='Task-4' coro=<create_processes.<locals>.inner() running at /home/tyilo/repos/uvloop-race/test.py:11> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/lib/python3.10/asyncio/subprocess.py:218> cb=[run_until_complete.<locals>.done_cb()] created at /usr/lib/python3.10/asyncio/tasks.py:636> took 0.142 seconds
Executing <Task pending name='Task-6' coro=<create_processes.<locals>.inner() running at /home/tyilo/repos/uvloop-race/test.py:11> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/lib/python3.10/asyncio/subprocess.py:218> cb=[run_until_complete.<locals>.done_cb()] created at /usr/lib/python3.10/asyncio/tasks.py:636> took 0.147 seconds
[0] Fail: RuntimeError('Racing with another loop to spawn a process.')
Executing <Task pending name='Task-9' coro=<create_processes.<locals>.inner() running at /home/tyilo/repos/uvloop-race/test.py:11> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/lib/python3.10/asyncio/subprocess.py:218> cb=[run_until_complete.<locals>.done_cb()] created at /usr/lib/python3.10/asyncio/tasks.py:636> took 0.157 seconds
Executing <Task pending name='Task-8' coro=<create_processes.<locals>.inner() running at /home/tyilo/repos/uvloop-race/test.py:11> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/lib/python3.10/asyncio/subprocess.py:218> cb=[run_until_complete.<locals>.done_cb()] created at /usr/lib/python3.10/asyncio/tasks.py:636> took 0.175 seconds
Executing <Task pending name='Task-7' coro=<create_processes.<locals>.inner() running at /home/tyilo/repos/uvloop-race/test.py:11> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/lib/python3.10/asyncio/subprocess.py:218> cb=[run_until_complete.<locals>.done_cb()] created at /usr/lib/python3.10/asyncio/tasks.py:636> took 0.176 seconds
[2] Fail: RuntimeError('Racing with another loop to spawn a process.')
[9] Fail: RuntimeError('Racing with another loop to spawn a process.')
[1] Fail: RuntimeError('Racing with another loop to spawn a process.')
[4] Fail: RuntimeError('Racing with another loop to spawn a process.')
[3] Success.
[6] Success.
[8] Success.
[5] Success.
[7] Success.

Without uvloop I get something like:

[3] Success.
[9] Success.
[5] Success.
[7] Success.
[2] Success.
[4] Success.
[8] Success.
[0] Success.
[6] Success.
[1] Success.

ljluestc commented 10 months ago

import asyncio
from threading import Thread

import uvloop

def create_processes(i):
    async def inner():
        processes = []
        for _ in range(100):
            p = await asyncio.create_subprocess_exec("true")
            processes.append(p)

        for p in processes:
            await p.wait()

    # A non-main thread has no current event loop, so create one explicitly.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(inner())
        print(f"[{i}] Success.")
    except Exception as e:
        print(f"[{i}] Fail: {repr(e)}")
    finally:
        loop.close()

def main():
    threads = []
    for i in range(10):
        # Pass i via args so each thread gets its own value (a lambda binds i late).
        t = Thread(target=create_processes, args=(i,))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

if __name__ == "__main__":
    uvloop.install()
    main()
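
A possible workaround, not confirmed anywhere in this thread and not tested against uvloop, is to serialize spawning across threads with a process-wide lock. The sketch below assumes the error is only raised while another loop is in the middle of spawning, and that each loop has at most one task spawning at a time (as in the example above). `SPAWN_LOCK`, `spawn_serialized`, `worker`, `run_workers`, and `DEFAULT_CMD` are hypothetical names used for illustration; the sketch runs on plain asyncio, so `uvloop.install()` would still need to be called first to try it with uvloop.

```python
import asyncio
import sys
import threading

# Hypothetical process-wide lock; not part of asyncio or uvloop.
SPAWN_LOCK = threading.Lock()

# Portable stand-in for the "true" command used in the example above.
DEFAULT_CMD = (sys.executable, "-c", "pass")


async def spawn_serialized(*cmd):
    # Blocks this thread's event loop until no other thread is spawning.
    # Assumes at most one spawning task per loop; two tasks spawning on
    # the same loop would deadlock on this lock.
    with SPAWN_LOCK:
        return await asyncio.create_subprocess_exec(*cmd)


def worker(i, spawns, cmd, results):
    async def inner():
        processes = []
        for _ in range(spawns):
            processes.append(await spawn_serialized(*cmd))
        for p in processes:
            await p.wait()

    try:
        asyncio.run(inner())
        results[i] = "ok"
    except Exception as e:
        results[i] = repr(e)


def run_workers(n=10, spawns=100, cmd=DEFAULT_CMD):
    # One thread per worker, each running its own event loop.
    results = {}
    threads = [
        threading.Thread(target=worker, args=(i, spawns, cmd, results))
        for i in range(n)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

Calling `run_workers()` returns a dict mapping each thread index to `"ok"` or to the `repr` of the exception it hit, which makes it easy to count failures. The trade-off is that spawning no longer happens in parallel across threads, and a loop that is waiting for the lock cannot run anything else in the meantime.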