cirospaciari / socketify.py

Bringing Http/Https and WebSockets High Performance servers for PyPy3 and Python3
https://www.socketify.dev
MIT License
1.4k stars 50 forks source link

Support Server Sent Events #83

Open Siecje opened 1 year ago

cirospaciari commented 1 year ago

before_response and after_response I think would allow a lot of new extensions. Today you can achieve this using extensions + middleware, but will be a great addition to add more events for sure! I will add this as a new feature in the future and implement it, in a way that not affects performance, if you choose to not use it.

Siecje commented 1 year ago

How do you have multiple responses for one request? Or are you saying to have a response to a heartbeat and store the events until then?

cirospaciari commented 1 year ago

you can send multiple res.write and one res.end or multiple res.send_chunks or res.try_end. Using write you are basically using chunked encoded. with send_chunks and try_end you are streaming data.

nickchomey commented 11 months ago

In addition to SSE, might it be possible to do Pub/Sub with SSE? My application is read-heavy, so SSE + AJAX is more appropriate than websockets (also SSE can be intercepted by service workers, is better on battery life, and more).

I see that PubSub is embedded within the uWS websockets mechanism, so don't know if it could be exposed to an SSE mechanism... I'm happy to help implement all of this with some guidance!

cirospaciari commented 11 months ago

SSE is a one-way channel (Server to Client) the Client can use the Last-Event-ID header to resume the stream but cannot send data. Pub/Sub will not be possible (to publish from 1 Client to Server), but you can easily replicate it on the client side by using routes, and sending URL params and headers.

If you mean publishing from Server -> Client using topics with is possible, we can add it in the future basic SSE with socketify example:

import aiohttp
from socketify import App

app = App()

async def get_pokemon(number):
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"https://pokeapi.co/api/v2/pokemon/{number}"
        ) as response:
            pokemon = await response.text()
            return pokemon.encode("utf-8")

async def home(res, req):
    try:
        lastEventID = req.get_header("Last-Event-ID")
        if lastEventID is None:
            lastEventID = 1
        else:
            lastEventID = int(lastEventID)

        res.write_status(200)
        res.write_header("Content-Type", "text/event-stream")
        res.write_header("Cache-Control", "no-cache")

        lastPokemonID = lastEventID % 151
        # stream pokemons data until we are aborted
        while not res.aborted:
            for pokemon_id in range(lastPokemonID, 151):
                pokemon = await get_pokemon(pokemon_id)
                res.write(f"data: {pokemon}\nid: {lastEventID}\n\n")
                lastEventID += 1
            lastPokemonID = 1

    except Exception as e:
        print(e)
        return res.end("Error")

app.get("/", home)

app.listen(
    3000,
    lambda config: print(
        "Listening on port http://localhost:%s now\n" % str(config.port)
    ),
)
app.run()

consuming it:

const source = new EventSource("http://localhost:3000/");
source.onmessage = (event) => {
   console.log(event.data);
};

using some form of pub/sub:

import aiohttp
import asyncio
from socketify import App

app = App()

channels = {}

def subscribe(channel, client):
    subscribers = channels.get(channel, None)
    if subscribers is None:
        subscribers = []
        channels[channel] = subscribers
    subscribers.append(client)

def unsubscribe(channel, client):
    subscribers = channels.get(channel, None)
    if subscribers is None:
        return False
    for c in subscribers:
        if c == client:
            subscribers.remove(c)
            return True
    return False

def publish(channel, message, id=None):
    subscribers = channels.get(channel, None)
    if subscribers is None:
        return False
    # broadcast to all subscribers
    for c in subscribers:
        if c.aborted:
            unsubscribe(channel, c)
        elif id is None:
            c.write(f"data: {message}\n\n")
        else:
            c.write(f"data: {message}\nid: {id}\n\n")

async def get_pokemon(number):
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"https://pokeapi.co/api/v2/pokemon/{number}"
        ) as response:
            pokemon = await response.text()
            # cache only works with strings/bytes
            # we will not change nothing here so no needs to parse json
            return pokemon.encode("utf-8")

def home(res, req):
    res.send("Hello World subscribe to event stream /pokemon and to broadcast /broadcast_pokemon?id=1")

async def broadcast_pokemon(res, req):
    pokemon_id = req.get_query("id")
    if pokemon_id is None:
        return res.send("id is required", status=400)

    pokemon = await get_pokemon(pokemon_id)
    # broadcast to all subscribers
    publish("pokemon", pokemon)
    return res.send("ACK")

async def pokemon(res, req):
    try:
        res.on_aborted(lambda res: unsubscribe("pokemon", res))
        subscribe("pokemon", res)

        res.write_status(200)
        res.write_header("Content-Type", "text/event-stream")
        res.write_header("Cache-Control", "no-cache")

        # wait for broadcast
        while not res.aborted:
            await asyncio.sleep(1)

    except Exception as e:
        print(e)
        res.end("Error")

app.get("/", home)
app.get("/pokemon", pokemon)
app.get("/broadcast_pokemon", broadcast_pokemon)

app.listen(
    3000,
    lambda config: print(
        "Listening on port http://localhost:%s now\n" % str(config.port)
    ),
)
app.run()