vxgmichel / aiostream

Generator-based operators for asynchronous iteration
http://aiostream.readthedocs.io
GNU General Public License v3.0
800 stars 34 forks source link

Unexpected error when returning from merged stream #65

Closed samfrances closed 3 years ago

samfrances commented 3 years ago

I'm getting an unexpected error when I run a program with the following structure:

import asyncio

from aiostream import stream

async def foo():

    inbox = stream.iterate([1, 2, 3, 4])
    inbox2 = stream.iterate([4, 5, 6, 7])

    merged = stream.merge(inbox, inbox2)

    async with merged.stream() as s:
        async for message in s:
            if message > 5:
                return

async def main():
    await foo()

asyncio.run(main())

The error is as follows:

$ python tester.py 
Traceback (most recent call last):
  File "/home/sam/projects/aiostream_error_example/venv/lib/python3.7/site-packages/aiostream/stream/advanced.py", line 93, in base_combine
    yield result
GeneratorExit

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/sam/projects/aiostream_error_example/venv/lib/python3.7/site-packages/aiostream/stream/advanced.py", line 96, in base_combine
    manager.create_task(streamer)
  File "/home/sam/projects/aiostream_error_example/venv/lib/python3.7/site-packages/aiostream/manager.py", line 62, in __aexit__
    return await self.stack.__aexit__(*args)
  File "/usr/local/lib/python3.7/contextlib.py", line 652, in __aexit__
    raise exc_details[1]
  File "/usr/local/lib/python3.7/contextlib.py", line 635, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/usr/local/lib/python3.7/contextlib.py", line 545, in _exit_wrapper
    await callback(*args, **kwds)
  File "/home/sam/projects/aiostream_error_example/venv/lib/python3.7/site-packages/aiostream/manager.py", line 36, in cancel_task
    await task
StopAsyncIteration

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "tester.py", line 20, in <module>
    asyncio.run(main())
  File "/usr/local/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "tester.py", line 18, in main
    await foo()
  File "tester.py", line 15, in foo
    return
  File "/home/sam/projects/aiostream_error_example/venv/lib/python3.7/site-packages/aiostream/aiter_utils.py", line 163, in __aexit__
    await self._aiterator.aclose()
RuntimeError: async generator raised StopAsyncIteration

This error occurs in version 0.3.2 but not in version 0.3.1.

Any help much appreciated.

samfrances commented 3 years ago

For the avoidance of doubt, this also happens if you use pipe.merge():

import asyncio

from aiostream import pipe, stream

async def foo():

    inbox = stream.iterate([1, 2, 3, 4])
    inbox2 = stream.iterate([4, 5, 6, 7])

    merged = inbox | pipe.merge(inbox2)

    async with merged.stream() as s:
        async for message in s:
            print(message)
            if message > 5:
                return

async def main():
    await foo()

asyncio.run(main())
vxgmichel commented 3 years ago

Hi @samfrances,

This is definitely a bug (a pretty big one in my opinion), thanks for reporting it.

I'll try to investigate that as soon as possible, I'll let you know how it goes.

samfrances commented 3 years ago

Thanks @vxgmichel. I did try to have a look at the code in question, but its beyond my current understanding of asyncio, I think. Sorry not to be more help.

vxgmichel commented 3 years ago

@samfrances Well the small reproducible example you provided helped a lot, the problem should now be fixed!

I'll let you know when the new release is available.

vxgmichel commented 3 years ago

v0.4.2 is out, thanks again for the report !

samfrances commented 3 years ago

Thanks for the quick bug fix!