sanic-org / sanic

Accelerate your web app development | Build fast. Run fast.
https://sanic.dev
MIT License

Add a way to clean up background tasks #2015

Closed · Hubro closed this issue 3 years ago

Hubro commented 3 years ago

Is your feature request related to a problem? Please describe.

My goal is to be able to spin up a background task when the server starts, then cleanly stop it when the server stops. So far I haven't had any luck, because the Sanic.add_task function doesn't return the task object, so I never get a reference to the task that would let me cancel and await it.

I have attempted to work around it by cleaning up all background tasks when the server stops, as suggested by @ahopkins:

import asyncio
from sanic import Sanic
from sanic.request import Request
from sanic.response import text
from sanic.log import logger

app = Sanic(__name__)

@app.listener("after_server_start")
async def after_server_start(
    app: Sanic, loop: asyncio.AbstractEventLoop
) -> None:
    app.add_task(my_background_task())

@app.listener("before_server_stop")
async def before_server_stop(
    app: Sanic, loop: asyncio.AbstractEventLoop
) -> None:
    await cleanup_background_tasks()

@app.route("/")
def get_root(_: Request):
    return text("Success")

async def my_background_task():
    """Background task, should run as long as the server runs"""

    while True:
        try:
            logger.info("DOING BACKGROUND TASK STUFF")
            await asyncio.sleep(10)
        except Exception as e:
            if isinstance(e, asyncio.CancelledError):
                logger.info("CLEANING UP BACKGROUND TASK")
                # Cleanup stuff here
                return

            logger.exception("Unhandled exception during background task")

async def cleanup_background_tasks() -> None:
    """Attempts to cleanly stop all background tasks"""

    current = asyncio.current_task()

    for task in asyncio.all_tasks():
        if task is not current:
            task.cancel()
            await task

if __name__ == "__main__":
    app.run("127.0.0.1", 8155, debug=True)

But for some reason this violently explodes when I hit Ctrl+C:

$ python sanic-task-cleanup-example.py
[2021-02-01 13:21:48 +0100] [460527] [INFO] Goin' Fast @ http://127.0.0.1:8155
[2021-02-01 13:21:48 +0100] [460527] [INFO] DOING BACKGROUND TASK STUFF
[2021-02-01 13:21:48 +0100] [460527] [INFO] Starting worker [460527]
^C[2021-02-01 13:21:51 +0100] [460527] [INFO] Stopping worker [460527]
[2021-02-01 13:21:51 +0100] [460527] [ERROR] Experienced exception while trying to serve
Traceback (most recent call last):
  File "/home/tomas/Desktop/sanic-task-cleanup-example.py", line 35, in my_background_task
    await asyncio.sleep(10)
  File "/usr/lib/python3.9/asyncio/tasks.py", line 651, in sleep
    return await future
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/tomas/Desktop/sanic-task-cleanup-example.py", line 21, in before_server_stop
    await cleanup_background_tasks()
  File "/home/tomas/Desktop/sanic-task-cleanup-example.py", line 53, in cleanup_background_tasks
    await task
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/tomas/.cache/pypoetry/virtualenvs/device-onboarding-monitor-bWdVHQ8B-py3.9/lib/python3.9/site-packages/sanic/app.py", line 1094, in run
    serve(**server_settings)
  File "/home/tomas/.cache/pypoetry/virtualenvs/device-onboarding-monitor-bWdVHQ8B-py3.9/lib/python3.9/site-packages/sanic/server.py", line 917, in serve
    trigger_events(before_stop, loop)
  File "/home/tomas/.cache/pypoetry/virtualenvs/device-onboarding-monitor-bWdVHQ8B-py3.9/lib/python3.9/site-packages/sanic/server.py", line 696, in trigger_events
    loop.run_until_complete(result)
  File "uvloop/loop.pyx", line 1456, in uvloop.loop.Loop.run_until_complete
asyncio.exceptions.CancelledError
Traceback (most recent call last):
  File "/home/tomas/Desktop/sanic-task-cleanup-example.py", line 35, in my_background_task
    await asyncio.sleep(10)
  File "/usr/lib/python3.9/asyncio/tasks.py", line 651, in sleep
    return await future
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/tomas/Desktop/sanic-task-cleanup-example.py", line 21, in before_server_stop
    await cleanup_background_tasks()
  File "/home/tomas/Desktop/sanic-task-cleanup-example.py", line 53, in cleanup_background_tasks
    await task
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/tomas/Desktop/sanic-task-cleanup-example.py", line 57, in <module>
    app.run("127.0.0.1", 8155, debug=True)
  File "/home/tomas/.cache/pypoetry/virtualenvs/device-onboarding-monitor-bWdVHQ8B-py3.9/lib/python3.9/site-packages/sanic/app.py", line 1094, in run
    serve(**server_settings)
  File "/home/tomas/.cache/pypoetry/virtualenvs/device-onboarding-monitor-bWdVHQ8B-py3.9/lib/python3.9/site-packages/sanic/server.py", line 917, in serve
    trigger_events(before_stop, loop)
  File "/home/tomas/.cache/pypoetry/virtualenvs/device-onboarding-monitor-bWdVHQ8B-py3.9/lib/python3.9/site-packages/sanic/server.py", line 696, in trigger_events
    loop.run_until_complete(result)
  File "uvloop/loop.pyx", line 1456, in uvloop.loop.Loop.run_until_complete
asyncio.exceptions.CancelledError

Describe the solution you'd like

I would like a way to spin up one or more background tasks in the Sanic event loop and cleanly exit them when Sanic shuts down. One way to do this would be to make Sanic.add_task return a reference to the new task, so I can call task.cancel() and await task in the Sanic "before_server_stop" listener.
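
For illustration, the listeners could then look something like this (hypothetical, since add_task currently returns nothing; my_background_task is the task function from the example above):

import asyncio
from sanic import Sanic

app = Sanic(__name__)

@app.listener("after_server_start")
async def after_server_start(app: Sanic, loop: asyncio.AbstractEventLoop) -> None:
    # Hypothetical: assumes add_task returns the asyncio.Task it creates
    app.background_task = app.add_task(my_background_task())

@app.listener("before_server_stop")
async def before_server_stop(app: Sanic, loop: asyncio.AbstractEventLoop) -> None:
    app.background_task.cancel()
    try:
        await app.background_task
    except asyncio.CancelledError:
        pass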

Hubro commented 3 years ago

Alright, I suddenly figured out why the CancelledError isn't being caught: it isn't a subclass of Exception, but of BaseException. That has also been the root cause of all my other failed attempts at working around this.
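
A quick way to confirm this on Python 3.8+, where CancelledError was moved to BaseException:

import asyncio

# On Python 3.8+ CancelledError derives from BaseException, not Exception,
# so a plain "except Exception" clause never catches it
print(issubclass(asyncio.CancelledError, Exception))      # False
print(issubclass(asyncio.CancelledError, BaseException))  # True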

I now have a pretty clean working solution using loop.create_task:

import asyncio
from sanic import Sanic
from sanic.request import Request
from sanic.response import text
from sanic.log import logger

app = Sanic(__name__)

@app.listener("after_server_start")
async def after_server_start(
    app: Sanic, loop: asyncio.AbstractEventLoop
) -> None:
    app.background_task = loop.create_task(my_background_task())

@app.listener("before_server_stop")
async def before_server_stop(
    app: Sanic, loop: asyncio.AbstractEventLoop
) -> None:
    app.background_task.cancel()
    await app.background_task

@app.route("/")
def get_root(_: Request):
    return text("Success")

async def my_background_task():
    """Background task, should run as long as the server runs"""

    while True:
        try:
            await asyncio.sleep(10)
            logger.info("DOING BACKGROUND TASK STUFF")
        except asyncio.CancelledError:
            logger.info("CLEANING UP BACKGROUND TASK")
            # Cleanup stuff here
            return
        except Exception:
            logger.exception("Unhandled exception during background task")

if __name__ == "__main__":
    app.run("127.0.0.1", 8155, debug=True)

Is there any reason to use app.add_task rather than loop.create_task? Is it just a convenience function for when you don't have direct access to the loop? If so, feel free to just close this feature request; my issue from the start was just my inexperience with asyncio.

ahopkins commented 3 years ago

Is there any reason to use app.add_task rather than loop.create_task? Is it just a convenience function for when you don't have direct access to the loop?

Yes, mainly a convenience. There is some safety built in to make sure the task doesn't start until after the server has started, but since your implementation creates the task inside a listener, you will not have this problem.
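
Roughly, that safety works like this (a simplified sketch of the deferral idea, not the actual Sanic source):

# Simplified sketch of deferring tasks until the loop exists;
# not the actual Sanic implementation
import asyncio
from typing import Coroutine, List, Optional

class TaskHolder:
    def __init__(self) -> None:
        self._pending: List[Coroutine] = []
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    def add_task(self, coro: Coroutine) -> None:
        if self._loop is None:
            # Server not started yet: stash the coroutine for later
            self._pending.append(coro)
        else:
            self._loop.create_task(coro)

    def server_started(self, loop: asyncio.AbstractEventLoop) -> None:
        # Called once the event loop is running; flush deferred tasks
        self._loop = loop
        for coro in self._pending:
            loop.create_task(coro)
        self._pending.clear()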

Don't close this just yet. Some discussion is still to be had and I would like to think this over more if we should track these tasks for cleanup or not.

Hubro commented 3 years ago

Alright, this turns out not to be a great solution after all :sweat_smile: I increased my worker count, and it turns out my current code spawns one background task for each worker. I only need a single background task running. Is it possible to start the background task in the main process?

ahopkins commented 3 years ago

All workers receive the same listeners. If you are bound to having only a single process, then perhaps you need another solution: either running a task queue service, or coming up with a mechanism that registers tasks across processes and checks whether the task already exists before starting it.

Hubro commented 3 years ago

There is one "main" process and one process for each worker, right? Does the main process run an event loop too? Would it be possible to hook into that loop somehow? Perhaps with a couple of listeners like "main_process_setup" and "main_process_teardown" that trigger before the child processes are created and after they have exited, respectively, and give access to the main process event loop? These would not trigger in the child processes, of course.

@app.listener("main_process_setup")
async def setup(app: Sanic, loop: asyncio.AbstractEventLoop) -> None:
    # In here, "loop" is the event loop of the main process
    app.background_task = loop.create_task(my_background_task())

@app.listener("main_process_teardown")
async def teardown(app: Sanic, loop: asyncio.AbstractEventLoop) -> None:
    # Clean up the background task
    app.background_task.cancel()
    await app.background_task

Alternatively, could Sanic expose a simple, decorator-based API for creating worker processes? It already creates worker processes for handling requests, so perhaps that system could be expanded to allow users to add additional worker processes?

For example:

@app.worker(count=1)
async def my_worker_process():
    """Does background work while the server is running"""
    while True:
        try:
            do_background_work()
            await asyncio.sleep(WORK_INTERVAL)
        except asyncio.CancelledError:
            do_worker_cleanup()
            return
        except Exception:
            logger.exception("Unhandled exception in worker process")

Basically what I need is a few processes running parallel to the web server, running some cleanup queries now and then, running some health checks etc.

I really want to avoid bloating my Docker image with a process supervisor and more advanced configuration when a simple background task running in the web server process would be more than sufficient.

ahopkins commented 3 years ago

One quick correction: Sanic does not create a new process per request. At startup, it creates one process for each worker, and each of those creates a new server listening on the same port.
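
To illustrate the general pattern (a generic sketch of several processes sharing one listening socket; not Sanic's exact internals):

import multiprocessing
import socket

def worker(sock: socket.socket) -> None:
    # Each worker blocks on accept() against the same already-bound socket;
    # the kernel hands each incoming connection to exactly one of them
    conn, _ = sock.accept()
    print(f"{multiprocessing.current_process().name} accepted a connection")
    conn.close()

if __name__ == "__main__":
    sock = socket.create_server(("127.0.0.1", 8155))
    workers = [multiprocessing.Process(target=worker, args=(sock,)) for _ in range(2)]
    for p in workers:
        p.start()
    for _ in workers:
        # Poke the port once per worker so the demo can exit
        socket.create_connection(("127.0.0.1", 8155)).close()
    for p in workers:
        p.join()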

The first point about additional listeners makes some sense. I linked an RFC that is pending for release, but not until June.

Alternatively, could Sanic expose a simple, decorator-based API for creating worker processes? It already creates worker processes for handling requests, so perhaps that system could be expanded to allow users to add additional worker processes?

It could. It would likely be very basic, and almost undoubtedly would raise questions about how to share data across them. But that is an issue/discussion for a different day.

Hubro commented 3 years ago

One quick correction, Sanic does not create a new process per request. At startup, it will create one process for each worker. Each one creates a new server that is listening on the same port.

I realize that, and after re-reading my own post I'm not quite sure what gave the impression that I didn't :sweat_smile: But then again English is not my first language.

The first point about additional listeners makes some sense. I linked an RFC that is pending for release, but not until June.

Nice! I will be following that.

It could. It would likely be very basic, and almost undoubtedly would raise questions about how to share data across them. But that is an issue/discussion for a different day.

It would have the advantage of not sharing a GIL with the main process, which I suppose could be a big plus in some workloads.


My new workaround plan for this is to manually spawn a new process (subprocess/fork) to run my background task before I execute app.run. After app.run returns, I kill the background process using a signal.
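
Roughly what I have in mind, reusing app and my_background_task from my earlier example:

import asyncio
import multiprocessing

def run_background_worker() -> None:
    # Runs the background task in its own process, on its own event loop
    asyncio.run(my_background_task())

if __name__ == "__main__":
    worker = multiprocessing.Process(target=run_background_worker)
    worker.start()
    app.run("127.0.0.1", 8155, debug=True)
    # app.run has returned, so the server is shutting down
    worker.terminate()  # sends SIGTERM on Unix; the task's CancelledError
    worker.join()       # cleanup won't run unless a signal handler cancels it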

Considering my background tasks are extremely light, it feels like it would be much cleaner to just attach them to the main process event loop, though. Is there any hacky workaround I can use to access that loop?

ahopkins commented 3 years ago

I realize that, and after re-reading my own post I'm not quite sure what gave the impression that I didn't :sweat_smile: But then again English is not my first language.

This: It already creates worker processes for handling requests

Nice! I will be following that.

Also look at #2018. It is a pretty small addition, so as long as we can get sufficient test coverage for it, we probably can add it to 21.3 as well. It is not a breaking change.

Considering my background tasks are extremely light, it feel like it would be much cleaner to just attach them to the main process event loop though. Is there any hacky workaround I can use to access that loop?

Well, not really. The main process itself does not run an event loop. It kicks off workers, and each worker has one. The main process just hangs around waiting for CTRL+C and then does some cleanup.

One thing you could try is to just go back to the original: @app.listener("before_server_stop"). Since you are only scaling within a single container, you could acquire a file system lock so only the first worker sets up the task.

Hubro commented 3 years ago

My new workaround plan for this is to manually spawn a new process (subprocess/fork) to run my background task before I execute app.run. After app.run I kill the background process using a signal.

Well, that didn't work out. It turns out even the worker processes spawned by Sanic will trigger the if __name__ == "__main__" block, so I ended up spawning a background process for every worker process. In other words, there doesn't seem to be a way for me to ever know whether I'm in the main process or in a worker process. Back to square one...
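
One heuristic I haven't tried: multiprocessing names the original process "MainProcess", so something like this might tell them apart (untested; start_background_process is just a stand-in for the spawning logic above):

import multiprocessing

if __name__ == "__main__" and multiprocessing.current_process().name == "MainProcess":
    # multiprocessing children get generated names like "Process-1",
    # so only the original process should reach this branch
    start_background_process()  # hypothetical stand-in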

One thing you could try is to just go back to the original: @app.listener("before_server_stop"). Since you are only scaling within a single container, you could acquire a file system lock so only the first worker sets up the task.

This would have the disadvantage that one of the worker processes is more heavily loaded than the others, but I don't really see any other possibilities at this point.

Hubro commented 3 years ago

One thing you could try is to just go back to the original: @app.listener("before_server_stop"). Since you are only scaling within a single container, you could acquire a file system lock so only the first worker sets up the task.

Success at last! This whole ordeal was absurdly more difficult than I ever imagined when starting this project :P

Here is the final working code:

import asyncio

import filelock
from sanic import Sanic
from sanic.log import logger

# db, LOG_CONFIG, BACKGROUND_TASK_LOCKFILE and test_device_connections are
# application-specific and defined elsewhere in the project

app = Sanic("Device onboarding monitor", log_config=LOG_CONFIG)
background_tasks = []
background_task_lock = filelock.FileLock(BACKGROUND_TASK_LOCKFILE)

#
# Application setup/teardown
#

@app.listener("before_server_start")
async def setup(app: Sanic, loop: asyncio.AbstractEventLoop) -> None:
    """Application setup function"""

    await db.connect()
    await db.create_tables()

    logger.info("Database connection initialized")

    try:
        background_task_lock.acquire(0)

        # Start device connection testing task
        background_tasks.append(loop.create_task(test_device_connections()))
    except filelock.Timeout:
        pass  # Another process has already started background tasks

@app.listener("after_server_stop")
async def teardown(app: Sanic, loop: asyncio.AbstractEventLoop) -> None:
    """Application teardown function"""

    # Stop all background tasks
    for task in background_tasks:
        task.cancel()
        await task

    await db.disconnect()

    logger.info("Database connection closed")

I have been looking into FastAPI as it seems like a better fit for my project (a pure API), but it doesn't seem possible to run a background process there either. I'm astounded that background processes don't seem to be a priority for anybody :thinking: How am I the only person with this requirement? It seems to me like running a few functions in the background at regular intervals would be extremely common.

stale[bot] commented 3 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. If this is incorrect, please respond with an update. Thank you for your contributions.