litestar-org / litestar

Production-ready, Light, Flexible and Extensible ASGI API framework | Effortlessly Build Performant APIs
https://litestar.dev/
MIT License
5.59k stars 380 forks source link

Bug: sope of asyncpg DI in sse route #2958

Closed euri10 closed 10 months ago

euri10 commented 10 months ago

Description

I would have expect this code to be ok, however it seems like whe inside the generator I try to access the db the connection has already been released to the pool. So I'm not sure if this classifies as a bug, ie shoul the db conneciton last the time of the stream response or not, If it is not then it looks like to me it's kind of impossible to fetch db inside a generator that will be streamed, and I have not been able to find a way so far.

the trace fro the mvce below:

❯ uvicorn test_apps.sse_asyncpg:app --reload
INFO:     Will watch for changes in these directories: ['/home/lotso/PycharmProjects/litestar']
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [75842] using WatchFiles
INFO:     Started server process [75844]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     127.0.0.1:37394 - "GET /stream HTTP/1.1" 200 OK
ERROR - &���,-472531846 - litestar - middleware - exception raised on http connection to route /stream

Traceback (most recent call last):
    | Traceback (most recent call last):
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 78, in _call_next
    |     return next(self.iterator)
    |            ^^^^^^^^^^^^^^^^^^^
    | StopIteration
    | 
    | The above exception was the direct cause of the following exception:
    | 
    | Traceback (most recent call last):
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 85, in _async_generator
    |     yield await sync_to_thread(self._call_next)
    |           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/concurrency.py", line 62, in sync_to_thread
    |     return await _run_sync_asyncio(fn, *args, **kwargs)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/concurrency.py", line 38, in _run_sync_asyncio
    |     return await asyncio.get_running_loop().run_in_executor(get_asyncio_executor(), bound_fn)  # pyright: ignore
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/.asdf/installs/python/3.11.7/lib/python3.11/concurrent/futures/thread.py", line 58, in run
    |     result = self.fn(*self.args, **self.kwargs)
    |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 80, in _call_next
    |     raise ValueError from e
    | ValueError
    | 
    | During handling of the above exception, another exception occurred:
    | 
    | Traceback (most recent call last):
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 111, in _stream
    |     async for chunk in self.iterator:
    |   File "/home/lotso/PycharmProjects/litestar/litestar/utils/sync.py", line 79, in __anext__
    |     return await self.generator.__anext__()
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 87, in _async_generator
    |     async for value in self.content_async_iterator:
    |   File "/home/lotso/PycharmProjects/litestar/test_apps/sse_asyncpg.py", line 36, in stream_generator
    |     r = await conn.execute("select randon()")
    |               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/asyncpg/pool.py", line 50, in call_con_method
    |     raise exceptions.InterfaceError(
    | asyncpg.exceptions._base.InterfaceError: cannot call Connection.execute(): connection has been released back to the pool
    +------------------------------------
Traceback (most recent call last):
  File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 134, in send_body
    await self._listen_for_disconnect(cancel_scope=task_group.cancel_scope, receive=receive)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 100, in _listen_for_disconnect
    await self._listen_for_disconnect(cancel_scope=cancel_scope, receive=receive)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 94, in _listen_for_disconnect
    message = await receive()
              ^^^^^^^^^^^^^^^
  File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 587, in receive
    await self.message_event.wait()
  File "/home/lotso/.asdf/installs/python/3.11.7/lib/python3.11/asyncio/locks.py", line 213, in wait
    await fut
asyncio.exceptions.CancelledError: Cancelled by cancel scope 7f0e1fd5bf50

During handling of the above exception, another exception occurred:

  + Exception Group Traceback (most recent call last):
  |   File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 192, in __call__
  |     await self.app(scope, receive, send)
  |   File "/home/lotso/PycharmProjects/litestar/litestar/routes/http.py", line 86, in handle
  |     await response(scope, receive, send)
  |   File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 197, in __call__
  |     await self.send_body(send=send, receive=receive)
  |   File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 132, in send_body
  |     async with create_task_group() as task_group:
  |   File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 78, in _call_next
    |     return next(self.iterator)
    |            ^^^^^^^^^^^^^^^^^^^
    | StopIteration
    | 
    | The above exception was the direct cause of the following exception:
    | 
    | Traceback (most recent call last):
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 85, in _async_generator
    |     yield await sync_to_thread(self._call_next)
    |           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/concurrency.py", line 62, in sync_to_thread
    |     return await _run_sync_asyncio(fn, *args, **kwargs)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/concurrency.py", line 38, in _run_sync_asyncio
    |     return await asyncio.get_running_loop().run_in_executor(get_asyncio_executor(), bound_fn)  # pyright: ignore
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/.asdf/installs/python/3.11.7/lib/python3.11/concurrent/futures/thread.py", line 58, in run
    |     result = self.fn(*self.args, **self.kwargs)
    |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 80, in _call_next
    |     raise ValueError from e
    | ValueError
    | 
    | During handling of the above exception, another exception occurred:
    | 
    | Traceback (most recent call last):
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 111, in _stream
    |     async for chunk in self.iterator:
    |   File "/home/lotso/PycharmProjects/litestar/litestar/utils/sync.py", line 79, in __anext__
    |     return await self.generator.__anext__()
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 87, in _async_generator
    |     async for value in self.content_async_iterator:
    |   File "/home/lotso/PycharmProjects/litestar/test_apps/sse_asyncpg.py", line 36, in stream_generator
    |     r = await conn.execute("select randon()")
    |               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/asyncpg/pool.py", line 50, in call_con_method
    |     raise exceptions.InterfaceError(
    | asyncpg.exceptions._base.InterfaceError: cannot call Connection.execute(): connection has been released back to the pool
    +------------------------------------
ERROR - ,-472531813 - litestar - middleware - exception raised on http connection to route /stream

Traceback (most recent call last):
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 80, in _call_next
    |     raise ValueError from e
    | ValueError
    | 
    | During handling of the above exception, another exception occurred:
    | 
    | Traceback (most recent call last):
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 111, in _stream
    |     async for chunk in self.iterator:
    |   File "/home/lotso/PycharmProjects/litestar/litestar/utils/sync.py", line 79, in __anext__
    |     return await self.generator.__anext__()
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 87, in _async_generator
    |     async for value in self.content_async_iterator:
    |   File "/home/lotso/PycharmProjects/litestar/test_apps/sse_asyncpg.py", line 36, in stream_generator
    |     r = await conn.execute("select randon()")
    |               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/asyncpg/pool.py", line 50, in call_con_method
    |     raise exceptions.InterfaceError(
    | asyncpg.exceptions._base.InterfaceError: cannot call Connection.execute(): connection has been released back to the pool
    +------------------------------------

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 192, in __call__
    await self.app(scope, receive, send)
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 206, in __call__
    await self.handle_request_exception(
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 236, in handle_request_exception
    await response.to_asgi_response(app=None, request=request)(scope=scope, receive=receive, send=send)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 191, in __call__
    await self.start_response(send=send)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 162, in start_response
    await send(event)
  File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 541, in send
    raise RuntimeError(msg % message_type)
RuntimeError: Expected ASGI message 'http.response.body', but got 'http.response.start'.
Traceback (most recent call last):
  File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 134, in send_body
    await self._listen_for_disconnect(cancel_scope=task_group.cancel_scope, receive=receive)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 100, in _listen_for_disconnect
    await self._listen_for_disconnect(cancel_scope=cancel_scope, receive=receive)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 94, in _listen_for_disconnect
    message = await receive()
              ^^^^^^^^^^^^^^^
  File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 587, in receive
    await self.message_event.wait()
  File "/home/lotso/.asdf/installs/python/3.11.7/lib/python3.11/asyncio/locks.py", line 213, in wait
    await fut
asyncio.exceptions.CancelledError: Cancelled by cancel scope 7f0e1fd5bf50

During handling of the above exception, another exception occurred:

  + Exception Group Traceback (most recent call last):
  |   File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 192, in __call__
  |     await self.app(scope, receive, send)
  |   File "/home/lotso/PycharmProjects/litestar/litestar/routes/http.py", line 86, in handle
  |     await response(scope, receive, send)
  |   File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 197, in __call__
  |     await self.send_body(send=send, receive=receive)
  |   File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 132, in send_body
  |     async with create_task_group() as task_group:
  |   File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 78, in _call_next
    |     return next(self.iterator)
    |            ^^^^^^^^^^^^^^^^^^^
    | StopIteration
    | 
    | The above exception was the direct cause of the following exception:
    | 
    | Traceback (most recent call last):
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 85, in _async_generator
    |     yield await sync_to_thread(self._call_next)
    |           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/concurrency.py", line 62, in sync_to_thread
    |     return await _run_sync_asyncio(fn, *args, **kwargs)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/concurrency.py", line 38, in _run_sync_asyncio
    |     return await asyncio.get_running_loop().run_in_executor(get_asyncio_executor(), bound_fn)  # pyright: ignore
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/.asdf/installs/python/3.11.7/lib/python3.11/concurrent/futures/thread.py", line 58, in run
    |     result = self.fn(*self.args, **self.kwargs)
    |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 80, in _call_next
    |     raise ValueError from e
    | ValueError
    | 
    | During handling of the above exception, another exception occurred:
    | 
    | Traceback (most recent call last):
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 111, in _stream
    |     async for chunk in self.iterator:
    |   File "/home/lotso/PycharmProjects/litestar/litestar/utils/sync.py", line 79, in __anext__
    |     return await self.generator.__anext__()
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 87, in _async_generator
    |     async for value in self.content_async_iterator:
    |   File "/home/lotso/PycharmProjects/litestar/test_apps/sse_asyncpg.py", line 36, in stream_generator
    |     r = await conn.execute("select randon()")
    |               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/asyncpg/pool.py", line 50, in call_con_method
    |     raise exceptions.InterfaceError(
    | asyncpg.exceptions._base.InterfaceError: cannot call Connection.execute(): connection has been released back to the pool
    +------------------------------------

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 192, in __call__
    await self.app(scope, receive, send)
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 206, in __call__
    await self.handle_request_exception(
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 236, in handle_request_exception
    await response.to_asgi_response(app=None, request=request)(scope=scope, receive=receive, send=send)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 191, in __call__
    await self.start_response(send=send)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 162, in start_response
    await send(event)
  File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 541, in send
    raise RuntimeError(msg % message_type)
RuntimeError: Expected ASGI message 'http.response.body', but got 'http.response.start'.
ERROR - ,-472531810 - litestar - middleware - exception raised on http connection to route /stream

Traceback (most recent call last):
    +------------------------------------

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 192, in __call__
    await self.app(scope, receive, send)
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 206, in __call__
    await self.handle_request_exception(
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 236, in handle_request_exception
    await response.to_asgi_response(app=None, request=request)(scope=scope, receive=receive, send=send)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 191, in __call__
    await self.start_response(send=send)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 162, in start_response
    await send(event)
  File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 541, in send
    raise RuntimeError(msg % message_type)
RuntimeError: Expected ASGI message 'http.response.body', but got 'http.response.start'.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 192, in __call__
    await self.app(scope, receive, send)
  File "/home/lotso/PycharmProjects/litestar/litestar/_asgi/asgi_router.py", line 84, in __call__
    await asgi_app(scope, receive, send)
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 206, in __call__
    await self.handle_request_exception(
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 236, in handle_request_exception
    await response.to_asgi_response(app=None, request=request)(scope=scope, receive=receive, send=send)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 191, in __call__
    await self.start_response(send=send)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 162, in start_response
    await send(event)
  File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 541, in send
    raise RuntimeError(msg % message_type)
RuntimeError: Expected ASGI message 'http.response.body', but got 'http.response.start'.
Traceback (most recent call last):
  File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 134, in send_body
    await self._listen_for_disconnect(cancel_scope=task_group.cancel_scope, receive=receive)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 100, in _listen_for_disconnect
    await self._listen_for_disconnect(cancel_scope=cancel_scope, receive=receive)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 94, in _listen_for_disconnect
    message = await receive()
              ^^^^^^^^^^^^^^^
  File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 587, in receive
    await self.message_event.wait()
  File "/home/lotso/.asdf/installs/python/3.11.7/lib/python3.11/asyncio/locks.py", line 213, in wait
    await fut
asyncio.exceptions.CancelledError: Cancelled by cancel scope 7f0e1fd5bf50

During handling of the above exception, another exception occurred:

  + Exception Group Traceback (most recent call last):
  |   File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 192, in __call__
  |     await self.app(scope, receive, send)
  |   File "/home/lotso/PycharmProjects/litestar/litestar/routes/http.py", line 86, in handle
  |     await response(scope, receive, send)
  |   File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 197, in __call__
  |     await self.send_body(send=send, receive=receive)
  |   File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 132, in send_body
  |     async with create_task_group() as task_group:
  |   File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 78, in _call_next
    |     return next(self.iterator)
    |            ^^^^^^^^^^^^^^^^^^^
    | StopIteration
    | 
    | The above exception was the direct cause of the following exception:
    | 
    | Traceback (most recent call last):
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 85, in _async_generator
    |     yield await sync_to_thread(self._call_next)
    |           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/concurrency.py", line 62, in sync_to_thread
    |     return await _run_sync_asyncio(fn, *args, **kwargs)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/concurrency.py", line 38, in _run_sync_asyncio
    |     return await asyncio.get_running_loop().run_in_executor(get_asyncio_executor(), bound_fn)  # pyright: ignore
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/.asdf/installs/python/3.11.7/lib/python3.11/concurrent/futures/thread.py", line 58, in run
    |     result = self.fn(*self.args, **self.kwargs)
    |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 80, in _call_next
    |     raise ValueError from e
    | ValueError
    | 
    | During handling of the above exception, another exception occurred:
    | 
    | Traceback (most recent call last):
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 111, in _stream
    |     async for chunk in self.iterator:
    |   File "/home/lotso/PycharmProjects/litestar/litestar/utils/sync.py", line 79, in __anext__
    |     return await self.generator.__anext__()
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 87, in _async_generator
    |     async for value in self.content_async_iterator:
    |   File "/home/lotso/PycharmProjects/litestar/test_apps/sse_asyncpg.py", line 36, in stream_generator
    |     r = await conn.execute("select randon()")
    |               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/asyncpg/pool.py", line 50, in call_con_method
    |     raise exceptions.InterfaceError(
    | asyncpg.exceptions._base.InterfaceError: cannot call Connection.execute(): connection has been released back to the pool
    +------------------------------------

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 192, in __call__
    await self.app(scope, receive, send)
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 206, in __call__
    await self.handle_request_exception(
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 236, in handle_request_exception
    await response.to_asgi_response(app=None, request=request)(scope=scope, receive=receive, send=send)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 191, in __call__
    await self.start_response(send=send)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 162, in start_response
    await send(event)
  File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 541, in send
    raise RuntimeError(msg % message_type)
RuntimeError: Expected ASGI message 'http.response.body', but got 'http.response.start'.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 192, in __call__
    await self.app(scope, receive, send)
  File "/home/lotso/PycharmProjects/litestar/litestar/_asgi/asgi_router.py", line 84, in __call__
    await asgi_app(scope, receive, send)
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 206, in __call__
    await self.handle_request_exception(
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 236, in handle_request_exception
    await response.to_asgi_response(app=None, request=request)(scope=scope, receive=receive, send=send)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 191, in __call__
    await self.start_response(send=send)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 162, in start_response
    await send(event)
  File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 541, in send
    raise RuntimeError(msg % message_type)
RuntimeError: Expected ASGI message 'http.response.body', but got 'http.response.start'.
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 134, in send_body
    await self._listen_for_disconnect(cancel_scope=task_group.cancel_scope, receive=receive)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 100, in _listen_for_disconnect
    await self._listen_for_disconnect(cancel_scope=cancel_scope, receive=receive)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 94, in _listen_for_disconnect
    message = await receive()
              ^^^^^^^^^^^^^^^
  File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 587, in receive
    await self.message_event.wait()
  File "/home/lotso/.asdf/installs/python/3.11.7/lib/python3.11/asyncio/locks.py", line 213, in wait
    await fut
asyncio.exceptions.CancelledError: Cancelled by cancel scope 7f0e1fd5bf50

During handling of the above exception, another exception occurred:

  + Exception Group Traceback (most recent call last):
  |   File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 192, in __call__
  |     await self.app(scope, receive, send)
  |   File "/home/lotso/PycharmProjects/litestar/litestar/routes/http.py", line 86, in handle
  |     await response(scope, receive, send)
  |   File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 197, in __call__
  |     await self.send_body(send=send, receive=receive)
  |   File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 132, in send_body
  |     async with create_task_group() as task_group:
  |   File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 78, in _call_next
    |     return next(self.iterator)
    |            ^^^^^^^^^^^^^^^^^^^
    | StopIteration
    | 
    | The above exception was the direct cause of the following exception:
    | 
    | Traceback (most recent call last):
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 85, in _async_generator
    |     yield await sync_to_thread(self._call_next)
    |           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/concurrency.py", line 62, in sync_to_thread
    |     return await _run_sync_asyncio(fn, *args, **kwargs)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/concurrency.py", line 38, in _run_sync_asyncio
    |     return await asyncio.get_running_loop().run_in_executor(get_asyncio_executor(), bound_fn)  # pyright: ignore
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/.asdf/installs/python/3.11.7/lib/python3.11/concurrent/futures/thread.py", line 58, in run
    |     result = self.fn(*self.args, **self.kwargs)
    |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 80, in _call_next
    |     raise ValueError from e
    | ValueError
    | 
    | During handling of the above exception, another exception occurred:
    | 
    | Traceback (most recent call last):
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/streaming.py", line 111, in _stream
    |     async for chunk in self.iterator:
    |   File "/home/lotso/PycharmProjects/litestar/litestar/utils/sync.py", line 79, in __anext__
    |     return await self.generator.__anext__()
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/litestar/response/sse.py", line 87, in _async_generator
    |     async for value in self.content_async_iterator:
    |   File "/home/lotso/PycharmProjects/litestar/test_apps/sse_asyncpg.py", line 36, in stream_generator
    |     r = await conn.execute("select randon()")
    |               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/asyncpg/pool.py", line 50, in call_con_method
    |     raise exceptions.InterfaceError(
    | asyncpg.exceptions._base.InterfaceError: cannot call Connection.execute(): connection has been released back to the pool
    +------------------------------------

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 192, in __call__
    await self.app(scope, receive, send)
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 206, in __call__
    await self.handle_request_exception(
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 236, in handle_request_exception
    await response.to_asgi_response(app=None, request=request)(scope=scope, receive=receive, send=send)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 191, in __call__
    await self.start_response(send=send)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 162, in start_response
    await send(event)
  File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 541, in send
    raise RuntimeError(msg % message_type)
RuntimeError: Expected ASGI message 'http.response.body', but got 'http.response.start'.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 192, in __call__
    await self.app(scope, receive, send)
  File "/home/lotso/PycharmProjects/litestar/litestar/_asgi/asgi_router.py", line 84, in __call__
    await asgi_app(scope, receive, send)
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 206, in __call__
    await self.handle_request_exception(
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 236, in handle_request_exception
    await response.to_asgi_response(app=None, request=request)(scope=scope, receive=receive, send=send)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 191, in __call__
    await self.start_response(send=send)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 162, in start_response
    await send(event)
  File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 541, in send
    raise RuntimeError(msg % message_type)
RuntimeError: Expected ASGI message 'http.response.body', but got 'http.response.start'.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 426, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lotso/PycharmProjects/litestar/litestar/app.py", line 541, in __call__
    await self.asgi_handler(scope, receive, self._wrap_send(send=send, scope=scope))  # type: ignore[arg-type]
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 206, in __call__
    await self.handle_request_exception(
  File "/home/lotso/PycharmProjects/litestar/litestar/middleware/exceptions/middleware.py", line 236, in handle_request_exception
    await response.to_asgi_response(app=None, request=request)(scope=scope, receive=receive, send=send)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 191, in __call__
    await self.start_response(send=send)
  File "/home/lotso/PycharmProjects/litestar/litestar/response/base.py", line 162, in start_response
    await send(event)
  File "/home/lotso/PycharmProjects/litestar/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 541, in send
    raise RuntimeError(msg % message_type)
RuntimeError: Expected ASGI message 'http.response.body', but got 'http.response.start'.

URL to code causing the issue

No response

MCVE

import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator

from asyncpg import Connection, Pool, create_pool

from litestar import Litestar, get
from litestar.datastructures import State
from litestar.di import Provide
from litestar.response.sse import ServerSentEventMessage, ServerSentEvent

@asynccontextmanager
async def lifespan(
        _app: Litestar,
) -> AsyncGenerator[None, None]:
    dbpool = await create_pool("postgresql://postgres:postgres@localhost:5432/postgres")
    _app.state.dbpool = dbpool
    try:
        yield
    finally:
        dbpool.terminate()
        await dbpool.close()

async def get_connection(state: State) -> AsyncGenerator[Connection, None]:
    pool: Pool = state.dbpool
    async with pool.acquire() as conn:
        yield conn

@get("/stream", name="stream")
async def stream(conn: Connection) -> ServerSentEvent:
    async def stream_generator(conn: Connection) -> None:
        await asyncio.sleep(1)
        r = await conn.execute("select randon()")
        yield ServerSentEventMessage(data=r)

    return ServerSentEvent(stream_generator(conn))

app = Litestar(debug=True, route_handlers=[stream], lifespan=[lifespan], dependencies={"conn": Provide(get_connection)})

Steps to reproduce

1. Go to '...'
2. Click on '....'
3. Scroll down to '....'
4. See error

Screenshots

"![SCREENSHOT_DESCRIPTION](SCREENSHOT_LINK.png)"

Logs

No response

Litestar Version

2.5.0

Platform


[!NOTE]
While we are open for sponsoring on GitHub Sponsors and OpenCollective, we also utilize Polar.sh to engage in pledge-based sponsorship.

Check out all issues funded or available for funding on our Polar.sh dashboard

  • If you would like to see an issue prioritized, make a pledge towards it!
  • We receive the pledge once the issue is completed & verified
  • This, along with engagement in the community, helps us know which features are a priority to our users.

Fund with Polar

provinzkraut commented 10 months ago

This is not a bug and is actually mentioned in the docs, but I can see how it might be unexpected.

The reason for this is that the scope of the dependencies is tied to the handler function and not the response being sent.

The cleanup stage is executed after the handler function returns, but before the response is sent (in case of HTTP requests)

https://docs.litestar.dev/latest/usage/dependency-injection.html#dependencies-with-yield-cleanup-step

While this is in line with how other frameworks (FastAPI and Sanic as well afaik) handle it, I think ideally, you'd be able to define this scope yourself and change the default behaviour. We could revisit this when we do the DI overhaul.

Currently, the only proper solution would be to manage the connection inside the generator yourself, something like:


async def get_connection(state: State) -> AsyncGenerator[Connection, None]:
    pool: Pool = state.dbpool
    async with pool.acquire() as conn:
        yield conn

@get("/stream", name="stream")
async def stream(state: State) -> ServerSentEvent:
    async def stream_generator(pool: Pool) -> None:
        async with pool.acquire() as conn:
            await asyncio.sleep(1)
            r = await conn.execute("select randon()")
            yield ServerSentEventMessage(data=r)

    return ServerSentEvent(stream_generator(state.pool))

Closing this for now as it's expected behaviour and there's nothing we can do about it at the moment.

euri10 commented 10 months ago

ok got it, I didn't think about getting the connection that way but this is exactly how I do it in guards :man_facepalming:

I'm fine doing it this way, I just dont know given the stream can last a while if there wont be timeout issues but I guess testing it will probably be the best answer