agronholm / anyio

High level asynchronous concurrency and networking framework that works on top of either trio or asyncio
MIT License
1.74k stars 134 forks source link

AttributeError: 'MemoryObjectItemReceiver' object has no attribute 'item' #754

Closed mecampbellsoup closed 1 week ago

mecampbellsoup commented 2 months ago

Things to check first

AnyIO version

4.4.0

Python version

3.11.9

What happened?

After bumping anyio from 4.3.0 to 4.4.0, one of our tests that depends on starlette.testclient.TestClient began to fail.

Here is the ASGI lifespan code under test:

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    async with database, task_queue.open_async():
        # TODO only run this worker if there is no separate, dedicated worker process
        worker = asyncio.create_task(
            task_queue.run_worker_async(install_signal_handlers=False)
        )

        yield

        worker.cancel()
        try:
            await asyncio.wait_for(worker, timeout=10)
        except TimeoutError:
            logger.error("Ungraceful shutdown")
        except asyncio.CancelledError:
            logger.error("Graceful shutdown")

And here is the test itself:

@patch("cloud_console.server.app.verify_clients")
@patch("cloud_console.server.app.database")
async def test_initialize_app_success(
    mock_database: AsyncMock, mock_verify_clients: AsyncMock
):
    mock_database.__aenter__ = AsyncMock()
    mock_database.__aexit__ = AsyncMock()
    app: FastAPI = initialize_app()

    with TestClient(app):
        pass

    mock_verify_clients.assert_awaited_once()
    mock_database.__aenter__.assert_awaited_once()
    mock_database.__aexit__.assert_awaited_once()

Finally, here is the newly-encountered exception:

===================================================================================================================== FAILURES ======================================================================================================================
____________________________________________________________________________________________________________ test_initialize_app_success ____________________________________________________________________________________________________________

mock_database = <MagicMock name='database' id='130423892726736'>, mock_verify_clients = <AsyncMock name='verify_clients' id='130423892725968'>

    @patch("cloud_console.server.app.verify_clients")
    @patch("cloud_console.server.app.database")
    async def test_initialize_app_success(
        mock_database: AsyncMock, mock_verify_clients: AsyncMock
    ):
        mock_database.__aenter__ = AsyncMock()
        mock_database.__aexit__ = AsyncMock()
        app: FastAPI = initialize_app()

>       with TestClient(app):

tests/server/test_app.py:20:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.venv/lib/python3.11/site-packages/starlette/testclient.py:798: in __exit__
    self.exit_stack.close()
../../../.pyenv/versions/3.11.9/lib/python3.11/contextlib.py:609: in close
    self.__exit__(None, None, None)
../../../.pyenv/versions/3.11.9/lib/python3.11/contextlib.py:601: in __exit__
    raise exc_details[1]
../../../.pyenv/versions/3.11.9/lib/python3.11/contextlib.py:586: in __exit__
    if cb(*exc_details):
../../../.pyenv/versions/3.11.9/lib/python3.11/contextlib.py:469: in _exit_wrapper
    callback(*args, **kwds)
.venv/lib/python3.11/site-packages/starlette/testclient.py:791: in wait_shutdown
    portal.call(self.wait_shutdown)
.venv/lib/python3.11/site-packages/anyio/from_thread.py:287: in call
    return cast(T_Retval, self.start_task_soon(func, *args).result())
../../../.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py:456: in result
    return self.__get_result()
../../../.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py:401: in __get_result
    raise self._exception
.venv/lib/python3.11/site-packages/anyio/from_thread.py:218: in _call_func
    retval = await retval_or_awaitable
.venv/lib/python3.11/site-packages/starlette/testclient.py:833: in wait_shutdown
    message = await receive()
.venv/lib/python3.11/site-packages/starlette/testclient.py:828: in receive
    self.task.result()
../../../.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py:449: in result
    return self.__get_result()
../../../.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py:401: in __get_result
    raise self._exception
.venv/lib/python3.11/site-packages/anyio/from_thread.py:218: in _call_func
    retval = await retval_or_awaitable
.venv/lib/python3.11/site-packages/starlette/testclient.py:803: in lifespan
    await self.app(scope, self.stream_receive.receive, self.stream_send.send)
.venv/lib/python3.11/site-packages/fastapi/applications.py:1054: in __call__
    await super().__call__(scope, receive, send)
.venv/lib/python3.11/site-packages/sentry_sdk/integrations/starlette.py:374: in _sentry_patched_asgi_app
    return await middleware(scope, receive, send)
.venv/lib/python3.11/site-packages/sentry_sdk/integrations/asgi.py:152: in _run_asgi3
    return await self._run_app(scope, receive, send, asgi_version=3)
.venv/lib/python3.11/site-packages/sentry_sdk/integrations/asgi.py:167: in _run_app
    raise exc from None
.venv/lib/python3.11/site-packages/sentry_sdk/integrations/asgi.py:163: in _run_app
    return await self.app(scope, receive, send)
.venv/lib/python3.11/site-packages/starlette/applications.py:123: in __call__
    await self.middleware_stack(scope, receive, send)
.venv/lib/python3.11/site-packages/sentry_sdk/integrations/starlette.py:169: in _create_span_call
    return await old_call(app, scope, new_receive, new_send, **kwargs)
.venv/lib/python3.11/site-packages/starlette/middleware/errors.py:151: in __call__
    await self.app(scope, receive, send)
.venv/lib/python3.11/site-packages/sentry_sdk/integrations/starlette.py:268: in _sentry_exceptionmiddleware_call
    await old_call(self, scope, receive, send)
.venv/lib/python3.11/site-packages/sentry_sdk/integrations/starlette.py:169: in _create_span_call
    return await old_call(app, scope, new_receive, new_send, **kwargs)
.venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py:51: in __call__
    await self.app(scope, receive, send)
.venv/lib/python3.11/site-packages/starlette/routing.py:756: in __call__
    await self.middleware_stack(scope, receive, send)
.venv/lib/python3.11/site-packages/starlette/routing.py:765: in app
    await self.lifespan(scope, receive, send)
.venv/lib/python3.11/site-packages/starlette/routing.py:750: in lifespan
    await send({"type": "lifespan.shutdown.complete"})
.venv/lib/python3.11/site-packages/sentry_sdk/integrations/starlette.py:159: in _sentry_send
    description=getattr(send, "__qualname__", str(send)),
../../../.pyenv/versions/3.11.9/lib/python3.11/dataclasses.py:240: in wrapper
    result = user_function(self)
<string>:3: in __repr__
    ???
../../../.pyenv/versions/3.11.9/lib/python3.11/dataclasses.py:240: in wrapper
    result = user_function(self)
<string>:3: in __repr__
    ???
../../../.pyenv/versions/3.11.9/lib/python3.11/dataclasses.py:240: in wrapper
    result = user_function(self)
<string>:3: in __repr__
    ???
../../../.pyenv/versions/3.11.9/lib/python3.11/dataclasses.py:240: in wrapper
    result = user_function(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = MemoryObjectItemReceiver(task_info=AsyncIOTaskInfo(id=130423912790720, name='anyio.from_thread.BlockingPortal._call_func'), item=None)

>   ???
E   AttributeError: 'MemoryObjectItemReceiver' object has no attribute 'item'

<string>:3: AttributeError
--------------------------------------------------------------------------------------------------------------- Captured stderr call ----------------------------------------------------------------------------------------------------------------
ERROR:cloud_console.server.app:Graceful shutdown
----------------------------------------------------------------------------------------------------------------- Captured log call -----------------------------------------------------------------------------------------------------------------
ERROR    cloud_console.server.app:app.py:42 Graceful shutdown

I added some print statements around anyio/from_thread.py to see what is failing internally:

----------------------------------------------------------------------------------------------------------------------------- Captured stdout call -----------------------------------------------------------------------------------------------------------------------------
retval_or_awaitable <class 'coroutine'> <coroutine object TestClient.lifespan at 0x70a707fcb840>
scope {'type': 'lifespan', 'state': {}, 'app': <fastapi.applications.FastAPI object at 0x70a7080e7d10>, 'router': <fastapi.routing.APIRouter object at 0x70a707f68590>} <class 'dict'>
receive <function _enable_span_for_middleware.<locals>._create_span_call.<locals>._sentry_receive at 0x70a707e4d260> <class 'function'>
send <function _enable_span_for_middleware.<locals>._create_span_call.<locals>._sentry_send at 0x70a707e4d760> <class 'function'>
retval_or_awaitable <class 'coroutine'> <coroutine object TestClient.wait_startup at 0x70a707f8b140>
ret <class 'procrastinate.utils.AwaitableContext'> <procrastinate.utils.AwaitableContext object at 0x70a707e5a990>
yielding...
retval <class 'NoneType'> None
retval_or_awaitable <class 'coroutine'> <coroutine object TestClient.wait_shutdown at 0x70a707fa7c40>
done yielding.
retval_or_awaitable <class 'coroutine'> <coroutine object BlockingPortal.stop at 0x70a707f039f0>
retval <class 'NoneType'> None

How can we reproduce the bug?

Create a starlette/fastapi application with a test or otherwise that uses starlette.testclient.TestClient; in the ASGI lifespan async context manager, create/yield/cancel a task like so:

        worker = asyncio.create_task(
            task_queue.run_worker_async(install_signal_handlers=False)
        )

        yield

        worker.cancel()

Pin anyio to 4.3.0. The test should pass. Bump anyio to 4.4.0. The test should now error.

agronholm commented 2 months ago

Would you be able to create a minimal workable example that demonstrates this issue? I'm not convinced that AnyIO is the real culprit here.

lordoffreaks commented 1 month ago

This might be helpful @agronholm https://github.com/tiangolo/fastapi/discussions/11652 ... I"m getting the same behaviour

Pin anyio to 4.3.0. The test should pass.
Bump anyio to 4.4.0. The test should now error.
agronholm commented 3 weeks ago

There is one place in MemoryObjectReceiveStream where such an AttributeError occurs, but it is caught and an EndOfStream is raised in its place. So it doesn't make sense that the AttributeError would be uncaught, as is implied in this bug report.

agronholm commented 1 week ago

This was fixed by #767. Let me know if it wasn't.

lordoffreaks commented 1 week ago

@agronholm tested using commit f6750c802e862941fd356bae28753c441aea2ab5 and works perfect, thanks a lot for the work!!!

williamjamir commented 1 week ago

@agronholm Can we have a new release (4.4.1) with this bug fix?

agronholm commented 1 week ago

The next release will be v4.5.0 which is only pending my finalization of a fix for #695, and the review and merging of a number of outstanding PRs.