sysid / sse-starlette

Ping task exception was never retrieved #26

Closed katichev closed 2 years ago

katichev commented 2 years ago

Hi folks, I'm using sse-starlette in my project and recently tried to upgrade starlette from 0.14.2 to 0.16.0. Since then I've been getting a sporadic warning message:

Task exception was never retrieved
future: <Task finished name='Task-31378' coro=<EventSourceResponse._ping() done, defined at sse_starlette/sse.py:268> exception=ClosedResourceError()>

Unfortunately, I cannot reproduce this issue locally. I investigated and found that run_until_first_complete in sse.py raises CancelledError, so stop_streaming is never executed.
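
The effect described above can be illustrated with plain asyncio. This is only a minimal sketch, not the sse-starlette code: the function names and the "stop_streaming" log entry are hypothetical stand-ins. It shows that a statement placed after an await is skipped when the task is cancelled at that await, while the same statement in a finally block still runs.

```python
import asyncio
from typing import Awaitable, Callable, List


async def cleanup_after_await(log: List[str]) -> None:
    # Hypothetical stand-in for "await run_until_first_complete(...)".
    await asyncio.sleep(10)  # CancelledError is raised here on cancellation
    log.append("stop_streaming")  # skipped when the task is cancelled


async def cleanup_in_finally(log: List[str]) -> None:
    try:
        await asyncio.sleep(10)
    finally:
        log.append("stop_streaming")  # runs even when the task is cancelled


async def cancel_after_start(
    func: Callable[[List[str]], Awaitable[None]]
) -> List[str]:
    log: List[str] = []
    task = asyncio.create_task(func(log))
    await asyncio.sleep(0)  # let the task start and reach its await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the task was cancelled on purpose
    return log


if __name__ == "__main__":
    print(asyncio.run(cancel_after_start(cleanup_after_await)))
    print(asyncio.run(cancel_after_start(cleanup_in_finally)))
```

Whether a try/finally is the appropriate fix for the library itself is a separate question; the sketch only shows why the cleanup call can be skipped.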

My guess is that this behaviour is due to anyio in starlette 0.16.0. I tried wrapping the code in __call__ in a CancelScope, like so:

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        with CancelScope(shield=True):
            await run_until_first_complete(
                (self.stream_response, {"send": send}),
                (self.listen_for_disconnect, {"receive": receive}),
                (self.listen_for_exit_signal, {}),
            )
            self.stop_streaming()
            # ... and so on

This seems to work, but I'm not sure it's the right way to fix the issue.

Any ideas on how to deal with it?

sysid commented 2 years ago

Thank you @katichev for your detailed analysis! I will try to reproduce the issue on my setup and investigate it.

daxartio commented 2 years ago

https://github.com/encode/starlette/blob/0.14.2/starlette/responses.py#L223
https://github.com/encode/starlette/blob/0.16.0/starlette/responses.py#L219

daxartio commented 2 years ago

It should be like this:

from functools import partial
from typing import Callable, Coroutine

import anyio

...
    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        async with anyio.create_task_group() as task_group:

            async def wrap(func: Callable[[], Coroutine[None, None, None]]) -> None:
                # Run one coroutine; once it returns, cancel every sibling task.
                await func()
                task_group.cancel_scope.cancel()

            task_group.start_soon(wrap, partial(self.stream_response, send))
            task_group.start_soon(wrap, partial(self._ping, send))
            task_group.start_soon(wrap, self.listen_for_exit_signal)
            await wrap(partial(self.listen_for_disconnect, receive))
        if self.background is not None:
            await self.background()
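
The "first task to finish cancels the rest" idea in the snippet above can also be sketched with the standard library alone. The version below uses asyncio instead of anyio, so it is an analogue of the pattern and not the code that went into sse-starlette. The helper first_completed_sketch and the stream and ping coroutines are hypothetical names for illustration. Awaiting every task after cancelling the pending ones retrieves their results and exceptions, which is what prevents a "Task exception was never retrieved" warning.

```python
import asyncio
from functools import partial
from typing import Any, Awaitable, Callable, List


async def first_completed_sketch(*funcs: Callable[[], Awaitable[Any]]) -> List[Any]:
    # Start every coroutine as a task, wait for the first one to finish,
    # then cancel the rest.
    tasks = [asyncio.create_task(func()) for func in funcs]
    _, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Await all tasks so that no result or exception is left unretrieved.
    return await asyncio.gather(*tasks, return_exceptions=True)


async def stream(log: List[str]) -> None:
    # Hypothetical stand-in for a response stream that ends after three chunks.
    for i in range(3):
        log.append(f"chunk {i}")
        await asyncio.sleep(0)


async def ping(log: List[str]) -> None:
    # Hypothetical stand-in for a keep-alive loop that never ends on its own.
    try:
        while True:
            await asyncio.sleep(3600)
    finally:
        log.append("ping stopped")


async def main():
    log: List[str] = []
    results = await first_completed_sketch(partial(stream, log), partial(ping, log))
    return log, results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In this sketch the cancelled ping task shows up in the returned list as a CancelledError instance, so the caller can inspect it instead of leaving it for the garbage collector to report.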

sysid commented 2 years ago

Thanks @daxartio for pointing me in the right direction! I updated the code accordingly.