agronholm / anyio

High level asynchronous concurrency and networking framework that works on top of either trio or asyncio
MIT License

Memory object streams deadlock on asyncio #771

Closed gschaffner closed 1 month ago

gschaffner commented 1 month ago

Things to check first

AnyIO version

master (439951d9babf7615f0720fd43c93c2a695549bd4)

Python version

3.12.5 (CPython)

What happened?

Some applications started deadlocking recently after updating AnyIO to 4.4.0. The regression bisects to #735.

(Aside: Apologies that I did not follow up in the discussions about #728 in May. I got unexpectedly busy at the time :/ )

How can we reproduce the bug?

(This reproducer could be made smaller, but I think doing so would sacrifice readability.)

from __future__ import annotations

from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from types import TracebackType
from typing import Final
from typing import TypeVar
from typing import cast

from typing_extensions import Self

import anyio
import anyio.lowlevel
from anyio import CancelScope
from anyio.abc import TaskStatus
from anyio.streams.memory import MemoryObjectSendStream

E = TypeVar("E", bound=BaseException)

class A:
    # `A`'s API is pretty similar to `ObjectSendStream[str]`, except it requires
    # `__aexit__` (it doesn't support `aclose`).

    def __init__(self) -> None:
        super().__init__()
        self._acm: Final = self._acm_impl()

    async def __aenter__(self) -> Self:
        # TODO: Mypy bug?
        return await self._acm.__aenter__()  # type: ignore[return-value]

    async def __aexit__(
        self,
        exc_type: type[E] | None,
        exc_value: E | None,
        traceback: TracebackType | None,
    ) -> bool | None:
        return await self._acm.__aexit__(exc_type, exc_value, traceback)

    @asynccontextmanager
    async def _acm_impl(self) -> AsyncGenerator[Self]:
        async with anyio.create_task_group() as task_group:
            outgoing_service_cancel_scope, self._outgoing_send_stream = cast(
                tuple[CancelScope, MemoryObjectSendStream[str]],
                await task_group.start(self._serve_outgoing),
            )
            with self._outgoing_send_stream:
                try:
                    yield self
                finally:
                    # The `async with` body has exited, so if there is a pending
                    # cancellation request it can be allowed to reach the service task
                    # now.
                    outgoing_service_cancel_scope.shield = False
                    # If the `async with A()` isn't already in a cancelled scope, we also
                    # need to request the service task to shut down.
                    task_group.cancel_scope.cancel()

    async def _serve_outgoing(
        self,
        *,
        task_status: TaskStatus[tuple[CancelScope, MemoryObjectSendStream[str]]],
    ) -> None:
        (
            outgoing_send_stream,
            outgoing_receive_stream,
        ) = anyio.create_memory_object_stream[str](0)
        # The cancel scope here is a service task group pattern
        # (https://github.com/python-trio/trio/issues/1521). This service task must not
        # receive cancellation until the body of the `async with A()` block has finished
        # exiting first, because this service task must remain available in any
        # `finally`/etc. blocks of the `async with A()` body.
        with outgoing_receive_stream, CancelScope(shield=True) as cancel_scope:
            task_status.started((cancel_scope, outgoing_send_stream))
            async for msg in outgoing_receive_stream:
                print(f"outgoing stream driver handling outgoing message: {msg!r}")
                await anyio.lowlevel.checkpoint()

    async def send(self, msg: str, /) -> None:
        # Note: If `_serve_outgoing` crashes due to external causes, this will raise
        # `BrokenResourceError`.
        await self._outgoing_send_stream.send(msg)

async def main() -> None:
    with CancelScope() as cancel_scope:
        async with A() as a:
            try:
                await a.send("message during normal operation!")
                ...
                # Suppose that at some point there's a cancellation request from above
                # (e.g. a signal handler requesting process shutdown):
                cancel_scope.cancel()
            finally:
                # A bit of time is needed here for the scheduling order that triggers
                # the deadlock to be hit. (In the real code, there are more awaits here
                # and the deadlock is hit (empirically) every time.)
                n = 0
                while not a._outgoing_send_stream._state.waiting_receivers:
                    n += 1
                    await anyio.lowlevel.cancel_shielded_checkpoint()
                print(f"{n} scheduling rounds")

                with CancelScope(shield=True):
                    await a.send("message during cleanup!")

# Works on Trio; deadlocks on asyncio.
anyio.run(main, backend="asyncio")
gschaffner commented 1 month ago

The underlying bug is that TaskInfo.has_pending_cancellation() returns false positives on asyncio when the inspected task is inside a shielded cancel scope. The false positive causes MemoryObjectSendStream.send to think that the waiting receiver has a pending cancellation (even though the receiver is shielded), so it skips that receiver:

https://github.com/agronholm/anyio/blob/439951d9babf7615f0720fd43c93c2a695549bd4/src/anyio/streams/memory.py#L215-L220
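
To show the false positive in isolation, here is a minimal sketch (not part of the reproducer above; `shielded_worker` is just a stand-in for the shielded service task): another task inspects the shielded worker's TaskInfo after an enclosing scope has been cancelled.

from __future__ import annotations

import anyio
from anyio import CancelScope, TaskInfo
from anyio.abc import TaskStatus

async def shielded_worker(*, task_status: TaskStatus[TaskInfo]) -> None:
    # Stand-in for the shielded service task (`_serve_outgoing` above).
    with CancelScope(shield=True):
        task_status.started(anyio.get_current_task())
        await anyio.sleep(0.1)

async def main() -> None:
    async with anyio.create_task_group() as task_group:
        worker_info = await task_group.start(shielded_worker)
        # Request cancellation of the enclosing scope, as the reproducer does.
        task_group.cancel_scope.cancel()
        # Expected: False (the shield keeps the cancellation away from the
        # worker). Per the false positive described above, asyncio returns
        # True here, which is what makes MemoryObjectSendStream.send skip the
        # waiting receiver.
        print(worker_info.has_pending_cancellation())

anyio.run(main, backend="asyncio")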