Closed maves-knx closed 9 months ago
@maves-knx , cool analysis, much appreciated!
Your first option, providing a dedicated high-performance class with no ping, seems like a promising approach. I am happy to take a PR along these lines.
However, when I run your example with `EventSourceResponseNoPing`, I get `socket.send() raised exception.` on the uvicorn console when I CTRL-C the curl consumer. When using `EventSourceResponse` this does not happen. I did not have much time to dive into this, but I guess it requires some fine-tuning.
Ok, found the problem in your test code (see my comments):
```python
@app.get("/stream")
async def message_stream(request: Request):
    async def event_generator():
        global sse_clients
        sse_clients += 1
        print(f"{sse_clients} sse clients connected", flush=True)
        while True:
            # If client closes connection, stop sending events
            if await request.is_disconnected():
                break
            for p in positions:
                # fixes socket.send() raised exception, but makes it very slow!!
                # if await request.is_disconnected():
                #     break
                yield p

    return EventSourceResponse(event_generator())
```
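Moving the `is_disconnected()` check into the inner loop fixes the exception but, as the comment notes, is slow because it awaits once per event. A possible middle ground (my sketch, not code from this thread; the `check_every` parameter is made up) is to amortise the check over every N events:

```python
async def event_generator(request, positions, check_every=1000):
    # Hypothetical variant: still stops when the client disconnects,
    # but only pays the cost of request.is_disconnected() once per
    # `check_every` yielded events instead of once per event.
    sent = 0
    while True:
        for p in positions:
            if sent % check_every == 0 and await request.is_disconnected():
                return
            sent += 1
            yield p
```

Whether the residual latency of up to `check_every` events between a disconnect and the generator stopping is acceptable depends on the workload.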
See https://github.com/sysid/sse-starlette/pull/55#issuecomment-1732374113 for a suggestion on how to improve performance roughly within the current setup. The code would roughly be (I haven't run this):
```python
async def _ping(self, send: Send) -> None:
    # Legacy proxy servers are known to, in certain cases, drop HTTP
    # connections after a short timeout. To protect against such proxy
    # servers, authors can send a custom (ping) event every 15 seconds
    # or so. Alternatively one can periodically send a comment line
    # (one starting with a ':' character).
    while self.active:
        await anyio.sleep(self._ping_interval)
        if self.ping_message_factory:
            assert isinstance(self.ping_message_factory, Callable)  # type: ignore  # https://github.com/python/mypy/issues/6864
        ping = (
            ServerSentEvent(comment=f"ping - {datetime.utcnow()}").encode()
            if self.ping_message_factory is None
            else ensure_bytes(self.ping_message_factory(), self.sep)
        )
        _log.debug(f"ping: {ping.decode()}")
        async with self._send_lock:
            if self.active:
                await send({"type": "http.response.body", "body": ping, "more_body": True})

async def stream_response(self, send) -> None:
    await send(
        {
            "type": "http.response.start",
            "status": self.status_code,
            "headers": self.raw_headers,
        }
    )
    async for data in self.body_iterator:
        chunk = ensure_bytes(data, self.sep)
        _log.debug(f"chunk: {chunk.decode()}")
        await send({"type": "http.response.body", "body": chunk, "more_body": True})

    async with self._send_lock:
        self.active = False
        await send({"type": "http.response.body", "body": b"", "more_body": False})
```
And then you can get rid of `safe_send` in the `__call__` method entirely.
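The effect of that change is that the hot data path never touches the lock; only the infrequent ping and the one-time shutdown do, with the `active` flag preventing a ping from being written after the response has closed. A toy illustration of that pattern (class and attribute names are hypothetical, not the library's actual code):

```python
import asyncio

class Stream:
    def __init__(self):
        self.active = True
        self._send_lock = asyncio.Lock()
        self.sent = []

    async def _ping_once(self):
        # Rare path: takes the lock and re-checks `active` so it never
        # writes into a response that has already been closed.
        async with self._send_lock:
            if self.active:
                self.sent.append(b": ping")

    async def stream(self, chunks):
        for c in chunks:
            self.sent.append(c)       # hot path: no lock taken per chunk
            await asyncio.sleep(0)    # yield to the event loop
        async with self._send_lock:
            self.active = False       # shut down before the final empty body
            self.sent.append(b"")
```

The one remaining race, ping vs. final close, is serialised by the lock, which is only acquired once per ping interval instead of once per chunk.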
Thanks @ejlangev, this seems to do the trick. I am getting almost the same throughput here as with the ping-free solution.
@maves-knx, can you confirm these observations with your tests as well? I created a branch for it: https://github.com/sysid/sse-starlette/tree/feat/performance
Hey, sorry for the late reply :(
I gotta admit I am very surprised how I could have missed that socket exception. Sorry for that :(
But in my new tests, the socket exception also occurs on the `feat/performance` branch if I don't adjust my test script above.
So I concluded that I'll run two tests, one with the socket exception and one without. I also repeated the old test to get fresh results:
(all tests via the Go script with 20 clients, running for 5 min)
The original solution doesn't have the problem with the socket exception; no `is_disconnected` check is needed. (Or should that check always be present?)
- Average received events per second: 4878, max: 4878, min: 4878
- Average received events per second: 12531, max: 12554, min: 12525
- Average received events per second: 12388, max: 12410, min: 12380
So both solutions seem pretty much equivalent here.
I fixed the socket exception, as mentioned above, by adding the `is_disconnected` check.
- Average received events per second: 3115, max: 3115, min: 3115
- Average received events per second: 4508, max: 4508, min: 4508
- Average received events per second: 4460, max: 4460, min: 4460
So, those numbers are a little meh? If there is no other way of preventing the socket error, have we produced a slower result?
Or am I missing something: is the `feat/performance` branch not raising that exception for you?
Lock contention has been resolved. Thanks @ejlangev for the solution and @maves-knx for bringing this to my attention!
The socket exception is a different topic and actually works as designed.
Hey @sysid, thanks for providing this library :)
Overview
We (the company I work for) are currently using sse-starlette to build some of our services. In one of them, a somewhat high-load service, we discovered a potential performance bottleneck.
It seems that this pull request https://github.com/sysid/sse-starlette/pull/55/files introduced an `anyio.Lock` to prevent a race condition between the `_ping` and the `stream_response` tasks. This lock seems to be a bit slow.

Proposal
We experimented with two solutions:
1: Remove the ping task altogether
After reading (skimming? ^^) https://html.spec.whatwg.org/multipage/server-sent-events.html and the comment on the `_ping` function in your code, it seems that a ping is not strictly required by the SSE protocol, so we could provide an `EventSourceResponse` subclass which just doesn't do it. If a ping is still required, the user of the library could integrate it themselves in their `stream_response`.
Example implementation
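The linked example implementation is not reproduced here, but the idea fits in a few lines. This is a minimal sketch, assuming the base class runs its ping in an overridable `_ping` coroutine; a stand-in base class is defined below so the sketch is self-contained, whereas in reality it would be sse_starlette's `EventSourceResponse`:

```python
import asyncio

class EventSourceResponse:
    # Stand-in for sse_starlette's EventSourceResponse (assumption: the
    # real class runs _ping as a background task alongside the stream).
    _ping_interval = 15

    async def _ping(self, send):
        while True:
            await asyncio.sleep(self._ping_interval)
            await send(b": ping\r\n\r\n")

class EventSourceResponseNoPing(EventSourceResponse):
    """Variant with the periodic ping disabled: no ping task means no
    send-lock contention between the ping and the stream."""

    async def _ping(self, send):
        return  # no-op: never sends anything, returns immediately
```

With no second writer, the data-sending loop is the only task touching the socket and the lock becomes unnecessary.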
2: Only ping on timeout
An alternative approach we investigated was moving the ping inside the `stream_response` function, so that the same loop would send both the data and the ping, therefore not requiring a lock. This turned out to be a bit tricky: we used `anyio.fail_after`, which cancels the running tasks and requires the async generator provided by the users of the library to handle those cancellations. It seems that non-class-based async generators struggle with this.

Example code:
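A self-contained sketch of the idea, using `asyncio.wait_for` in place of `anyio.fail_after` so it runs without third-party dependencies (the function name and signature are made up for illustration):

```python
import asyncio

async def stream_with_inline_ping(body, send, ping_interval=15.0):
    # One loop handles both data and keep-alive pings, so no lock is
    # needed. The catch described above: the timeout cancels the pending
    # __anext__(), and plain (non-class-based) async generators do not
    # always cope with being cancelled at that point.
    it = body.__aiter__()
    while True:
        try:
            chunk = await asyncio.wait_for(it.__anext__(), ping_interval)
        except asyncio.TimeoutError:
            await send(b": ping\r\n\r\n")  # SSE comment line as keep-alive
            continue
        except StopAsyncIteration:
            break
        await send(chunk)
```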
It seems the first proposal would be an easy option to implement and would provide some flexibility to the users of the library. What do you think?
If we agree on the approach and some naming, we can provide a pull request :)
Here are some results of the tests we performed.
Tests
I ran the following tests:
I started this script via `uvicorn sse_test.py:app`. (I removed some details here; I know I wouldn't have to repeat the position 500 times.) And then I connect some clients and count the returned lines.
For example, one client via `curl http://localhost:8000/stream | pv --line-mode --average-rate > /dev/null`
Or 20 clients with a custom go script.
Result 1: current implementation with anyio.Lock
test via one curl connection (avg over 5 min):
test via go 20 clients (running for 5 min):
Result 2: removing the ping task and the lock
test via one curl connection (avg over 5 min):
test via go 20 clients (running for 5 min):
Result 3: handling the ping when a timeout occurs
test via one curl connection (avg over 5 min):
test via go 20 clients (running for 5 min):
Speed-up
Since the actual numbers are not too relevant, here is the speed-up:
Speedup from test 1 to test 2:
Speedup from test 1 to test 3: