sysid / sse-starlette

BSD 3-Clause "New" or "Revised" License

What's the proper way to kill all ongoing generators? #99

Open ferulisses opened 3 months ago

ferulisses commented 3 months ago

The docs warn that all running generators must be stopped, otherwise shutdown can hang with "Waiting for background tasks to complete. (CTRL+C to force quit)." Using FastAPI, I get "Waiting for connections to close. (CTRL+C to force quit)" instead.

I looked at #8, which appears to be related, but is the solution there really just to press Ctrl+C again?

So, what's the proper way to kill ongoing generators?

In my dev env, uvicorn won't reload unless the client disconnects, and in production a SIGTERM won't kill the process.

I tried a variation of https://stackoverflow.com/a/59089890, cited in #8, without success (it captures Ctrl+C but never finishes):

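    import asyncio
    import signal

    # Name of the sse-starlette wrapper tasks to cancel
    SSE_TASK_NAME = "sse_starlette.sse.EventSourceResponse.__call__.<locals>.wrap"

    async def shutdown(s: signal.Signals):
        """Try to shut down gracefully by cancelling the SSE tasks."""
        print(f"Received exit signal {s.name}...")
        tasks = [
            t for t in asyncio.all_tasks()
            if t is not asyncio.current_task() and t.get_name() == SSE_TASK_NAME
        ]
        for task in tasks:
            task.cancel()
        # return_exceptions=True keeps CancelledError from propagating out of gather
        await asyncio.gather(*tasks, return_exceptions=True)

    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        # Bind s as a default argument so each handler keeps its own signal
        loop.add_signal_handler(s, lambda s=s: asyncio.create_task(shutdown(s)))
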
truh commented 3 months ago

I don't think you can cancel async generators that are currently being awaited. https://bugs.python.org/issue38559
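
A minimal sketch of one way to observe this, assuming CPython 3.8 or later: calling `aclose()` on an async generator while another task is suspended inside it raises `RuntimeError` instead of stopping it. The names `ticker`, `consume` and `main` are made up for the illustration and are not part of sse-starlette.

```python
import asyncio


async def ticker():
    """Hypothetical generator that spends nearly all its time suspended."""
    while True:
        await asyncio.sleep(3600)
        yield "tick"


async def consume(agen):
    # Stands in for whatever iterates the generator on behalf of a client.
    async for _ in agen:
        pass


async def main():
    agen = ticker()
    consumer = asyncio.create_task(consume(agen))
    await asyncio.sleep(0.01)  # let the consumer suspend inside the generator

    error = None
    try:
        # The generator is in the middle of an __anext__() call, so this is rejected.
        await agen.aclose()
    except RuntimeError as exc:
        error = exc

    # Clean up by cancelling the task that is iterating the generator.
    consumer.cancel()
    await asyncio.gather(consumer, return_exceptions=True)
    return error


if __name__ == "__main__":
    print(repr(asyncio.run(main())))
```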

My current approach to a cancellable async generator is an async iterator whose synchronous `__anext__` returns a future (a task wrapping `queue.get()`).

    import asyncio
    import typing

    class EventPublisher:
        def __init__(self):
            self.queue = asyncio.Queue(1)
            self.closed = False
            self.tasks: typing.List[asyncio.Task] = []

        def __aiter__(self):
            return self

        def __anext__(self):
            # Deliberately a plain method: it returns a task for the consumer to await.
            self.tasks = [t for t in self.tasks if not t.done()]
            if self.closed:
                raise StopAsyncIteration("EventPublisher has been closed")
            task = asyncio.create_task(self.queue.get())
            self.tasks.append(task)
            return task

        def put_nowait(self, item):
            if self.closed:
                raise RuntimeError("EventPublisher has been closed")
            self.queue.put_nowait(item)

        async def aclose(self):
            self.closed = True
            # Cancel every pending queue.get() so waiting consumers are released.
            for t in self.tasks:
                t.cancel()
            await asyncio.gather(*self.tasks, return_exceptions=True)

Unlike async generators, awaited futures can be cancelled.

I'm not really happy with this solution, but it seems to work so far.

This is inspired by the Stack Overflow answer at https://stackoverflow.com/a/60233813, but it puts the future inside the publisher (my code) rather than the consumer (sse-starlette).
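
For illustration, here is a rough sketch of how a consumer might be released by closing the publisher. It repeats the class so the snippet runs on its own. The `consume` and `main` helpers are hypothetical stand-ins for sse-starlette iterating the publisher, and it assumes the consumer is a plain `async for` loop.

```python
import asyncio
import typing


class EventPublisher:
    def __init__(self):
        self.queue = asyncio.Queue(1)
        self.closed = False
        self.tasks: typing.List[asyncio.Task] = []

    def __aiter__(self):
        return self

    def __anext__(self):
        # Plain method on purpose: returns a task that the consumer awaits.
        self.tasks = [t for t in self.tasks if not t.done()]
        if self.closed:
            raise StopAsyncIteration("EventPublisher has been closed")
        task = asyncio.create_task(self.queue.get())
        self.tasks.append(task)
        return task

    def put_nowait(self, item):
        if self.closed:
            raise RuntimeError("EventPublisher has been closed")
        self.queue.put_nowait(item)

    async def aclose(self):
        self.closed = True
        for t in self.tasks:
            t.cancel()
        await asyncio.gather(*self.tasks, return_exceptions=True)


async def consume(publisher, received):
    # Hypothetical consumer: collects items until the publisher is closed.
    async for item in publisher:
        received.append(item)


async def main():
    publisher = EventPublisher()
    received = []
    consumer = asyncio.create_task(consume(publisher, received))

    publisher.put_nowait("hello")
    await asyncio.sleep(0.01)  # give the consumer time to pick up the item

    # Closing cancels the pending queue.get(), which unblocks the consumer.
    await publisher.aclose()
    await asyncio.gather(consumer, return_exceptions=True)
    return received, consumer.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In this sketch the consumer ends with a `CancelledError` rather than a clean `StopAsyncIteration`, because it was already waiting on the cancelled task when `aclose()` ran.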