aio-libs / aiohttp-sse

Server-sent events support for aiohttp

SSE are batched #484

Open sola-ed opened 6 months ago

sola-ed commented 6 months ago

Hi guys,

I've been trying to test aiohttp-sse with the EventSource component in Dash. In a previous issue raised here, I failed to make it work since I forgot to include CORS handling. That is now solved, but I notice that the events arrive buffered to Dash. You can reproduce this with the following code.

# server.py

from aiohttp import web
import json
import asyncio
from datetime import datetime
from aiohttp_sse import sse_response
import aiohttp_cors

app = web.Application()
routes = web.RouteTableDef()

cors = aiohttp_cors.setup(app, defaults={
    "*": aiohttp_cors.ResourceOptions(
        allow_credentials=True,
        expose_headers="*",
        allow_methods="*",
        allow_headers="*",
        max_age=3600
    )
})

@routes.get("/hello")
async def hello(request: web.Request) -> web.StreamResponse:
    async with sse_response(request) as resp: 
        while resp.is_connected(): 
            services = json.dumps({
                "time": f"Server Time : {datetime.now()}"
            })
            await resp.send(services)
            await asyncio.sleep(1)
    return resp

app.router.add_routes(routes)
for route in app.router.routes():
    cors.add(route)

if __name__ == "__main__":
    web.run_app(app, host='127.0.0.1', port=5000)

Now the Dash client:

# client.py

from dash_extensions import EventSource
from dash_extensions.enrich import html, dcc, Output, Input, DashProxy
from dash.exceptions import PreventUpdate
import json

# Create small example app.
app = DashProxy(__name__)
app.layout = html.Div([
    EventSource(id="sse", url="http://127.0.0.1:5000/hello"),
    html.Span('SSE'),
    html.Div(id="display")
])

@app.callback(
    Output("display", "children"), 
    Input("sse", "message"),
)
def display(msg):
    if msg is not None:
        return msg
    else:
        raise PreventUpdate()

if __name__ == "__main__":
    app.run_server(debug=True)

When I run these scripts, I get chunks like this in the msg variable in Dash:

'{"time": "Server Time : 2024-04-05 09:44:52.022164"}\n{"time": "Server Time : 2024-04-05 09:44:53.023039"}\n{"time": "Server Time : 2024-04-05 09:44:54.023770"}\n{"time": "Server Time : 2024-04-05 09:44:55.025389"}\n{"time": "Server Time : 2024-04-05 09:44:56.027151"}\n{"time": "Server Time : 2024-04-05 09:44:57.029044"}\n{"time": "Server Time : 2024-04-05 09:44:58.030822"}\n{"time": "Server Time : 2024-04-05 09:44:59.032468"}\n{"time": "Server Time : 2024-04-05 09:45:00.033961"}\n{"time": "Server Time : 2024-04-05 09:45:01.035243"}\n{"time": "Server Time : 2024-04-05 09:45:02.036953"}\n{"time": "Server Time : 2024-04-05 09:45:03.038641"}\n{"time": "Server Time : 2024-04-05 09:45:04.040436"}\n{"time": "Server Time : 2024-04-05 09:45:05.041850"}\n{"time": "Server Time : 2024-04-05 09:45:06.043279"}'

Running the same thing in Starlette

import asyncio
import json
from datetime import datetime
import uvicorn
from sse_starlette import EventSourceResponse
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware

middleware = Middleware(CORSMiddleware, allow_origins=["*"], allow_headers=["*"])
server = Starlette(middleware=[middleware])

async def random_data():
    while True:
        await asyncio.sleep(1)
        yield json.dumps({
            "time": f"Server Time : {datetime.now()}"
        })

@server.route("/hello")
async def sse(request):
    generator = random_data()
    return EventSourceResponse(generator)

if __name__ == "__main__":
    uvicorn.run(server, port=5000)

gives atomic answers in the msg variable.

{"time": "Server Time : 2024-04-05 11:04:50.266653"}

Note that, when I visit http://127.0.0.1:5000/hello after running the aiohttp-sse example, the events are received atomically, as expected.

So my question is, is there something in the transmission process that Starlette is doing and aiohttp-sse is omitting?

Please note that I would like to avoid the question of where the bug is (Dash EventSource or aiohttp-sse), since this easily leads to a kind of chicken-or-egg dilemma: their stuff works with the Starlette example. I'm just raising this here to see if you have any hints regarding this problem. I'm still reluctant to switch to Starlette (or FastAPI) just because of this issue. Thanks in advance!

Dreamsorcerer commented 6 months ago

So my question is, is there something in the transmission process that Starlette is doing and aiohttp-sse is omitting?

In your last issue, it seemed like Starlette was sending a badly formatted SSE stream and that Dash doesn't actually support SSE. Has something changed, or have we found evidence that this is not correct?

Without investigating, it seems likely to me that this is the exact same issue: Dash is expecting the wrong number of line breaks (i.e. it's not buffering, it's just assuming that the lines are all part of one message).

since this easily leads to a kind of chicken-or-egg dilemma

If it's correct that Dash doesn't support any valid SSE stream and only works with the broken streams produced by Starlette (as suggested by the posts linked to in your last issue), then I don't see any way that Dash can claim that their implementation is correct when it doesn't work with any other library. This needs to be fixed there.

If we were to match Starlette then we'd potentially break every conforming SSE client that interacts with the library, as every message would now suddenly have a newline at the start.
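To make the line-break hypothesis above concrete, here is a minimal sketch of how an SSE client splits a stream into messages. It is a simplified reading of the SSE event-stream parsing rules, not the code of Dash, the browser, or aiohttp-sse, and `parse_sse` is a hypothetical helper name. It shows that consecutive `data:` lines with no blank line between them are joined with "\n" into a single message, which is the same shape as the payload reported in the first post. That is consistent with the hypothesis but does not by itself prove what is on the wire.

```python
import re


def parse_sse(stream: str) -> list[str]:
    """Return the data payload of each complete event in an SSE stream."""
    events: list[str] = []
    data_lines: list[str] = []
    # SSE accepts \r\n, \n or \r as line endings.
    for line in re.split(r"\r\n|\n|\r", stream):
        if line == "":
            # A blank line dispatches the event collected so far.
            if data_lines:
                events.append("\n".join(data_lines))
                data_lines = []
        elif line.startswith(":"):
            continue  # comment line, ignored
        else:
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # strip the single optional space
            if field == "data":
                data_lines.append(value)
    # Data not followed by a blank line is an incomplete event and is dropped.
    return events


separated = 'data: {"t": 1}\r\n\r\ndata: {"t": 2}\r\n\r\n'
unseparated = 'data: {"t": 1}\r\ndata: {"t": 2}\r\n\r\n'
print(parse_sse(separated))    # two messages
print(parse_sse(unseparated))  # one message containing both payloads
```

If the bytes received by the client really do contain a blank line after every event, this kind of merging should not occur, and the cause would have to be looked for elsewhere.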

sola-ed commented 5 months ago

The confusion comes from the link that I sent in the previous issue: it is just wrong!

I have gone through some details and both starlette (left) and aiohttp-sse (right) use the same separators and protocol

[Image: sse_sep]

That Starlette uses only two \r\n sequences can be checked by setting a breakpoint in its stream_response function

[Image: starlette_stream]

A sample from the chunk variable is

b'data: {"time": "Server Time : 2024-04-08 10:59:30.249653"}\r\n\r\n'

which immediately appears in the browser (Dash app) after passing through the send method. On the other hand, doing the same with aiohttp-sse:

[Image: aiohttp_stream]

one notices the first time that the chunk variable is

'{"time": "Server Time : 2024-04-08 11:29:29.951909"}'

but it is not immediately seen in the browser after finishing executing the send. It is kinda stuck for a while. Then the error

ConnectionResetError: Cannot write to closing transport

appears, and the whole batch

{"time": "Server Time : 2024-04-08 11:30:22.047700"} {"time": "Server Time : 2024-04-08 11:30:30.146010"} {"time": "Server Time : 2024-04-08 11:30:31.147563"} {"time": "Server Time : 2024-04-08 11:30:32.148499"} {"time": "Server Time : 2024-04-08 11:30:33.150090"} {"time": "Server Time : 2024-04-08 11:30:34.151646"} {"time": "Server Time : 2024-04-08 11:30:35.152449"} {"time": "Server Time : 2024-04-08 11:30:36.153865"} {"time": "Server Time : 2024-04-08 11:30:37.155267"} {"time": "Server Time : 2024-04-08 11:30:38.156443"} {"time": "Server Time : 2024-04-08 11:30:39.157877"} {"time": "Server Time : 2024-04-08 11:30:40.159344"} {"time": "Server Time : 2024-04-08 11:30:41.160452"} {"time": "Server Time : 2024-04-08 11:30:42.161835"} {"time": "Server Time : 2024-04-08 11:30:43.163273"}

is observed in the browser afterward. So there is definitely something worth taking a look at within aiohttp-sse. Is there some parameter that should be set in the aiohttp-sse API to avoid the batching?
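For reference, the wire form of a single event like the Starlette sample above can be sketched as follows. `format_sse` is a hypothetical helper, not a function from either library, and it assumes the payload uses "\n" for any internal newlines. The point is only that the bare JSON string and the framed bytes are two different stages, so the comparison that matters is between the framed bytes each server actually writes.

```python
def format_sse(data: str, sep: str = "\r\n") -> bytes:
    """Frame a payload as one SSE event: a data line per payload line, then a blank line."""
    lines = [f"data: {line}" for line in data.split("\n")]
    # The trailing pair of separators ends the last data line and adds the
    # blank line that tells the client to dispatch the event.
    return (sep.join(lines) + sep + sep).encode("utf-8")


payload = '{"time": "Server Time : 2024-04-08 10:59:30.249653"}'
print(format_sse(payload))
```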

Olegt0rr commented 5 months ago

Can't reproduce.

As soon as EventSourceResponse.send() has been awaited, I can see the new event in the browser.

https://github.com/aio-libs/aiohttp-sse/assets/25399456/14cd67e5-9faf-4b06-a0b5-71baf337e4dd

Dreamsorcerer commented 5 months ago

I have gone through some details and both starlette (left) and aiohttp-sse (right) use the same separators and protocol

Thanks for digging into this, but have you confirmed with a simple client the exact bytes received? It'd be really useful if you can create a reproducer in isolation.

If you look in our test file, we have tests that just read the body with aiohttp and verify what messages are received: https://github.com/aio-libs/aiohttp-sse/blob/master/tests/test_sse.py

So, if you can figure out a reproducer in the form of a test (and create a PR with it), then we can certainly figure out what's happening from there. I'd suggest initially comparing responses to a starlette app to verify they are exactly the same bytes. If it is actually buffering, then you'd need to tweak those tests to use the streaming API for the testing (instead of resp.text() which reads the full body).
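One way to capture the exact bytes and their arrival times, independent of any browser or HTTP client library, is a raw socket read. The sketch below uses only the standard library; `capture_raw` and its parameters are hypothetical names, and the host, port and path in the usage note are taken from the examples earlier in this thread. It records every read from the socket together with the time elapsed since the request was sent, so the output of the aiohttp-sse and Starlette servers can be compared byte for byte, including any chunked-encoding framing.

```python
import asyncio
import time


async def capture_raw(host: str, port: int, path: str, max_reads: int = 10):
    """Return a list of (seconds_since_request, raw_bytes), one entry per socket read."""
    reader, writer = await asyncio.open_connection(host, port)
    request = (
        f"GET {path} HTTP/1.1\r\n"
        f"Host: {host}:{port}\r\n"
        "Accept: text/event-stream\r\n"
        "\r\n"
    )
    writer.write(request.encode("ascii"))
    await writer.drain()
    start = time.monotonic()
    reads = []
    try:
        while len(reads) < max_reads:
            # Returns as soon as any bytes are available, so each entry
            # reflects what had arrived at that moment.
            chunk = await reader.read(65536)
            if not chunk:  # the server closed the connection
                break
            reads.append((time.monotonic() - start, chunk))
    finally:
        writer.close()
    return reads
```

Against the server from the first post this could be run as `asyncio.run(capture_raw("127.0.0.1", 5000, "/hello"))`, printing each `(elapsed, chunk)` pair. Events that are sent one second apart but arrive in a single read, or in reads only milliseconds apart, would point to buffering somewhere between `send()` and the socket.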

Dreamsorcerer commented 5 months ago

Also, it might be useful to know what OS, event loop etc. you are running on. I've just found the code in Dash, and it looks to me like it's just using the regular browser API: https://github.com/emilhe/dash-extensions/blob/master/src/lib/components/EventSource.react.js#L9

So, surely you'd be able to produce this without Dash if that's the case?

sola-ed commented 5 months ago

Hey guys,

This problem is hard to reproduce; it happens kinda randomly. @Olegt0rr, you won't see it if you run the server and client in the same app. I tried this, and everything works as expected. @Dreamsorcerer, I have indeed seen this even without using Dash.

# server
from aiohttp import web
import json
import asyncio
from datetime import datetime
from aiohttp_sse import sse_response
import aiohttp_cors

app = web.Application()
routes = web.RouteTableDef()

cors = aiohttp_cors.setup(app, defaults={
    "*": aiohttp_cors.ResourceOptions(
        allow_credentials=True,
        expose_headers="*",
        allow_methods="*",
        allow_headers="*",
        max_age=3600
    )
})

@routes.get("/hello")
async def hello(request: web.Request) -> web.StreamResponse:
    async with sse_response(request) as resp: 
        while resp.is_connected(): 
            data = json.dumps({
                "time": f"Server Time : {datetime.now()}"
            })
            await resp.send(data)
            await asyncio.sleep(1)
    return resp

app.router.add_routes(routes)
for route in app.router.routes():
    cors.add(route)

if __name__ == "__main__":
    web.run_app(app, host='127.0.0.1', port=5000)

and then

# client
from aiohttp import web

async def index(_request: web.Request) -> web.StreamResponse:
    html = """
        <html>
            <body>
                <script>
                    var eventSource = new EventSource("http://127.0.0.1:5000/hello");
                    eventSource.addEventListener("message", event => {
                        document.getElementById("response").innerText = event.data;
                    });
                </script>
                <h1>Response from server:</h1>
                <div id="response"></div>
            </body>
        </html>
    """
    return web.Response(text=html, content_type="text/html")

app = web.Application()
app.router.add_route("GET", "/", index)
web.run_app(app, host="127.0.0.1", port=8080)

My observations:

  1. Sometimes by just running the scripts in separate terminals and checking the browser, things work as expected.
  2. In another trial, I set a breakpoint on the await resp.send(data) line in the server and stepped line by line through the while loop, and then I observed the batched response in the browser.
  3. In another trial, I repeated step 1 and observed the batched data in the browser.
  4. With Starlette, batched data is never seen.

I'm curious to see if you find this as well. I am running this with aiohttp-sse=2.1.0 on a machine with Ubuntu 20.04.6 LTS and Python 3.10.13.