falconry / falcon

The no-magic web data plane API and microservices framework for Python developers, with a focus on reliability, correctness, and performance at scale.
https://falcon.readthedocs.io/en/stable/
Apache License 2.0

ASGI resp.stream is iterated until completion despite client disconnect #2015

Open vytas7 opened 2 years ago

vytas7 commented 2 years ago

At the time of writing, setting resp.stream to an async generator results in the framework iterating it until completion (or, obviously, until an exception is raised), regardless of whether the client has already disconnected. The SSE code path already has a mechanism to poll for, and detect, the http.disconnect ASGI message.

This issue intersects with #1808; however, this one proposes that the framework itself should detect the disconnect event and stop iteration, as is done in the SSE case. Maybe both cases could be generalized so that setting resp.sse would simply set the Content-Type and resp.stream, and further streaming would use the same logic?
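To make the polling idea concrete, below is a rough sketch of how a generic wrapper might stop iterating a stream once http.disconnect arrives, similar in spirit to what the SSE code path does. This is not Falcon's actual implementation; `drain_until_disconnect` and its arguments are hypothetical names, and a real version would need tighter integration with the framework's response machinery.

```python
import asyncio

async def drain_until_disconnect(stream, receive):
    """Iterate `stream`, stopping early once the ASGI server reports
    ``http.disconnect``. (Hypothetical helper sketching the polling
    approach; not Falcon's actual code.)
    """
    loop = asyncio.get_running_loop()
    disconnected = loop.create_future()

    async def watch():
        # Keep draining ASGI events until the client goes away.
        while True:
            event = await receive()
            if event.get('type') == 'http.disconnect':
                if not disconnected.done():
                    disconnected.set_result(True)
                return

    watcher = asyncio.create_task(watch())
    try:
        async for chunk in stream:
            # Poll between chunks, much like the SSE path polls
            # between events.
            if disconnected.done():
                break
            yield chunk
    finally:
        watcher.cancel()
```

One limitation of polling only between chunks is that a generator sleeping on a long `await` is not interrupted until it next yields; racing the next chunk against the disconnect future would be needed for prompt cancellation.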

If implemented, this behaviour would probably interfere with "true" full-duplex HTTP streaming, i.e., where the request body is being streamed to the server while the server is simultaneously streaming the response. I've tried skimming through RFC 2616, but it is not crystal clear whether this is allowed or not; the RFC just states that the client must not continue streaming if it has received an error response. Some people claim it is possible, see, e.g., Does HTTP 1.1 Support Full Duplex Communication? Obviously, not all clients and servers support this. For instance, the popular Uvicorn states that:

Once a response has been sent, Uvicorn will no longer buffer any remaining request body. Any later calls to receive will return an http.disconnect message.

However, other ASGI servers might theoretically still choose to support this scenario... Would it be possible to detect this somehow (i.e., that receive() is still returning data after send()), and buffer a couple of messages :thinking:?
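Purely as a speculative sketch of such a probe (the `peek_event` helper is a hypothetical name, not part of any real API): after the response has started, one could await the next receive() event with a short timeout. An http.disconnect event would indicate the server has dropped the request body (Uvicorn's documented behaviour), whereas an http.request event would mean body data is still flowing.

```python
import asyncio

async def peek_event(receive, timeout=0.25):
    """Hypothetical probe: return the next ASGI event, or None if the
    server produced nothing within `timeout` seconds."""
    try:
        return await asyncio.wait_for(receive(), timeout)
    except asyncio.TimeoutError:
        return None
```

Note that asyncio.wait_for cancels the awaited call on timeout, so a real implementation would have to wrap receive() in a task and buffer its eventual result rather than discard it.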

https://github.com/falconry/falcon/issues/1956 might also be loosely related.

vytas7 commented 2 years ago

A simple test app from Gitter used by @vytas7 (in a discussion with @maxking) to debug the current behaviour:

```python
import asyncio
import logging
import uuid

import falcon.asgi

# NOTE(vytas): Useful since ASGI otherwise has nothing like wsgierrors.
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO)

class RequestID:
    async def process_request(self, req, resp):
        req.context.request_id = str(uuid.uuid4())

class EventStream:
    async def on_get(self, req, resp):
        async def emitter():
            try:
                for i in range(1, 30 + 1):
                    logging.info(f'<{req.context.request_id}>: i=#{i}')
                    await asyncio.sleep(1)

                    yield f'Hello! (#{i})\n'.encode()
            finally:
                logging.info(f'<{req.context.request_id}>: generator exiting')

        resp.content_type = falcon.MEDIA_TEXT
        resp.stream = emitter()

    async def on_get_sse(self, req, resp):
        async def emitter():
            try:
                for i in range(1, 30 + 1):
                    logging.info(f'<{req.context.request_id}>: i=#{i}')
                    await asyncio.sleep(1)

                    if i % 2 == 1:
                        yield falcon.asgi.SSEvent(json={'i': i}, retry=5000)
                    else:
                        # Just an empty "ping" event
                        yield falcon.asgi.SSEvent()
            finally:
                logging.info(f'<{req.context.request_id}>: generator exiting')

        resp.sse = emitter()

app = falcon.asgi.App(middleware=[RequestID()])

events = EventStream()
app.add_route('/stream', events)
app.add_route('/sse', events, suffix='sse')
```

CaselIT commented 2 years ago

I think it would make sense to unify the logic.

I don't think this would classify as a breaking change.

maxking commented 2 years ago

In ASGI, we should probably also cancel the task that is producing the response. I am not familiar enough with ASGI/async Python or Falcon internals to comment on how the response is run in the event loop, or on its cancellation semantics.
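For reference, asyncio cancellation is cooperative: cancelling a task raises CancelledError at its next await point, and any `finally` blocks (like the logging cleanup in the test app above) still run. A minimal, framework-independent illustration:

```python
import asyncio

async def demo():
    cleaned_up = False

    async def response_task():
        nonlocal cleaned_up
        try:
            while True:
                await asyncio.sleep(1)  # stand-in for streaming a chunk
        finally:
            # Runs even on cancellation, mirroring the generator's
            # `finally` cleanup in the test app above.
            cleaned_up = True

    task = asyncio.create_task(response_task())
    await asyncio.sleep(0.01)  # let the task reach its await point
    task.cancel()              # what a server could do on client disconnect
    try:
        await task
    except asyncio.CancelledError:
        pass
    return cleaned_up

asyncio.run(demo())
```

So if the framework (or server) cancelled the response task on disconnect, existing generator cleanup code would still get a chance to run.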

I've been trying out some other web frameworks to see how they handle disconnections in ASGI. It seems that Quart will cancel the task (usually ~2 seconds after the disconnect, even with a single client), while Sanic won't do anything and, like Falcon, will keep trying to send the response. I can't understand Django's async/ASGI support well enough to even write a test program.

Quart

```python
import asyncio

from quart import Quart

app = Quart(__name__)

@app.route('/')
async def hello():
    return 'hello'

async def emitter():
    for i in range(1, 30 + 1):
        app.logger.error(f'Emitting event: i=#{i}')
        await asyncio.sleep(1)
        yield bytes((f"somedata {i} \r\n\r\n").encode())

@app.route('/events')
async def sse():
    return emitter(), {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Transfer-Encoding': 'chunked',
    }

app.run()
```
Sanic

```python
import asyncio
import logging

from sanic import Sanic
from sanic.response import stream

logging_format = "[%(asctime)s] %(process)d-%(levelname)s "
logging_format += "%(module)s::%(funcName)s():l%(lineno)d: "
logging_format += "%(message)s"

logging.basicConfig(format=logging_format, level=logging.DEBUG)
log = logging.getLogger()

app = Sanic('stream_test')

async def emitter(resp):
    for i in range(1, 30 + 1):
        log.error(f'Emitting event: i=#{i}')
        await asyncio.sleep(1)
        await resp.write(bytes((f"somedata {i} \r\n\r\n").encode()))

@app.route("/", methods=["GET"])
async def test(request):
    return stream(emitter)
```