erdewit / nest_asyncio

Patch asyncio to allow nested event loops
BSD 2-Clause "Simplified" License
693 stars 79 forks source link

Two or more calls to run_until_complete causes run_until_complete to not exit. #17

Closed allComputableThings closed 4 years ago

allComputableThings commented 4 years ago

In the following, I call n functions that each re-enter run_until_complete. When n=1, the program exits. When n=2, the program never exits. Is this expected?

import asyncio
import functools
import time

import nest_asyncio
nest_asyncio.apply()

def sync(corot):
    """
    Run an awaitable to completion on the current event loop, blocking
    the caller until it finishes.

    :param corot: the coroutine (or other awaitable) to run.
    :return: the value produced by ``corot``.
    """
    # Look up the loop first, then wrap the awaitable: same evaluation
    # order as chaining the two calls in one expression.
    event_loop = asyncio.get_event_loop()
    gathered = asyncio.gather(corot)
    # gather() yields a one-element list here; unpack its only item.
    (value,) = event_loop.run_until_complete(gathered)
    return value

async def sync_to_corountine(func, *args, **kw):
    """
    Make a coroutine from a synchronous function.

    Calls ``func(*args, **kw)`` synchronously, then yields control to the
    event loop once before the result (or exception) is handed back.

    :param func: the synchronous callable to invoke.
    :param args: positional arguments forwarded to ``func``.
    :param kw: keyword arguments forwarded to ``func``.
    :return: whatever ``func`` returns.
    """
    try:
        # Forward keyword arguments as keywords (``**kw``). Unpacking the
        # dict with a single ``*`` would pass its keys as extra positional
        # arguments and silently drop the values.
        return func(*args, **kw)
    finally:
        # every async needs an await.
        await asyncio.sleep(0)

def main():
    """
    Reproduce the reported hang.

    Schedules ten background tasks on the current event loop, adds ``n``
    coroutines that each call a synchronous function which re-enters
    ``run_until_complete`` through ``sync()``, and then runs everything
    with a single outer ``run_until_complete``.
    """
    async def background(timeout):
        # Sleep for ``timeout`` seconds, then report; used to observe
        # whether the loop keeps making progress.
        await asyncio.sleep(timeout)
        print(f"Background: {timeout}")

    loop = asyncio.get_event_loop()
    # Run some background work to check we are never blocked
    bg_tasks = [
        loop.create_task(background(i))
        for i in range(10)
    ]

    async def long_running_async_task(result):
        # Simulate slow IO
        print(f"...START long_running_async_task [{result}]")
        await asyncio.sleep(4)
        print(f"...END   long_running_async_task [{result}]")
        return result

    def sync_function_with_async_dependency(result):
        # Synchronous function that needs an async result: sync() re-enters
        # run_until_complete on the loop that is already running.
        print(f"...START sync_function_with_async_dependency [{result}]")
        result = sync(long_running_async_task(result))
        print(f"...END   sync_function_with_async_dependency [{result}]")
        return result

    # Call sync_function_with_async_dependency
    # One reentrant task is OK
    # Multiple reentrant tasks->fails to exit
    n = 2
    for i in range(n):
       bg_tasks.append(sync_to_corountine(sync_function_with_async_dependency, i))

    # OK
    # bg_tasks.append(long_running_async_task(123))
    # bg_tasks.append(long_running_async_task(456))

    # Wait for the background tasks and the re-entrant coroutines together.
    task = asyncio.gather(*bg_tasks)  # , loop=loop)
    loop.run_until_complete(task)

# Run the reproduction only when this file is executed as a script.
if __name__ == '__main__':
    main()
allComputableThings commented 4 years ago

Output

...START sync_function_with_async_dependency [0]
...START sync_function_with_async_dependency [1]
Background: 0
...START long_running_async_task [0]
...START long_running_async_task [1]
Background: 1
Background: 2
Background: 3
Background: 4
...END   long_running_async_task [0]
...END   long_running_async_task [1]
...END   sync_function_with_async_dependency [1]
Background: 5
Background: 6
Background: 7
Background: 8
Background: 9

The following never called:

...END   sync_function_with_async_dependency [0]
allComputableThings commented 4 years ago

Also, after changing the code as follows (removing the nested calls), everything works fine.

    n = 2
    # for i in range(n):
    #     bg_tasks.append(sync_to_corountine(sync_function_with_async_dependency, i))
    for i in range(n):
        bg_tasks.append(long_running_async_task(i))
allComputableThings commented 4 years ago

The above worked fine with nest_asyncio==1.0.0 and failed in every release after it (see the list of releases: https://pypi.org/project/nest-asyncio/1.1.0/#history).

allComputableThings commented 4 years ago

Although 1.0.0 also fails with a more complex example, which seems to execute serially (the whole task should take no more than 5 seconds to complete since sync_function_with_async_dependency is recursed at most once.)

import time
from random import randint

    tstart = time.time()

    async def long_running_async_task(result):
        """Print START/END markers around a random sleep and return ``result``."""
        # Simulate slow IO
        print(f"...START long_running_async_task [{result}]")
        # randint(0, 100) / 50.0 gives a delay between 0 and 2 seconds.
        await asyncio.sleep(randint(0,100)/50.0)
        print(f"...END   long_running_async_task [{result}]")
        return result

    def sync_function_with_async_dependency(result):
        """
        Synchronously obtain ``result`` from the async task; when it is a
        multiple of 4, run one more async task for ``result + 1``.
        """
        # ``loop`` and ``tstart`` are read from the enclosing scope.
        # NOTE(review): the sync() shown earlier in this thread takes no
        # ``loop`` argument - presumably a modified variant is used here.
        print(f"...START sync_function_with_async_dependency [{result}]")
        result = sync(long_running_async_task(result), loop=loop)
        result = int(result)
        if (result%4)==0:
            # The follow-up value is never a multiple of 4, so this nested
            # call happens at most once per invocation.
            _result = result+1
            assert (_result%4)!=0
            result = sync(long_running_async_task(_result), loop=loop)

        # Report the elapsed wall-clock time since ``tstart``.
        print(f"...END   sync_function_with_async_dependency [{result}] dur={time.time()-tstart}")
        return result

    # Call sync_function_with_async_dependency
    # One reentrant task is OK
    # Multiple reentrant tasks=>fails to exit
    n = 200
    for i in range(n):
        bg_tasks.append(sync_to_corountine(sync_function_with_async_dependency, i))
    # for i in range(n):

... finally fails with:

...END   sync_function_with_async_dependency [5] dur=54.902220487594604
...END   sync_function_with_async_dependency [3] dur=54.90251874923706
...END   sync_function_with_async_dependency [2] dur=54.90284085273743
...END   sync_function_with_async_dependency [1] dur=54.90317440032959
...START long_running_async_task [1]
...END   long_running_async_task [1]
...END   sync_function_with_async_dependency [1] dur=55.66579794883728
Traceback (most recent call last):
  File "/usr/lib/python3.7/asyncio/base_events.py", line 566, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.7/asyncio/base_events.py", line 534, in run_forever
    self._run_once()
  File "/usr/lib/python3.7/asyncio/base_events.py", line 1756, in _run_once
    handle = self._ready.popleft()
IndexError: pop from an empty deque
erdewit commented 4 years ago

Thanks for the detailed investigation. The issue is with run_until_complete waiting indefinitely on a future that is already finished.

I'll have to think of a solution for this.

erdewit commented 4 years ago

This has been fixed in v1.3, which now uses a design more similar to the earlier 1.0 version.

allComputableThings commented 4 years ago

Many thanks