erdewit / nest_asyncio

Patch asyncio to allow nested event loops

strange hanging behavior with `nest_asyncio` when used in combination with `concurrent.futures` `ProcessPoolExecutor` #81

Closed dotsdl closed 1 year ago

dotsdl commented 1 year ago

I am working on a user-facing Python client to a RESTful API (https://github.com/openforcefield/alchemiscale/pull/178). For the kinds of objects the client must pull down from the server, the operation quickly becomes CPU-bound due to deserialization and client-side processing. To use resources efficiently, we use a ProcessPoolExecutor from concurrent.futures to distribute related groups of requests across multiple processes, and within each process we perform the requests themselves concurrently using asyncio.

This all seems to work really well, except there appears to be an odd interaction with nest_asyncio that causes the ProcessPoolExecutor to hang under certain, repeatable conditions. I've included an MWE below to illustrate, using two different async HTTP client libraries (httpx and aiohttp):

# mwe.py

import asyncio
from typing import List

import nest_asyncio
import httpx
import aiohttp

nest_asyncio.apply()

def get_urls_httpx(urls: List[str]):

    async def async_request():
        session = httpx.AsyncClient()
        try:
            coros = [_get_url_httpx(url, session) for url in urls]
            result = await asyncio.gather(*coros)
        finally:
            await session.aclose()

        return result

    return asyncio.run(async_request())

async def _get_url_httpx(url, session):
    resp = await session.get(url, timeout=None)
    return resp.status_code

def get_urls_aiohttp(urls: List[str]):

    async def async_request():
        async with aiohttp.ClientSession() as session:
            coros = [_get_url_aiohttp(url, session) for url in urls]
            result = await asyncio.gather(*coros)

        return result

    return asyncio.run(async_request())

async def _get_url_aiohttp(url, session):
    async with session.get(url) as resp:
        return resp.status

def get_all_urls(all_urls: List[List[str]], func):
    from concurrent.futures import ProcessPoolExecutor, as_completed

    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(func, urls) for urls in all_urls]

        print("waiting on futures")
        results = []
        for future in as_completed(futures):
            print("future", future)
            result = future.result()
            results.append(result)

    return results

The get_all_urls function is effectively what we're doing to handle groups of requests. The following illustrates the problem:

>>> import mwe

# both of these work as expected
>>> mwe.get_all_urls([['https://google.com', 'https://archlinux.org']], mwe.get_urls_httpx)
waiting on futures
future <Future at 0x7fdde2baf990 state=finished returned list>
[[301, 200]]
>>> mwe.get_all_urls([['https://google.com', 'https://archlinux.org']], mwe.get_urls_aiohttp)
waiting on futures
future <Future at 0x7fdde2baf4d0 state=finished returned list>
[[200, 200]]

# if we call these directly, they also work
>>> mwe.get_urls_httpx(['https://google.com', 'https://archlinux.org'])
[301, 200]
>>> mwe.get_urls_aiohttp(['https://google.com', 'https://archlinux.org'])
[200, 200]

# but if we repeat either of the first two function calls, they will hang
>>> mwe.get_all_urls([['https://google.com', 'https://archlinux.org']], mwe.get_urls_aiohttp)
waiting on futures

The hanging above does not happen in the absence of nest_asyncio.apply(). However, since many of our users use Jupyter notebooks to do their work with the HTTP client I'm working on, this appears to be the only way currently to get calls to asyncio.run to work from within a Jupyter IPython kernel.

Any ideas as to what may be happening here? Is there some kind of strange state being left behind due to nest_asyncio.apply and the use of asyncio.run in the main process that's impacting use of a ProcessPoolExecutor?
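One way to narrow the question above is to patch asyncio only when a loop is already running (as in a Jupyter kernel) and leave plain scripts unpatched. The sketch below is an assumption for illustration, not something tested in this thread: `loop_is_running` and `run_sync` are hypothetical helper names, and inside Jupyter the patch would still be applied, so this only changes behavior outside a notebook.

```python
import asyncio


def loop_is_running() -> bool:
    """Return True when called from inside a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


def run_sync(coro_factory):
    """Run the coroutine produced by coro_factory() to completion.

    nest_asyncio is imported and applied only if a loop is already
    running; otherwise the unpatched asyncio.run is used.
    """
    if loop_is_running():
        import nest_asyncio  # hypothetical lazy import, only needed when nested

        nest_asyncio.apply()
    return asyncio.run(coro_factory())


async def answer():
    # Stand-in for the real request coroutine.
    await asyncio.sleep(0)
    return 42


print(run_sync(answer))
```

In the MWE, `get_urls_httpx` and `get_urls_aiohttp` could call `run_sync(async_request)` in place of `asyncio.run(async_request())`, with the module-level `nest_asyncio.apply()` removed.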

erdewit commented 1 year ago

Not sure where the strange behavior stems from. In general, though, one can expect the strangest things from a process pool that uses the fork start method, which Linux uses by default. Forking copies the parent's global state into the child processes. With the spawn method that state is not copied, and the problem becomes serialization of the functions used by the tasks.
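To make the serialization point concrete: functions are pickled by their qualified name, so a worker has to be able to look them up by that name. The following is a small sketch of that constraint using only the standard library; `is_picklable` and `make_nested` are hypothetical names introduced for illustration.

```python
import math
import pickle


def is_picklable(obj) -> bool:
    """Return True if pickle.dumps accepts obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


def make_nested():
    # A function defined inside another function cannot be looked up by
    # its qualified name, so pickle refuses it.
    def nested(x):
        return x + 1

    return nested


print(is_picklable(math.sqrt))      # True: importable by name
print(is_picklable(make_nested()))  # False: local function
print(is_picklable(lambda x: x))    # False: lambdas have no importable name
```

This is why task functions handed to a spawned worker are usually defined at module level in an importable module.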

I would suggest using something else, for example the vastly superior (ahem) distex Pool. This simplifies the code, and it becomes possible to run tasks on any other machine that happens to run an SSH server:

import asyncio
from typing import List

import distex

import nest_asyncio
nest_asyncio.apply()

async def get_urls_httpx(urls):
    import httpx
    import asyncio

    async def _get_url_httpx(url, session):
        resp = await session.get(url, timeout=None)
        return resp.status_code

    session = httpx.AsyncClient()
    try:
        coros = [_get_url_httpx(url, session) for url in urls]
        result = await asyncio.gather(*coros)
    finally:
        await session.aclose()
    return result

async def get_urls_aiohttp(urls: List[str]):
    import asyncio
    import aiohttp

    async def _get_url_aiohttp(url, session):
        async with session.get(url) as resp:
            return resp.status

    async with aiohttp.ClientSession() as session:
        coros = [_get_url_aiohttp(url, session) for url in urls]
        result = await asyncio.gather(*coros)
    return result

def get_all_urls(all_urls: List[List[str]], func):
    with distex.Pool() as pool:
        results = list(pool.map(func, all_urls, ordered=False))
    return results

dotsdl commented 1 year ago

Thanks for this @erdewit! I'll consider alternative approaches, in particular non-forking approaches for the process pool.

dotsdl commented 1 year ago

For the record: switching get_all_urls above to force 'spawn' as the multiprocessing context avoids the problem this issue raises:

def get_all_urls(all_urls: List[List[str]], func):
    import multiprocessing as mp
    from concurrent.futures import ProcessPoolExecutor, as_completed

    ctx = mp.get_context('spawn')

    with ProcessPoolExecutor(mp_context=ctx) as executor:
        futures = [executor.submit(func, urls) for urls in all_urls]

        print("waiting on futures")
        results = []
        for future in as_completed(futures):
            print("future", future)
            result = future.result()
            results.append(result)

    return results
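If the same choice is needed in several places, the context selection can be pulled into a small helper that rejects 'fork' explicitly. This is only a sketch: `non_forking_context` and `make_executor` are hypothetical names, and only 'spawn' was reported to work in this thread, so accepting 'forkserver' here is an untested assumption.

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def non_forking_context(method: str = "spawn"):
    """Return a multiprocessing context that does not fork the parent."""
    if method == "fork":
        raise ValueError("fork copies the parent's state; use 'spawn' or 'forkserver'")
    if method not in mp.get_all_start_methods():
        raise ValueError(f"start method {method!r} is not available on this platform")
    return mp.get_context(method)


def make_executor(max_workers=None, method: str = "spawn"):
    """Build a ProcessPoolExecutor with an explicit, non-forking context."""
    return ProcessPoolExecutor(
        max_workers=max_workers,
        mp_context=non_forking_context(method),
    )
```

With this, `get_all_urls` could open its pool with `with make_executor() as executor:` and keep the rest of its body unchanged. In a script (as opposed to an imported module), the call that starts the pool belongs under an `if __name__ == "__main__":` guard, because spawned workers re-import the main module.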