vxgmichel / aiostream

Generator-based operators for asynchronous iteration
http://aiostream.readthedocs.io
GNU General Public License v3.0
800 stars 34 forks source link

Cancelling pending tasks in same TaskGroup causes RuntimeError #73

Closed cstruct closed 3 years ago

cstruct commented 3 years ago

If there are still pending tasks in a TaskGroup when its __aexit__ is called the self._pending set is modified during iteration.

import asyncio

import aiostream

async def failing():
    try:
        await asyncio.sleep(0.1)
        yield
        raise Exception()
    except asyncio.CancelledError:
        pass

async def working():
    for _ in range(4):
        await asyncio.sleep(0.1)
        yield

async def main():
    async with aiostream.stream.merge(*([failing() for _ in range(100)] + [working() in range(100)])).stream() as streams:
        async for _ in streams:
            pass
        pass

asyncio.run(main())

Causes:

Traceback (most recent call last):
...
  File "/home/a/.virtualenvs/dq/lib/python3.8/site-packages/aiostream/manager.py", line 20, in __aexit__
    for task in self._pending:
RuntimeError: Set changed size during iteration

Process finished with exit code 1
vxgmichel commented 3 years ago

Thanks again for the report @cstruct, I'm a bit horrified that the test suite didn't catch such an obvious mistake :sweat_smile:

I'll add a new test for that.

vxgmichel commented 3 years ago

@cstruct I realized while working with the example that you provided that it actually shows not one but three different issues with aiostream, now fixed with PR #75.

The first one is related to a typo:

[working() in range(100)]

This evaluates to False, which is not a valid source. This generated an exception after the failing() sources had been scheduled. With #75, this kind of typo will generate an exception much sooner, when the stream is instantiated.

Thanks again!

cstruct commented 3 years ago

Thank you for quick responses and fixes! With these fixes I can avoid a segfault in aioodbc.