maxfischer2781 / asyncstdlib

the missing toolbox for an async world
https://asyncstdlib.readthedocs.io
MIT License

`tee()`'s iterators throw RuntimeError when consumed in parallel #86

Closed h4l closed 2 years ago

h4l commented 2 years ago

Thanks for this library, I've been using it on a project recently and found it really useful.

The tee() function seems to have a bug. If the source is an async generator that awaits in-between yields, and the tee's iterators are consumed in parallel, the tee's iterators will throw RuntimeError: anext(): asynchronous generator is already running.

What's happening is that one of the tee()'s peer iterators calls __anext__() on the source, then while that call is being awaited, a second peer iterator also calls __anext__(). If the source is an async generator, it will throw a RuntimeError, as overlapping calls are not allowed.
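
The overlap described above can be reproduced without asyncstdlib. The following is a minimal sketch, assuming asyncio and a Python version that rejects overlapping calls (the traceback in this report is from 3.10); `slow_source` and `pull` are illustrative names, not part of any library.

```python
import asyncio


async def slow_source():
    # An async generator that awaits between yields, like the one in the report.
    for x in range(3):
        await asyncio.sleep(0.01)
        yield x


async def pull(source):
    # One consumer asking the shared generator for its next item.
    return await source.__anext__()


async def main():
    source = slow_source()
    # Both calls are in flight at once: the second starts while the first
    # is still suspended inside the generator's sleep.
    results = await asyncio.gather(
        pull(source), pull(source), return_exceptions=True
    )
    await source.aclose()
    return results


results = asyncio.run(main())
print(results)
```

The first call completes with the item `0`. The second fails with a `RuntimeError`, because the generator is still running on behalf of the first caller.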

Here's a test case that triggers the error: https://github.com/h4l/asyncstdlib/commit/680429354fce7ad22c535df74ff503c9f38ec313

And an asyncio program that triggers it:

# tee_bug_asyncio_repo.py
import asyncio
import sys

import asyncstdlib as a

async def slow_iterable():
    for x in range(3):
        await asyncio.sleep(0.1)
        yield x

async def main(*, n):
    print(f"tee() with {n=}")
    async with a.tee(slow_iterable(), n=n) as iterables:
        results = await asyncio.gather(
            *(asyncio.create_task(a.list(iterable)) for iterable in iterables)
        )
        print(results)

if __name__ == "__main__":
    n = 2 if len(sys.argv) < 2 else int(sys.argv[1])
    asyncio.run(main(n=n))

$ python tee_bug_asyncio_repo.py 1
tee() with n=1
[[0, 1, 2]]

$ python tee_bug_asyncio_repo.py 2
tee() with n=2
Traceback (most recent call last):
  File "/workspaces/asyncstdlib/tee_bug_asyncio_repo.py", line 16, in main
    results = await asyncio.gather(
  File "/workspaces/asyncstdlib/asyncstdlib/builtins.py", line 656, in list
    return [element async for element in aiter(iterable)]
  File "/workspaces/asyncstdlib/asyncstdlib/builtins.py", line 656, in <listcomp>
    return [element async for element in aiter(iterable)]
  File "/workspaces/asyncstdlib/asyncstdlib/itertools.py", line 307, in tee_peer
    item = await iterator.__anext__()
RuntimeError: anext(): asynchronous generator is already running

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/workspaces/asyncstdlib/tee_bug_asyncio_repo.py", line 24, in <module>
    asyncio.run(main(n=n))
  File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/workspaces/asyncstdlib/tee_bug_asyncio_repo.py", line 15, in main
    async with a.tee(slow_iterable(), n=n) as iterables:
  File "/workspaces/asyncstdlib/asyncstdlib/itertools.py", line 390, in __aexit__
    await self.aclose()
  File "/workspaces/asyncstdlib/asyncstdlib/itertools.py", line 395, in aclose
    await child.aclose()
RuntimeError: aclose(): asynchronous generator is already running

h4l commented 2 years ago

I'm not sure how this can be fixed in a framework-agnostic way. Is there a way to have two coroutines await a common awaitable without using a construct that has knowledge of the underlying event loop (like an asyncio.Future)?

For my project I'm only using asyncio so for the moment I've vendorised tee() and fixed my copy using an asyncio.Lock like this:

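from asyncio import Lock
from collections import deque
from typing import AsyncGenerator, AsyncIterator, Deque, Generic, List, TypeVar

T = TypeVar("T")

async def tee_peer(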
    iterator: AsyncIterator[T],
    buffer: Deque[T],
    peers: List[Deque[T]],
    iterator_lock: Lock,
) -> AsyncGenerator[T, None]:
    """An individual iterator of a :py:func:`~.tee`"""
    try:
        while True:
            if not buffer:
                # We need to serialise access to iterator.__anext__ as it's an
                # error for an async generator to be run while it's already
                # running.
                async with iterator_lock:
                    # Check again now that we've acquired the lock, as another
                    # peer may have populated the buffer while we were waiting.
                    if not buffer:
                        try:
                            item = await iterator.__anext__()
                        except StopAsyncIteration:
                            break
                        else:
                            # Append to all buffers, including our own. We'll
                            # fetch our item from the buffer again, instead of
                            # yielding it directly. This ensures the proper item
                            # ordering if any of our peers are fetching items
                            # concurrently. They may have buffered their item
                            # already.
                            for peer_buffer in peers:
                                peer_buffer.append(item)
            yield buffer.popleft()
    finally:
        # this peer is done – remove its buffer
        for idx, peer_buffer in enumerate(peers):  # pragma: no branch
            if peer_buffer is buffer:
                peers.pop(idx)
                break
        if not peers and hasattr(iterator, "aclose"):
            await iterator.aclose()  # type: ignore[attr-defined]

class Tee(Generic[T]):
    # ...
    def __init__(self, iterable: AnyIterable[T], n: int = 2):
        self._iterator = aiter(iterable)
        self._buffers: List[Deque[T]] = [deque() for _ in range(n)]
        iterator_lock = Lock()
        self._children = tuple(
            tee_peer(
                iterator=self._iterator,
                buffer=buffer,
                peers=self._buffers,
                iterator_lock=iterator_lock,
            )
            for buffer in self._buffers
        )

maxfischer2781 commented 2 years ago

Hi @h4l, thanks for the report and digging into this a bit. Indeed, that is a known limitation of the way tee (and generally asyncstdlib) works. I have pondered adding a templating mechanism, but it's still a very early draft.

As a practical solution, Tee could simply take an optional Context/Lock that defaults to a nullcontext. This would keep things framework-agnostic while allowing this case to be specialised as needed. Would that be suitable for your use-case?
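
A rough sketch of what such an optional lock argument might look like is shown here. It is not asyncstdlib's implementation: `simple_tee`, `NoLock` and the `lock` keyword are hypothetical names chosen for illustration. The sketch defines its own no-op context manager because `contextlib.nullcontext` only supports `async with` from Python 3.10 onwards.

```python
import asyncio
from collections import deque


class NoLock:
    """No-op async context manager (hypothetical default when no lock is given)."""

    async def __aenter__(self):
        return None

    async def __aexit__(self, *exc_info):
        return False


async def tee_peer(iterator, buffer, peers, lock):
    try:
        while True:
            if not buffer:
                async with lock:
                    # Re-check: a peer may have filled the buffer while we waited.
                    if not buffer:
                        try:
                            item = await iterator.__anext__()
                        except StopAsyncIteration:
                            break
                        for peer_buffer in peers:
                            peer_buffer.append(item)
            yield buffer.popleft()
    finally:
        # Remove this peer's buffer by identity (empty deques compare equal).
        for idx, peer_buffer in enumerate(peers):
            if peer_buffer is buffer:
                peers.pop(idx)
                break


def simple_tee(iterable, n=2, *, lock=None):
    """Simplified tee; closing the source is omitted for brevity."""
    iterator = iterable.__aiter__()
    buffers = [deque() for _ in range(n)]
    lock = NoLock() if lock is None else lock
    return tuple(tee_peer(iterator, buffer, buffers, lock) for buffer in buffers)


async def slow_iterable():
    for x in range(3):
        await asyncio.sleep(0.01)
        yield x


async def collect(iterable):
    return [item async for item in iterable]


async def main():
    # The caller opts in to a framework-specific lock, here asyncio's.
    peers = simple_tee(slow_iterable(), n=2, lock=asyncio.Lock())
    return await asyncio.gather(*(collect(peer) for peer in peers))


results = asyncio.run(main())
print(results)
```

With the `asyncio.Lock` supplied, both peers can be consumed in parallel and each receives every item. Without it, the sketch behaves like the current tee: fine for sequential use, but still exposed to the overlap error under concurrent use.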

h4l commented 2 years ago

Thanks @maxfischer2781. For my use-case (just needing to support asyncio) I could certainly pass an argument to specify an asyncio-specific lock implementation (or similar). From my point of view, I'd expect it to be a required option for tee(), or at least for tee() to loudly throw an exception if it was used in a way that required locking but without a lock implementation specified.
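
One way the "fail loudly" behaviour could work is a default guard that does not lock at all and only detects overlapping entry. This is a sketch under assumptions: `LoudGuard` and `guarded_fetch` are hypothetical names, and the sleep stands in for awaiting the source iterator.

```python
import asyncio


class LoudGuard:
    """Hypothetical default guard: it never waits, it only detects overlap."""

    def __init__(self):
        self._busy = False

    async def __aenter__(self):
        if self._busy:
            raise RuntimeError(
                "tee() iterators were advanced concurrently; pass a lock to tee()"
            )
        self._busy = True

    async def __aexit__(self, *exc_info):
        self._busy = False
        return False


async def guarded_fetch(guard, delay):
    # Stands in for a tee peer fetching the next item from the shared source.
    async with guard:
        await asyncio.sleep(delay)
        return "item"


async def main():
    guard = LoudGuard()
    return await asyncio.gather(
        guarded_fetch(guard, 0.01),
        guarded_fetch(guard, 0.01),
        return_exceptions=True,
    )


results = asyncio.run(main())
print(results)
```

The first fetch succeeds. The second raises immediately, with a message that points at the missing lock rather than at the generator internals.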

However it would be nice if a default framework-specific implementation could be used automatically. I'm not sure if this kind of thing is what you're investigating with a templating mechanism, but thinking about this just now, I wonder if the contextvars module might help to automatically resolve a framework-specific construct, like a lock implementation. For example, asyncstdlib could provide an API to enable framework-specific logic, which could use contextvars under the hood to maintain the active implementation to be used in the current context.

In principle it should be possible to provide a default context by inspecting the call stack to see if the current function was called from asyncio / trio etc. And to cache that result in the context, to avoid looking it up repeatedly.

And as well as bundling implementations for whichever frameworks were supported out of the box, the API could allow a user to pass their own implementation for whichever async framework they're using.

I can see this kind of thing could get pretty complex and magical though! So an explicit argument would be a pragmatic solution, especially if framework-specific behaviour isn't needed elsewhere.
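
For illustration, the contextvars idea could be sketched roughly as follows. All names here (`NoLock`, `use_lock_factory`, `make_lock`) are hypothetical and not part of asyncstdlib. The sketch covers only explicit selection of an implementation and does not attempt automatic framework detection.

```python
import asyncio
import contextvars
from contextlib import contextmanager


class NoLock:
    """No-op async context manager (hypothetical framework-agnostic default)."""

    async def __aenter__(self):
        return None

    async def __aexit__(self, *exc_info):
        return False


# Hypothetical registry: holds a zero-argument callable that builds a lock.
_lock_factory = contextvars.ContextVar("lock_factory", default=NoLock)


@contextmanager
def use_lock_factory(factory):
    """Select the lock implementation for the current context."""
    token = _lock_factory.set(factory)
    try:
        yield
    finally:
        _lock_factory.reset(token)


def make_lock():
    """Build a lock using whichever factory is active in this context."""
    return _lock_factory.get()()


async def child():
    # Tasks copy the context of their creator, so the factory is inherited.
    return type(make_lock()).__name__


async def main():
    before = type(make_lock()).__name__
    with use_lock_factory(asyncio.Lock):
        inherited = await asyncio.create_task(child())
    after = type(make_lock()).__name__
    return before, inherited, after


names = asyncio.run(main())
print(names)
```

A user on another framework could pass that framework's lock class to `use_lock_factory` in the same way, which would cover the "bring your own implementation" case.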