sysid / sse-starlette

BSD 3-Clause "New" or "Revised" License
504 stars 35 forks source link

Usage without Async Generators #67

Closed stevenjlm closed 11 months ago

stevenjlm commented 11 months ago

I am not entirely sure this is an SSE-starlette issue, but I think it may be discussion-worthy.

I have a very long running FastAPI SSE server that is sending out updates every 5 to 10 seconds. I need the server to be fetching state updates upstream regardless of whether or not there are active connections to the FastAPI route. The basic code looked something like,

state: Optional[dict] = None
async def process(timeout=0.0):
    global state

    while True:
        fetch_result = fetch_stuff()
        if fetch_result:
            state= fetch_result.dict()

        await asyncio.sleep(5)

@app.on_event("startup")
def startup_event():
    asyncio.create_task(process())

@app.get("/v1/stream")
async def stream():
    def event_stream():
        while True:
            yield json.dumps(state)
            time.sleep(5)
    return EventSourceResponse(event_stream())

This was working fine for a few hours, but eventually, the server would just keep sending the same state updates over the SSE connection (i.e. the state was no longer updating).

I had a hunch that the example on sse-startlette without async generators would help, so per the example code, I modified my code to,

@app.get("/v1/stream")
async def stream(req: Request):
    send_chan, recv_chan = anyio.create_memory_object_stream(10)

    async def event_publisher(inner_send_chan: MemoryObjectSendStream):
        async with inner_send_chan:
                try:
                    while True:
                         await inner_send_chan.send(dict(data=json.dumps(data)))
                         await anyio.sleep(5.0)
                except anyio.get_cancelled_exc_class() as e:
                    log.info(f"Disconnected from client (via refresh/close) {req.client}")
                    with anyio.move_on_after(1, shield=True):
                        await inner_send_chan.send(dict(closing=True))
                        raise e

    return EventSourceResponse(
        recv_chan, data_sender_callable=partial(event_publisher, send_chan)
    )

This was working for several days until I made some updates to the fetch_stuff function. I need to debug the function and figure out what's going on, but I also would like my server not to stop sending out updates if fetch_stuff has an error.

My first question is, does what I'm doing make any sense lol? I am still trying to wrap my head around all the async concepts involved here.. Please let me know if I'm misusing anything. Next, is there any reason why we don't pull the while True loop in the above code outside of the try/except clause?

while True:
    try:
        ...
    except:
        ...

Would this make the API more robust to errors in updating the state?

stevenjlm commented 11 months ago

Ok it looks like this is just entirely a problem with with my process() function. It seems to have nothing to do with sse-starlette.