vxgmichel / aiostream

Generator-based operators for asynchronous iteration
http://aiostream.readthedocs.io
GNU General Public License v3.0
801 stars 34 forks source link

RuntimeError: athrow(): asynchronous generator is already running #61

Closed o2genum closed 4 years ago

o2genum commented 4 years ago

On Python 3.8 this code throws `RuntimeError: athrow(): asynchronous generator is already running:

import asyncio
from aiostream.stream import merge

async def emit_events():
    while True:
        await asyncio.sleep(0.5)
        yield 'event'

async def run_stuff():
    async with merge(emit_events(), emit_events()).stream() as s:
        async for event in s:
            print(event)

async def main():
    await asyncio.wait_for(run_stuff(),timeout=3)

if __name__ == '__main__':
    asyncio.run(main())

Full stacktrace:

Traceback (most recent call last):
  File "kek.py", line 25, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
    return future.result()
  File "kek.py", line 20, in main
    await asyncio.wait_for(run_stuff(),timeout=3)
  File "/usr/lib/python3.8/asyncio/tasks.py", line 490, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<run_stuff() done, defined at kek.py:14> exception=RuntimeError('athrow(): asynchronous generator is already running')>
Traceback (most recent call last):
  File "/home/o2genum/Desktop/venv/lib/python3.8/site-packages/aiostream/stream/advanced.py", line 55, in base_combine
    streamer, task = await manager.wait_single_event(filters)
  File "/home/o2genum/Desktop/venv/lib/python3.8/site-packages/aiostream/manager.py", line 78, in wait_single_event
    done = await self.group.wait_any(tasks)
  File "/home/o2genum/Desktop/venv/lib/python3.8/site-packages/aiostream/manager.py", line 24, in wait_any
    done, _ = await asyncio.wait(tasks, return_when="FIRST_COMPLETED")
  File "/usr/lib/python3.8/asyncio/tasks.py", line 426, in wait
    return await _wait(fs, timeout, return_when, loop)
  File "/usr/lib/python3.8/asyncio/tasks.py", line 523, in _wait
    await waiter
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.8/contextlib.py", line 662, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/home/o2genum/Desktop/venv/lib/python3.8/site-packages/aiostream/aiter_utils.py", line 154, in __aexit__
    await self._aiterator.athrow(typ, value, traceback)
RuntimeError: athrow(): asynchronous generator is already running

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "kek.py", line 16, in run_stuff
    async for event in s:
  File "/home/o2genum/Desktop/venv/lib/python3.8/site-packages/aiostream/stream/advanced.py", line 96, in base_combine
    manager.create_task(streamer)
  File "/home/o2genum/Desktop/venv/lib/python3.8/site-packages/aiostream/manager.py", line 62, in __aexit__
    return await self.stack.__aexit__(*args)
  File "/usr/lib/python3.8/contextlib.py", line 679, in __aexit__
    raise exc_details[1]
  File "/usr/lib/python3.8/contextlib.py", line 662, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/home/o2genum/Desktop/venv/lib/python3.8/site-packages/aiostream/aiter_utils.py", line 154, in __aexit__
    await self._aiterator.athrow(typ, value, traceback)
RuntimeError: athrow(): asynchronous generator is already running
vxgmichel commented 4 years ago

Hi @o2genum and thanks for the report!

I'm investigating this issue. I've able to reproduce with python 3.8 and the latest master.

Also, I noticed it does not happen with on the v1 development branch.

I'll let you know as soon as it is fixed.

vxgmichel commented 4 years ago

Fixed in release v0.4.1 (PR #62).

Thanks again for the report @o2genum !