vxgmichel / aiostream

Generator-based operators for asynchronous iteration
http://aiostream.readthedocs.io
GNU General Public License v3.0
801 stars 34 forks source link

merge causes anext(): asynchronous generator is already running Exception #114

Closed sebastianjanisch closed 5 months ago

sebastianjanisch commented 5 months ago

Hi,

I am trying to iterate over several async generators using the merge function. I have tried reproducing the behavior I see in a minimal example but so far haven't managed, so I'll try to describe what I observe as best as I can.

I am using aiostream together with the reflex.dev UI library, which allows the launching of background tasks in an async setting (see here: https://reflex.dev/docs/events/background-events/#background-tasks)

Below code is conceptually what i do in my real example, yet it doesn't produce the error I'm seeing.

The setup is that I have several nested async generators which each at some point acquire a lock using async with self. In reality these tasks make async httpx calls so it makes sense to run them asynchronously.

The error I do observe seems to happen at the async with self stage:

   async with self:
  File "/Users/sjanisch/Work/projects/test/bayesline/code/app/reflexgui/.venv/lib/python3.11/site-packages/reflex/state.py", line 2028, in __aexit__
    await self._self_actx.__aexit__(*exc_info)
  File "/Users/sjanisch/.pyenv/versions/3.11.8/lib/python3.11/contextlib.py", line 217, in __aexit__
    await anext(self.gen)
RuntimeError: anext(): asynchronous generator is already running

It's unfortunate that I can't reliably reproduce this but maybe there are thoughts around what the cause might be.

class State(rx.State):

    task1_text: str = "Task 1"
    task2_text: str = "Task 2"

    async def on_load(self):
        pass

    async def task1(self):
        async with self:
            self.task1_text = "Task 1 Running"
            yield

        await asyncio.sleep(2)

        async with self:
            self.task1_text = "Task 1 50%"
            yield

        await asyncio.sleep(2)

        async with self:
            self.task1_text = "Task 1 Done"
            yield

    async def task2(self):
        async with self:
            self.task2_text = "Task 2 Running"
            yield

        await asyncio.sleep(2)

        async with self:
            self.task2_text = "Task 2 50%"
            yield

        await asyncio.sleep(2)

        async with self:
            self.task2_text = "Task 2 Done"
            yield

    @rx.background
    async def start_tasks(self):
        tasks = stream.merge(self.task1(), self.task2())
        async with tasks.stream() as streamer:
            async for result in streamer:
                yield result

@rx.page(route="/test", on_load=State.on_load)
def page() -> rx.Component:
    return rx.vstack(
        rx.text(State.task1_text),
        rx.text(State.task2_text),
        rx.button("Start Tasks", on_click=State.start_tasks),
    )
vxgmichel commented 5 months ago

Hi @sebastianjanisch,

My guess is that an async generator somehow gets shared between task1 and task2, most probably because both tasks use the same async with self.

So if both tasks try to enter or exit at the same time, the second task won't be able to access the async generator as it's already running in the first one.

I guess you could easily check that by protecting self with an async lock and see whether the problem remains. However, the sharing of the async generator is very suspicious so there's probably another problem somewhere else.

I'm not familiar with reflex but I don't think the problem comes from aiostream. My guess is that you would observe the same issue using asyncio.gather(self.task1(), self.task2()).

Hope this helps, and feel free to re-open the issue if you have more elements pointing towards aiostream.

sebastianjanisch commented 5 months ago

Hi @vxgmichel thanks for responding so quickly. You are right, I slapped a asyncio lock around the async with self block (i.e. async with _lock, self: which did the trick. I'll relay to the guys from Reflex to see if they want to investigate on their side.