erdewit / nest_asyncio

Patch asyncio to allow nested event loops
BSD 2-Clause "Simplified" License
681 stars 78 forks source link

Ready queue mutation, IndexError: pop from an empty deque #11

Closed corydolphin closed 4 years ago

corydolphin commented 5 years ago

Hey,

First, thanks for sharing the patch and publishing this library. I'm trying to workaround re-entrancy while converting a codebase incrementally to asyncio, where there are call chains of async > loop.run_until_complete(async).

I've hit an issue with the patch with handles from the _ready queue seemingly being popped from elsewhere. I.e. from python3.6/asyncio/base_events.py popping from the ready queue throws, because it iterates too far.

        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft() # throws.
            if handle._cancelled:
                continue

The full exception stack trace looks like this:

return aio_batcher.run_until_complete(aio_batcher.evaluate_coroutines(json_data))
  File "/Users/cory/src/nd/util/aio_batcher/decorators.py", line 50, in run_until_complete
    return loop.run_until_complete(awaitable)
  File "/var/lib/conda/envs/nextdoor3/lib/python3.6/site-packages/nest_asyncio.py", line 63, in run_until_complete
    return self._run_until_complete_orig(future)
  File "/var/lib/conda/envs/nextdoor3/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "/var/lib/conda/envs/nextdoor3/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
    self._run_once()
  File "/var/lib/conda/envs/nextdoor3/lib/python3.6/asyncio/base_events.py", line 1411, in _run_once
    handle = self._ready.popleft()
IndexError: pop from an empty deque

My hypothesis is that one of the enqueued items triggers the nest_async, or else an enqueued item is processing other elements from the queue.

It is the worst kind of problem where it is reproducible, but I have yet to isolate a simple test case to make it easy to understand. Do you have a hunch of what might be happening? Perhaps something you ran into while writing the patch?

Thanks for your help, Cory

erdewit commented 5 years ago

Hi Cory,

There could be an issue with async generators. Are you by any chance using async iterators? (async for syntax).

corydolphin commented 5 years ago

Hey Ewald,

We are not using async generators or iterators. Of note, if we change the code path to always use the patched loop, i.e. never to use the run_until_complete_orig path here Then the issue does not occur.

ostruk commented 4 years ago

I've got the same issue. Using ib_insync together with websockets server. Have loop.run_forver() at the end of my code. Happens sporadically so difficult to diagnose, but doesn't appear to happen if I cut websockets out of equation. I'll sprinkle logging in asyncio and nest_asyncio and try to catch it in the act.

I don't fully understand how nest_asyncio supposed to work, but it appears it's modifying the same collection as asyncio (self._ready) when running patched run_once and occasionally it removes items from deque while the loop in asyncio is running, thus causing the exception. Are there some restrictions or rules that need to be followed when nesting? Also my stack is different in a way where there's no nest_asyncio involved: it's just loop.run_forever() -> run_once() -> crash.

My code, basically:

util.patchAsyncio()  
loop = asyncio.get_event_loop()  
...
self.bars.updateEvent += self.onTick #receive periodic updates from IB
loop.run_until_complete(websockets.serve(self.tick, '127.0.0.1', port))  
loop.run_forever()  
erdewit commented 4 years ago

The repo has been updated to follow Cory's approach of always using the patched loop. It also patches run_forever now instead of run_until_complete and with this it can stay closer to the base asyncio code.

I believe this should fix the issues, if it doesn't let me know. I'll keep this bug open for a while and when it works okay a 1.1 version will be released.

ostruk commented 4 years ago

Thanks! But it appears to still be happening, now as such:

File "server.py", line 233, in start
    loop.run_forever()
  File "nest_asyncio.py", line 90, in run_forever_37
    self._run_once()
  File "base_events.py", line 1760, in _run_once
    handle = self._ready.popleft()
IndexError: pop from an empty deque

I'll see if I can get more detailed output next week.

ostruk commented 4 years ago

Ok, boiled it down to this base case. Gonna think about how to go around it, but if you've got any ideas it would be greatly appreciated.

import asyncio
import nest_asyncio
nest_asyncio.apply()
loop = asyncio.get_event_loop()

async def func1 ():
    loop.run_until_complete(asyncio.sleep(5))

async def func2 ():
    loop.run_until_complete(asyncio.sleep(0.1))
    await asyncio.sleep(0.5)

async def start():
    await asyncio.gather(func1(),func2())

asyncio.run(start())

Cheers

ostruk commented 4 years ago

I tagged loops with id's so I could track what they're doing as well as track any operations done on the tasks list (appends in _call_soon). What is happening is that you get a nested a loop where outer loop has 2 tasks and when the first task runs it clears the second task in the internal loop, so when internal loop finishes there's nothing to pop from deque. Here's my output log. Here id==2 is the outer loop and id==3 is the inner loop.


2019-08-15 17:06:14,742 - server - DEBUG - _call_soon append: <Handle _patch_task.<locals>.step()>
2019-08-15 17:06:14,743 - server - DEBUG - run_forever_37 start with id:1. self._ready length: 1
2019-08-15 17:06:14,743 - server - DEBUG - _run_once base_events with id: 1
2019-08-15 17:06:14,743 - server - DEBUG - popping 1 tasks in loop with id:1
2019-08-15 17:06:14,743 - server - DEBUG - about to pop task: 1/1 in loop with id:1. self._ready length: 1
2019-08-15 17:06:14,743 - server - DEBUG - popped task 1/1 in loop with id:1. self._ready length: 0
2019-08-15 17:06:14,743 - server - DEBUG - _call_soon append: <Handle _patch_task.<locals>.step()>
2019-08-15 17:06:14,743 - server - DEBUG - _call_soon append: <Handle _patch_task.<locals>.step()>
2019-08-15 17:06:14,744 - server - DEBUG - done pop of 1 in loop with id :1. self._ready length: 2
2019-08-15 17:06:14,744 - server - DEBUG - _run_once base_events with id: 1
2019-08-15 17:06:14,744 - server - DEBUG - popping 2 tasks in loop with id:1
2019-08-15 17:06:14,744 - server - DEBUG - about to pop task: 1/2 in loop with id:1. self._ready length: 2
2019-08-15 17:06:14,744 - server - DEBUG - popped task 1/2 in loop with id:1. self._ready length: 1
2019-08-15 17:06:14,744 - server - DEBUG - _call_soon append: <Handle _patch_task.<locals>.step()>
2019-08-15 17:06:14,744 - server - DEBUG - run_forever_37 start with id:2. self._ready length: 2
2019-08-15 17:06:14,744 - server - DEBUG - _run_once base_events with id: 2
2019-08-15 17:06:14,744 - server - DEBUG - popping 2 tasks in loop with id:2
2019-08-15 17:06:14,744 - server - DEBUG - about to pop task: 1/2 in loop with id:2. self._ready length: 2
2019-08-15 17:06:14,745 - server - DEBUG - popped task 1/2 in loop with id:2. self._ready length: 1
2019-08-15 17:06:14,745 - server - DEBUG - _call_soon append: <Handle _patch_task.<locals>.step()>
2019-08-15 17:06:14,745 - server - DEBUG - run_forever_37 start with id:3. self._ready length: 2
2019-08-15 17:06:14,745 - server - DEBUG - _run_once base_events with id: 3
2019-08-15 17:06:14,745 - server - DEBUG - popping 2 tasks in loop with id:3
2019-08-15 17:06:14,745 - server - DEBUG - about to pop task: 1/2 in loop with id:3. self._ready length: 2
2019-08-15 17:06:14,745 - server - DEBUG - popped task 1/2 in loop with id:3. self._ready length: 1
2019-08-15 17:06:14,745 - server - DEBUG - done pop of 1 in loop with id :3. self._ready length: 1
2019-08-15 17:06:14,745 - server - DEBUG - about to pop task: 2/2 in loop with id:3. self._ready length: 1
2019-08-15 17:06:14,746 - server - DEBUG - popped task 2/2 in loop with id:3. self._ready length: 0
2019-08-15 17:06:14,746 - server - DEBUG - done pop of 2 in loop with id :3. self._ready length: 0
2019-08-15 17:06:14,746 - server - DEBUG - _run_once base_events with id: 3
2019-08-15 17:06:16,746 - server - DEBUG - _run_once append
2019-08-15 17:06:16,746 - server - DEBUG - popping 1 tasks in loop with id:3
2019-08-15 17:06:16,746 - server - DEBUG - about to pop task: 1/1 in loop with id:3. self._ready length: 1
2019-08-15 17:06:16,746 - server - DEBUG - popped task 1/1 in loop with id:3. self._ready length: 0
2019-08-15 17:06:16,746 - server - DEBUG - _call_soon append: <Handle Task.__wakeup(<Future finished result=None>)>
2019-08-15 17:06:16,746 - server - DEBUG - done pop of 1 in loop with id :3. self._ready length: 1
2019-08-15 17:06:16,746 - server - DEBUG - _run_once base_events with id: 3
2019-08-15 17:06:16,747 - server - DEBUG - popping 1 tasks in loop with id:3
2019-08-15 17:06:16,747 - server - DEBUG - about to pop task: 1/1 in loop with id:3. self._ready length: 1
2019-08-15 17:06:16,747 - server - DEBUG - popped task 1/1 in loop with id:3. self._ready length: 0
2019-08-15 17:06:16,747 - server - DEBUG - _call_soon append: <Handle _run_until_complete_cb(<Task finishe...> result=None>) at base_events.py:158>
2019-08-15 17:06:16,747 - server - DEBUG - done pop of 1 in loop with id :3. self._ready length: 1
2019-08-15 17:06:16,747 - server - DEBUG - _run_once base_events with id: 3
2019-08-15 17:06:16,747 - server - DEBUG - popping 1 tasks in loop with id:3
2019-08-15 17:06:16,747 - server - DEBUG - about to pop task: 1/1 in loop with id:3. self._ready length: 1
2019-08-15 17:06:16,748 - server - DEBUG - popped task 1/1 in loop with id:3. self._ready length: 0
2019-08-15 17:06:16,748 - server - DEBUG - done pop of 1 in loop with id :3. self._ready length: 0
2019-08-15 17:06:16,748 - server - DEBUG - run_forever_37 done with id: 3. self._ready length: 0
2019-08-15 17:06:16,748 - server - DEBUG - done func2 1
2019-08-15 17:06:16,748 - server - DEBUG - done pop of 1 in loop with id :2. self._ready length: 0
2019-08-15 17:06:16,748 - server - DEBUG - about to pop task: 2/2 in loop with id:2. self._ready length: 0
2019-08-15 17:06:16,750 - server - ERROR - exception:
IndexError: pop from an empty deque

Hope this helps.

Edit: "done func2 1" runs right before await asyncio.sleep(0.5) in func2()

ostruk commented 4 years ago

For now I just wrapped it in a try catch, not sure if this will have some unintended side-effects though. I'm thinking not since it the task was already processed.

try:
    self._run_once()
except IndexError as e:
    pass

My question is: is it an issue in general that there are situations where loops cross-process each others tasks? A better solution might be to localize task lists to their own loops, so that each loop is only concerned with it's own task list. Thoughts?

erdewit commented 4 years ago

Thanks very much Ostruk for the test case, the analysis and the solution of catching the IndexError.

In the test case there are two run_until_completes running inside an outer run_until_complete. This is something that I have never considered. The hidden assumption was that the loops are always nicely nested, just like Russian dolls, and the test case is the equivalent of putting two dolls side by side inside another doll.

I agree that there should be no unintended side effects of ignoring the IndexError, since, as you say, all tasks have been processed already and it doesn't really matter in which nesting this happens.

What does matter though is that the handles are run in the order that they have become ready. This is why there is only one ready queue. Having a queue per nesting would mean that the inner nesting blocks the outer nesting for as long as the inner one runs.

There's a new release (v1.1.0) with the updated code. Thanks again Ostruk for having made this possible. I'll keep this bug open for a while in case there are any unforeseen consequences.

ostruk commented 4 years ago

Happy to help!