vxgmichel / aiostream

Generator-based operators for asynchronous iteration
http://aiostream.readthedocs.io
GNU General Public License v3.0
800 stars 34 forks source link

UnboundLocalError when trying to reuse a stream #76

Closed konapun closed 3 years ago

konapun commented 3 years ago

I'm trying to send a stream to multiple processors in series and am getting the following error when the second processor receives the stream:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/aiostream-0.4.3-py3.8.egg/aiostream/core.py", line 35, in wait_stream
UnboundLocalError: local variable 'item' referenced before assignment

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "project/__main__.py", line 50, in <module>
    compare.run(sources)
  File "project/project/simulation/__init__.py", line 31, in run
    comparison.run(sources)
  File "project/project/bot.py", line 24, in run
    asyncio.run(loop())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "project/project/bot.py", line 22, in loop
    await self._dispatcher.dispatch(sources)
  File "project/project/core/dispatch.py", line 20, in dispatch
    await stream.map(funnel, self._route_data)
  File "/usr/local/lib/python3.8/dist-packages/aiostream-0.4.3-py3.8.egg/aiostream/core.py", line 37, in wait_stream
aiostream.core.StreamEmpty

The project isn't open source at the moment so I can't refer you to the code that produces this error but I will try to create a simplified version to reproduce it.

vxgmichel commented 3 years ago

Hi @konapun and thanks for the report!

I agree that this trace is confusing and it's going to get fixed with #77. But the reason why you're getting the StreamEmpty exception is because you're awaiting an empty stream (stream.map(funnel, self._route_data)). Since there is no last item to return, a StreamEmpty exception is raised instead.

I hope that helps.