sysid / sse-starlette

BSD 3-Clause "New" or "Revised" License
504 stars 35 forks source link

Frozen Client Connections - SSE Send Timeout #89

Closed blodow closed 5 months ago

blodow commented 5 months ago

First off, thanks @sysid for this library!

The Problem

I have been running into a situation whereby the HTTP connection from a certain client was kept open, but the client -- through an unrelated bug -- stopped reading from the connection. This led to a problem where the SSE event generator, unaware of this issue of suspended reading, continued generating chunks to send on this connection, slowly saturating TCP buffers before finally simply hanging in the send call.

I would like my server app to be able to protect itself from this DOS-like attack vector by recognizing a live, stale client connection.

Example

An easy way to replicate the issue is e.g. a curl process that is suspended to the background with Ctrl+Z:

# silent flag and redirected only for demonstration
% curl -s -N localhost:8000/events > /dev/null
^Z
zsh: suspended  curl -s -N localhost:8000/events > /dev/null

A minimal server example:

import anyio
from sse_starlette import EventSourceResponse
from starlette.applications import Starlette
from starlette.routing import Route

async def events(request):
    async def _event_generator():
        try:
            i = 0
            while True:
                i += 1
                if i % 100 == 0:
                    print(i)
                yield dict(data={i: " " * 4096})
                await anyio.sleep(0.001)
        finally:
            print("disconnected")
    return EventSourceResponse(_event_generator())

app = Starlette(
    debug=True,
    routes=[
        Route("/events", events),
    ],
)

example output:

% uvicorn sse_timeout:app
INFO:     Started server process [18997]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     127.0.0.1:60359 - "GET /events HTTP/1.1" 200 OK
100
200
300
400
[ frozen at this point ]
blodow commented 5 months ago

Possible Solution

My only idea so far is to detect a long send call. Ideally, I would like to do this outside the sse-starlette library, but haven't found a way to do this inside _event_generator above. My solution is to add a send_timeout parameter to the EventSourceResponse initializer, see here for the diff:

https://github.com/sysid/sse-starlette/compare/main...blodow:sse-starlette:feat/add_send_timeout

The return statement in the server code would then read:

return EventSourceResponse(_event_generator(), send_timeout=1)  # very short for demonstration

Example output:

[...]
300
400
disconnected
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File ".../lib/python3.8/site-packages/uvicorn/protocols/http/httptools_impl.py", line 435, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
  File ".../lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
    return await self.app(scope, receive, send)
  File ".../lib/python3.8/site-packages/starlette/applications.py", line 122, in __call__
    await self.middleware_stack(scope, receive, send)
  File ".../lib/python3.8/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File ".../lib/python3.8/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File ".../lib/python3.8/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File ".../lib/python3.8/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File ".../lib/python3.8/site-packages/starlette/routing.py", line 718, in __call__
    await route.handle(scope, receive, send)
  File ".../lib/python3.8/site-packages/starlette/routing.py", line 276, in handle
    await self.app(scope, receive, send)
  File ".../lib/python3.8/site-packages/starlette/routing.py", line 69, in app
    await response(scope, receive, send)
  File ".../lib/python3.8/site-packages/sse_starlette/sse.py", line 259, in __call__
    await wrap(partial(self.listen_for_disconnect, receive))
  File ".../lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File ".../sse_starlette/sse.py", line 248, in wrap
    await func()
  File ".../sse_starlette/sse.py", line 236, in stream_response
    raise SendTimeoutError()
sse_starlette.sse.SendTimeoutError

If that is acceptable, I can create a PR for it.

If there are better ideas on how to do this, esp. from outside the library, I'm all ears.