sysid / sse-starlette

BSD 3-Clause "New" or "Revised" License
544 stars 37 forks source link

Question: SSE multicast? #81

Closed ptrhbt closed 10 months ago

ptrhbt commented 10 months ago

Hi,

Is it possible to send SSE to multiple clients?

I see that if there are 2 clients connected, each of them receives every 2nd message. I would like to broadcast SSE to each client connected to given endpoint.

Thanks!

paxcodes commented 10 months ago

IIUC what you're asking, it's definitely possible by using some pub-sub mechanism. Look at this repo: https://github.com/paxcodes/dicery_backend

It uses sse-starlette to broadcast an "event" (the dice-roll result) to multiple clients.

ptrhbt commented 10 months ago

IIUC what you're asking, it's definitely possible by using some pub-sub mechanism. Look at this repo: https://github.com/paxcodes/dicery_backend

It uses sse-starlette to broadcast an "event" (the dice-roll result) to multiple clients.

Good to know, thank you. As an alternative: Would it be possible to iterate over connected clients? Pseudo-code:

    async def get_data(self):
        """Get data from queue"""
        while True:
            item = await self.queue.get()
            # TODO: yield the same responspe N times for all clents
            (await) for _ in connected_clients(...):
                yield json.dumps({"data": item})

    async def sse(self, _):
        """Send SSE"""
        generator = self.get_data()
        return EventSourceResponse(generator)

Is information about current connections available anywhere in sse-starlette?

sysid commented 10 months ago

Closed due to lack of activity.

Arawasu commented 5 months ago

Wow, 11 days isn't a lot of room for activity! Took 4 months for someone to stumble on the same problem I guess 😎.

I dug through the Dicery backend and extracted a simple example for broadcasting to multiple clients. It might be useful for future reference:

from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
from broadcaster import Broadcast

broadcast = Broadcast("memory://")
app = FastAPI(on_startup=[broadcast.connect], on_shutdown=[broadcast.disconnect])

@app.get("/publish")
async def publish():
    await broadcast.publish(channel="msg", message="Hello world!")
    return "OK"

@app.get("/subscribe")
async def subscribe():
    async def get_message():
        while True:
            async with broadcast.subscribe(channel="msg") as subscriber:
                async for event in subscriber:
                    yield event.message

    return EventSourceResponse(get_message())

The broadcast object needs to then be passed to any other parts in your application that might need broadcasting functionality. Instantiating a new broadcast object will not work. You could use i.e. dependency injection for this. Then, just call await broadcast.publish(channel="msg", message="Hello world!") anywhere in your application to send a message.

This is the easiest and most concise method of broadcasting I could find (in the context of Starlette-SSE).

ptrhbt commented 5 months ago

@Arawasu thanks for your post, good to know that I am not the only one struggling ;) And thanks for your solution. Please let me know if it works reliably for you in the longer run.

Background: From my side, the story was as follows: We sent SSE from multiple sources, written in python and C++ to Dash Plotly app. We indeed implemented broadcasting mechanism in C++ (giving up SSE-starlette). At first, it seemed to work, but later we noticed, that no SSE messages were received after refreshing browser page few times. It happened more often if there were inactive SSE sources (i.e. channels subscribed for, but publishing nothing). Finally, we moved away from SSE to MQTT-over-websocket, which seems to work stable for now.

For future reference, I would be curious to see if you encounter similar behaviour with sse-starlette.