agronholm / anyio

High level asynchronous concurrency and networking framework that works on top of either trio or asyncio
MIT License

pytest-anyio and crashed background task in taskgroup fixture #805

Open jakkdl opened 2 weeks ago

jakkdl commented 2 weeks ago

AnyIO version

4.6.0

Python version

3.12.4

What happened?

I'm encountering several weird behaviors: depending on the setup, the test run either hangs in unexpected places or crashes.

This came up while trying to rewrite pytest-trio, when I hit the test that was added in https://github.com/python-trio/pytest-trio/pull/83 after https://github.com/python-trio/pytest-trio/pull/77

How can we reproduce the bug?

import anyio
import pytest
from contextlib import asynccontextmanager

my_event = anyio.Event()
async def die_soon(task_status):
    task_status.started()
    await my_event.wait()
    raise RuntimeError('OOPS')

@asynccontextmanager
async def my_simple_fixture():
    async with anyio.create_task_group() as tg:
        await tg.start(die_soon)
        yield

@pytest.mark.anyio
async def test_try():
    async with my_simple_fixture():
        my_event.set()

Running this with trio as the backend gives:

[...]
  |   File "/tmp/anyio_pytest/bar.py", line 14, in my_simple_fixture
  |     async with anyio.create_task_group() as tg:
  |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/anyio/_backends/_trio.py", line 187, in __aexit__
  |     return await self._nursery_manager.__aexit__(exc_type, exc_val, exc_tb)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/trio/_core/_run.py", line 959, in __aexit__
  |     raise combined_error_from_nursery
  | ExceptionGroup: Exceptions from Trio nursery (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/tmp/anyio_pytest/bar.py", line 8, in die_soon
    |     await my_event.wait()
    |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/anyio/_core/_synchronization.py", line 130, in wait
    |     await self._event.wait()
    |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 1716, in wait
    |     await AsyncIOBackend.checkpoint()
    |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 2264, in checkpoint
    |     await sleep(0)
    |   File "/usr/lib/python3.12/asyncio/tasks.py", line 656, in sleep
    |     await __sleep0()
    |   File "/usr/lib/python3.12/asyncio/tasks.py", line 650, in __sleep0
    |     yield
    | TypeError: trio.run received unrecognized yield message None. Are you trying to use a library written for some other framework like asyncio? That won't work without some kind of compatibility shim.

If I remove the decorator and run anyio.run(test_try, backend="trio") directly, it correctly raises an exception group containing our "OOPS" RuntimeError. The same happens when running the anyio pytest plugin with asyncio as the backend.

Snippet 2

This gives a teardown error and a messy traceback:

import anyio
import pytest

@pytest.fixture
def anyio_backend():
    return 'asyncio'

async def die_soon():
    raise RuntimeError('OOPS')

@pytest.fixture
async def my_simple_fixture():
    async with anyio.create_task_group() as tg:
        tg.start_soon(die_soon)
        yield

async def test_try(my_simple_fixture, anyio_backend):
    ...

Error:

```
$ pytest bar.py -sv
===================================== test session starts =====================================
platform linux -- Python 3.12.4, pytest-8.3.3, pluggy-1.5.0 -- /tmp/anyio_pytest/.venv/bin/python
cachedir: .pytest_cache
rootdir: /tmp/anyio_pytest
plugins: anyio-4.6.0, trio-0.8.0
collected 1 item

bar.py::test_try FAILED
bar.py::test_try ERROR

=========================================== ERRORS ============================================
________________________________ ERROR at teardown of test_try ________________________________

anyio_backend = 'asyncio', args = (), kwargs = {}, backend_name = 'asyncio'
backend_options = {}, runner =

    def wrapper(*args, anyio_backend, **kwargs):  # type: ignore[no-untyped-def]
        backend_name, backend_options = extract_backend_and_options(anyio_backend)
        if has_backend_arg:
            kwargs["anyio_backend"] = anyio_backend

        with get_runner(backend_name, backend_options) as runner:
            if isasyncgenfunction(func):
>               yield from runner.run_asyncgen_fixture(func, kwargs)

.venv/lib/python3.12/site-packages/anyio/pytest_plugin.py:81:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py:2187: in run_asyncgen_fixture
    self.get_loop().run_until_complete(
/usr/lib/python3.12/asyncio/base_events.py:687: in run_until_complete
    return future.result()
.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py:2170: in _call_in_runner_task
    self._send_stream.send_nowait((coro, future))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = MemoryObjectSendStream(_state=MemoryObjectStreamState(max_buffer_size=1, buffer=deque([]), open_send_channels=0, open_receive_channels=0, waiting_receivers=OrderedDict(), waiting_senders=OrderedDict()), _closed=True)
item = (, )

    def send_nowait(self, item: T_contra) -> None:
        """
        Send an item immediately if it can be done without waiting.

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the receiving end
        :raises ~anyio.WouldBlock: if the buffer is full and there are no tasks waiting to receive

        """
        if self._closed:
>           raise ClosedResourceError
E           anyio.ClosedResourceError

.venv/lib/python3.12/site-packages/anyio/streams/memory.py:211: ClosedResourceError
========================================== FAILURES ===========================================
__________________________________________ test_try ___________________________________________

pyfuncitem =

    @pytest.hookimpl(tryfirst=True)
    def pytest_pyfunc_call(pyfuncitem: Any) -> bool | None:
        def run_with_hypothesis(**kwargs: Any) -> None:
            with get_runner(backend_name, backend_options) as runner:
                runner.run_test(original_func, kwargs)

        backend = pyfuncitem.funcargs.get("anyio_backend")
        if backend:
            backend_name, backend_options = extract_backend_and_options(backend)

            if hasattr(pyfuncitem.obj, "hypothesis"):
                # Wrap the inner test function unless it's already wrapped
                original_func = pyfuncitem.obj.hypothesis.inner_test
                if original_func.__qualname__ != run_with_hypothesis.__qualname__:
                    if iscoroutinefunction(original_func):
                        pyfuncitem.obj.hypothesis.inner_test = run_with_hypothesis

                return None

            if iscoroutinefunction(pyfuncitem.obj):
                funcargs = pyfuncitem.funcargs
                testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
                with get_runner(backend_name, backend_options) as runner:
                    try:
>                       runner.run_test(pyfuncitem.obj, testargs)

.venv/lib/python3.12/site-packages/anyio/pytest_plugin.py:131:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py:2217: in run_test
    self._raise_async_exceptions()
.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py:2121: in _raise_async_exceptions
    raise exceptions[0]
.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py:2211: in run_test
    self.get_loop().run_until_complete(
/usr/lib/python3.12/asyncio/base_events.py:687: in run_until_complete
    return future.result()
.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py:2170: in _call_in_runner_task
    self._send_stream.send_nowait((coro, future))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = MemoryObjectSendStream(_state=MemoryObjectStreamState(max_buffer_size=1, buffer=deque([]), open_send_channels=0, open_receive_channels=0, waiting_receivers=OrderedDict(), waiting_senders=OrderedDict()), _closed=True)
item = (, )

    def send_nowait(self, item: T_contra) -> None:
        """
        Send an item immediately if it can be done without waiting.

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the receiving end
        :raises ~anyio.WouldBlock: if the buffer is full and there are no tasks waiting to receive

        """
        if self._closed:
>           raise ClosedResourceError
E           anyio.ClosedResourceError

.venv/lib/python3.12/site-packages/anyio/streams/memory.py:211: ClosedResourceError
=================================== short test summary info ===================================
FAILED bar.py::test_try - anyio.ClosedResourceError
ERROR bar.py::test_try - anyio.ClosedResourceError
================================= 1 failed, 1 error in 0.30s ==================================
```

Snippet 3

But if we make anyio_backend return "trio", we get a hang instead. The KeyboardInterrupt traceback ends with:

```
self = , 0)>, timeout = None

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).

        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.

        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
>               waiter.acquire()
E               KeyboardInterrupt

/usr/lib/python3.12/threading.py:355: KeyboardInterrupt
====================================== 1 passed in 1.16s ======================================
Exception ignored in:
Traceback (most recent call last):
  File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/trio/_core/_asyncgens.py", line 123, in finalizer
    raise RuntimeError(
RuntimeError: Non-Trio async generator 'bar.my_simple_fixture' awaited something during finalization; install a finalization hook to support this, or wrap it in 'async with aclosing(...):'
```

agronholm commented 2 weeks ago

How did you run the first snippet with trio as backend? I put it in a single test module, and ran pytest with the module as argument plus -k trio. I didn't see any issues there. I can repro the issue with snippet 2, but as for snippet 3, I don't get that RuntimeError.

jakkdl commented 1 week ago

> How did you run the first snippet with trio as backend? I put it in a single test module, and ran pytest with the module as argument plus -k trio. I didn't see any issues there. I can repro the issue with snippet 2, but as for snippet 3, I don't get that RuntimeError.

Ah, hmm. For snippet 1 I only get it when I run both asyncio and trio, so I suppose it's something about the fixture not being torn down and set up again properly? I tested it just now with pytest-random-order, and when running trio first and asyncio second I get a similar error:

  |   File "/tmp/anyio_pytest/foo_1.py", line 14, in my_simple_fixture
  |     async with anyio.create_task_group() as tg:
  |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 736, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/tmp/anyio_pytest/foo_1.py", line 8, in die_soon
    |     await my_event.wait()
    |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/anyio/_core/_synchronization.py", line 130, in wait
    |     await self._event.wait()
    |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/anyio/_backends/_trio.py", line 647, in wait
    |     return await self.__original.wait()
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/trio/_sync.py", line 86, in wait
    |     await trio.lowlevel.checkpoint()
    |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/trio/_core/_run.py", line 2788, in checkpoint
    |     await cancel_shielded_checkpoint()
    |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/trio/_core/_traps.py", line 51, in cancel_shielded_checkpoint
    |     (await _async_yield(CancelShieldedCheckpoint)).unwrap()
    |      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/trio/_core/_traps.py", line 29, in _async_yield
    |     return (yield obj)
    |             ^^^^^^^^^
    | RuntimeError: Task got bad yield: <class 'trio._core._traps.CancelShieldedCheckpoint'>
    +------------------------------------

Not sure how to help you repro the 2nd snippet; I can reliably reproduce it in fresh venvs.

agronholm commented 1 week ago

I noticed that you had a global Event object, but it will bind to the first back-end it's used with, so that won't work in a test suite involving two back-ends. Make it a fixture instead.

jakkdl commented 1 week ago

> I noticed that you had a global Event object, but it will bind to the first back-end it's used with, so that won't work in a test suite involving two back-ends. Make it a fixture instead.

Ah, thanks. Making the event a fixture resolved it when using my_simple_fixture as a context manager, but switching my_simple_fixture back to being a pytest fixture resurfaces problems:

import anyio
import pytest

@pytest.fixture
def my_event():
    return anyio.Event()

async def die_soon(my_event, task_status):
    print("entering die_soon")
    task_status.started()
    await my_event.wait()
    print("crashing")
    raise RuntimeError('OOPS')

@pytest.fixture
async def my_simple_fixture(my_event):
    async with anyio.create_task_group() as tg:
        await tg.start(die_soon, my_event)
        print("yielding")
        yield
        print("continuing")

@pytest.mark.anyio
async def test_try(my_simple_fixture, my_event):
    print("in test")
    my_event.set()
    print("end of test")

I'm now getting anyio.ClosedResourceError on asyncio and a hang on trio, but if I use random-order to run trio first, the trio test passes and then the run hangs.

agronholm commented 1 week ago

This randomness is due to Trio's non-deterministic scheduling. If you know how to turn that off, please let me know! It's supremely annoying when running tests.

jakkdl commented 1 week ago

> This randomness is due to Trio's non-deterministic scheduling. If you know how to turn that off, please let me know! It's supremely annoying when running tests.

Sorry, by random-order I meant running with https://pypi.org/project/pytest-random-order/ to see whether running trio before asyncio behaves differently from running asyncio before trio.

smurfix commented 1 week ago

> If you know how to turn that off, please let me know! It's supremely annoying when running tests.

Well, that's easy.

env PYTHONHASHSEED=123 pytest …

Also, in conftest.py:

from trio._core import _run
_run._ALLOW_DETERMINISTIC_SCHEDULING = True
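
For reference, a minimal sketch of what the PYTHONHASHSEED part does on its own. Nothing here is Trio-specific, and the helper name `string_hash_with_seed` is made up for illustration. It only shows that fixing the seed makes string hashes (and so anything ordered by them) repeatable across interpreter runs; how much of Trio's scheduling depends on that is not stated in this thread.

```python
import os
import subprocess
import sys


def string_hash_with_seed(seed: str) -> int:
    """Return hash('trio') as computed by a fresh interpreter with PYTHONHASHSEED set."""
    env = dict(os.environ, PYTHONHASHSEED=seed)
    result = subprocess.run(
        [sys.executable, "-c", "print(hash('trio'))"],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return int(result.stdout)


# Two separate interpreter runs with the same seed agree on the hash value.
first = string_hash_with_seed("123")
second = string_hash_with_seed("123")
print(first == second)
```
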
agronholm commented 1 week ago

I think I'll ask around about this particular variable and its future.

smurfix commented 1 week ago

On second look a better solution is to call _run._r.seed(123) instead, but the same caveat applies.

Trio should probably export a way to explicitly set that seed. I'd recommend submitting a PR.
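
Until something like that exists, a conftest.py could wrap the seeding defensively. This is only a sketch: `trio._core._run` and its `_r` attribute are the private names mentioned above and may change or disappear in any Trio release, and `seed_trio_scheduler` is a hypothetical helper name, not part of Trio or AnyIO.

```python
import importlib


def seed_trio_scheduler(seed: int = 123) -> bool:
    """Seed Trio's private scheduler RNG if present; return True on success."""
    try:
        # Private module: may move or change between Trio releases.
        _run = importlib.import_module("trio._core._run")
    except ImportError:
        return False  # Trio is not installed in this environment

    rng = getattr(_run, "_r", None)
    if rng is None or not hasattr(rng, "seed"):
        return False  # The private attribute no longer exists in this form

    rng.seed(seed)
    return True


# Calling this at import time in conftest.py seeds the RNG before any test runs.
seeded = seed_trio_scheduler()
```

The boolean return value makes it easy to emit a warning when the private attribute has gone away, rather than silently running with unseeded scheduling.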

agronholm commented 1 week ago

It's not just the random scheduling I'd like to tackle; I'd really like to make trio schedule tasks in FIFO order. Does _ALLOW_DETERMINISTIC_SCHEDULING = True do that?

smurfix commented 1 week ago

No. If you want that, you need to monkeypatch _run._r.random() to always return >= 0.5.
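
A toy model of why that would work, assuming (as this reply implies) that the scheduler reverses each batch of runnable tasks when its RNG returns a value below 0.5. `schedule_batch` and `NoReverseRandom` are hypothetical names for illustration only; this is not Trio's code.

```python
import random


class NoReverseRandom(random.Random):
    """Stand-in RNG whose random() never drops below 0.5."""

    def random(self) -> float:
        return 0.5


def schedule_batch(batch, rng):
    """Toy scheduler step: reverse the batch on a 'heads' coin flip."""
    ordered = list(batch)
    if rng.random() < 0.5:
        ordered.reverse()
    return ordered


tasks = ["a", "b", "c"]

# With the patched RNG the batch is never reversed, so order stays FIFO.
fifo_runs = [schedule_batch(tasks, NoReverseRandom()) for _ in range(5)]

# With a normal RNG, each run is either the original or the reversed order.
mixed_runs = [schedule_batch(tasks, random.Random(i)) for i in range(5)]
```
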