django / asgiref

ASGI specification and utilities
https://asgi.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Proposal: ASGI background task handling #200

Open carrieedwards opened 3 years ago

carrieedwards commented 3 years ago

Background

Currently, background tasks in ASGI applications are handled by the framework rather than the server. To instrument ASGI applications, it is important to know whether a background task was scheduled, whether it ran to completion successfully, and how long it took compared with the time taken to run other code. This context is currently missing, which makes it difficult for instrumentation to separate web requests from background tasks. Handling background tasks on the server side would help to provide this context.

Proposal

ASGI could provide a standard protocol for handling background tasks on the server side, so that background tasks can be scheduled and tracked. This could be implemented as part of the scope, with background tasks submitted through an API exposed there.

andrewgodwin commented 3 years ago

So, I've definitely pondered this in the past, but it blurs the line on what an ASGI server is.

Specifically, I would put forward that background tasks must always be secondary to the server's function and never block exit; to do otherwise turns it from a web server into an everything server, and down that path lies some awful code. Given that brief, we introduced the lifespan signals, which allow background tasks to be launched at server start and give them a chance to do quick cleanup at shutdown.

What I'd want to establish is this, specifically:

Thanks for bringing this up - I think it's important to at least have a canonical view on this!
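The lifespan signals are the sanctioned hook for this. Below is a minimal sketch in plain asyncio (no framework) of an app that starts a long-running task on `lifespan.startup` and cancels it on `lifespan.shutdown`, so the task never blocks exit. `heartbeat`, `STATS` and `simulate_server` are hypothetical names for illustration, and `simulate_server` only stands in for a real ASGI server driving the protocol:

```python
import asyncio

# Shared state the HTTP handler can read; a stand-in for real application state.
STATS = {"ticks": 0}

async def heartbeat():
    # Illustrative long-running task (e.g. refreshing a cache) started at startup.
    while True:
        STATS["ticks"] += 1
        await asyncio.sleep(0.01)

async def app(scope, receive, send):
    if scope["type"] == "lifespan":
        task = None
        while True:
            message = await receive()
            if message["type"] == "lifespan.startup":
                task = asyncio.create_task(heartbeat())
                await send({"type": "lifespan.startup.complete"})
            elif message["type"] == "lifespan.shutdown":
                # Quick cleanup only: cancel rather than wait, so exit is never blocked.
                if task is not None:
                    task.cancel()
                    try:
                        await task
                    except asyncio.CancelledError:
                        pass
                await send({"type": "lifespan.shutdown.complete"})
                return
    elif scope["type"] == "http":
        body = f"ticks={STATS['ticks']}".encode()
        await send({"type": "http.response.start", "status": 200,
                    "headers": [(b"content-type", b"text/plain")]})
        await send({"type": "http.response.body", "body": body})

async def simulate_server():
    # Plays the server's side of the lifespan protocol for a short while.
    inbox = asyncio.Queue()
    sent = []

    async def receive():
        return await inbox.get()

    async def send(message):
        sent.append(message["type"])

    runner = asyncio.create_task(app({"type": "lifespan"}, receive, send))
    await inbox.put({"type": "lifespan.startup"})
    await asyncio.sleep(0.05)
    await inbox.put({"type": "lifespan.shutdown"})
    await runner
    return sent
```

Running `asyncio.run(simulate_server())` should return the two completion message types in order, with the heartbeat having ticked a few times in between.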

a-feld commented 3 years ago

The specific problem that we (at New Relic) have been struggling with is from a monitoring standpoint. Consider this ASGI application:

import asyncio
import time

async def app(scope, receive, send):
    await send({"type": "http.response.start", "status": 200})
    await send({"type": "http.response.body"})  # more_body defaults to False: response complete

    # simulate a background task
    await asyncio.sleep(10)
    time.sleep(0.001)  # deliberately blocking work
    await asyncio.sleep(10)

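Here, there are two parts running under the same context / scope: the web request/response (the two send calls) and the simulated background task (everything after the final send).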

To an observer/monitor, how is the background task distinguished? One could say the background task is everything that runs past the end of the response (the body message sent with more_body=False). But there may or may not be a background task there.

As far as I can tell, without doing some more significant introspection of the coroutine, it's not really possible to tell if there's code after the last send.

Currently, we have to report the web request/response and the background task all under the same context since the web component and the background task are not easily separable (context-wise, timing is another story). We can play certain tricks around indicating the time that the HTTP response was given, but creating an intuitive representation of the execution flow is difficult.
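One such timing trick can be sketched as an ASGI middleware that wraps `send`, timestamps the final `http.response.body` message (`more_body` false or absent), and reports the time before and after that point separately. This is a hypothetical sketch, not New Relic's implementation; `instrument`, `example_app`, `demo` and the report dict shape are made up for illustration. It can only say that code ran after the response, not whether that code is a "background task":

```python
import asyncio
import time

def instrument(app, report):
    """Wrap an ASGI app; report time before and after the final response body separately."""
    async def wrapped(scope, receive, send):
        if scope["type"] != "http":
            await app(scope, receive, send)
            return
        start = time.perf_counter()
        response_done_at = None

        async def tracking_send(message):
            nonlocal response_done_at
            await send(message)
            # more_body defaults to False, so this is the last body message.
            if message["type"] == "http.response.body" and not message.get("more_body", False):
                response_done_at = time.perf_counter()

        try:
            await app(scope, receive, tracking_send)
        finally:
            end = time.perf_counter()
            if response_done_at is None:
                report({"response": end - start, "after_response": None})
            else:
                report({"response": response_done_at - start,
                        "after_response": end - response_done_at})
    return wrapped

async def example_app(scope, receive, send):
    await send({"type": "http.response.start", "status": 200})
    await send({"type": "http.response.body"})
    await asyncio.sleep(0.05)  # stands in for the "background task"

async def demo():
    reports = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        pass

    await instrument(example_app, reports.append)({"type": "http"}, receive, send)
    return reports
```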

davidism commented 3 years ago

I guess I wouldn't be surprised if it was used this way, but I don't think you're supposed to be using async to replace a background task queue, at least in the context of ASGI applications. I would expect one request to be one context, whatever that request triggers is part of that context. Isn't a big point of ASGI that you can kick off a request and hold it open for a long time while a task completes before returning a response? While I personally wouldn't directly kick off more processing after finishing the response, I don't see why it should be considered a different context than if the task ended before the response did.

andrewgodwin commented 3 years ago

Right - we really discourage "background tasks" as a general concept within ASGI servers, as there are no execution guarantees or proper queuing/backpressure. The only thing I would want to encourage is "do a little cleanup after the response has been sent", but it certainly should not be a replacement for celery/AMQP/kafka/etc.

a-feld commented 3 years ago

Starlette implements background tasks using this ASGI mechanism. Persistence/reliability problems aside, we're struggling with how to represent execution flow in this application:

import asyncio

from starlette.applications import Starlette
from starlette.background import BackgroundTask
from starlette.responses import PlainTextResponse
from starlette.routing import Route

async def bgtask():
    await asyncio.sleep(1)

async def run_background_task(request):
    task = BackgroundTask(bgtask)
    return PlainTextResponse("Task Scheduled!", background=task)

app = Starlette(routes=[Route("/", run_background_task)])

This is what the execution might look like for this route:

[Image: execution trace for this route]

Note the middleware execution time / parent-child structure (which is accurate, but confusing).

Having the background task execute as a child of the route could be confusing to users, but we represent it this way because the code is actually technically running within the scope of the ASGI handler for that particular request.

It's also not entirely clear where the HTTP response was served (but that's a tractable problem to solve).

If this approach is discouraged, should ASGI codify that handlers are not to execute code after the HTTP response is sent? If codified, perhaps uvicorn could implement task cancellation after the HTTP response is sent as an option?
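For illustration, such a cancellation option might look like the middleware sketch below. It is hypothetical: nothing in this thread indicates uvicorn has such an option, and `cancel_after_response` and `demo` are made-up names. It runs the handler as a task and cancels it once the final response body has been sent, so anything awaited after that point never completes:

```python
import asyncio

def cancel_after_response(app):
    """Hypothetical middleware: cancel the handler once its final response body is sent."""
    async def wrapped(scope, receive, send):
        if scope["type"] != "http":
            await app(scope, receive, send)
            return
        response_sent = asyncio.Event()

        async def tracking_send(message):
            await send(message)
            if message["type"] == "http.response.body" and not message.get("more_body", False):
                response_sent.set()

        handler = asyncio.create_task(app(scope, receive, tracking_send))
        waiter = asyncio.create_task(response_sent.wait())
        done, _ = await asyncio.wait({handler, waiter}, return_when=asyncio.FIRST_COMPLETED)
        if handler not in done:
            handler.cancel()  # response is out; stop whatever the handler is still doing
        waiter.cancel()
        await asyncio.wait({handler})
        if not handler.cancelled():
            handler.result()  # re-raise any exception the handler raised
    return wrapped

async def demo():
    log = []
    sent = []

    async def app_with_tail(scope, receive, send):
        await send({"type": "http.response.start", "status": 200})
        await send({"type": "http.response.body", "body": b"ok"})
        await asyncio.sleep(1)  # work after the response; cancelled by the middleware
        log.append("tail ran")

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message["type"])

    await cancel_after_response(app_with_tail)({"type": "http"}, receive, send)
    return log, sent
```

Cancellation only takes effect at the handler's next `await`; synchronous code between two awaits still runs to completion once started.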

davidism commented 3 years ago

Regardless of whether kicking off more work is a good pattern, it seems expected that coroutines awaited from a request handler are part of that request's context. Besides the name "background", there's no distinction between that coroutine and any other coroutine awaited during that request.

Thinking from a synchronous/WSGI perspective, if a view called other functions that did work, that work would be part of that request. It's only when the work is passed off to a real task queue like RQ or Celery that the context would change in a meaningful way. And the request would take as long as all the blocking work took, just as it would take as long as awaiting all those coroutines took.

a-feld commented 3 years ago

Agreed 😄 Which does come back to the question though, is there value in ASGI providing a mechanism for scheduling tasks outside the scope of an ASGI request?

Something like await scope["schedule_task"](task)? (this is only illustrative, not a recommendation 😛 )
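To make the idea concrete, here is a toy sketch of what a server might do behind a hypothetical `scope["schedule_task"]` key (not part of the ASGI spec): wrap each scheduled coroutine function, record its duration and outcome separately from the request, and at shutdown give outstanding tasks a short grace period before cancelling them. `TaskTrackingServer`, its methods and the demo functions are invented for illustration:

```python
import asyncio
import time

class TaskTrackingServer:
    """Toy server-side registry behind a hypothetical scope["schedule_task"]."""

    def __init__(self, app):
        self.app = app
        self.tasks = set()
        self.records = []  # (function name, duration in seconds, outcome)

    async def _run_tracked(self, coro_fn):
        start = time.perf_counter()
        outcome = "completed"
        try:
            await coro_fn()
        except asyncio.CancelledError:
            outcome = "cancelled"
            raise
        except Exception:
            outcome = "failed"
        finally:
            self.records.append((coro_fn.__name__, time.perf_counter() - start, outcome))

    async def schedule_task(self, coro_fn):
        task = asyncio.create_task(self._run_tracked(coro_fn))
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)

    async def handle(self, scope, receive, send):
        scope = dict(scope, schedule_task=self.schedule_task)
        await self.app(scope, receive, send)

    async def shutdown(self, timeout):
        # Background tasks must not block exit: grace period, then cancel.
        if self.tasks:
            _, pending = await asyncio.wait(self.tasks, timeout=timeout)
            for task in pending:
                task.cancel()
            await asyncio.gather(*pending, return_exceptions=True)

async def send_email():
    await asyncio.sleep(0.01)

async def slow_report():
    await asyncio.sleep(10)

async def demo_app(scope, receive, send):
    await send({"type": "http.response.start", "status": 202})
    await send({"type": "http.response.body", "body": b"scheduled"})
    await scope["schedule_task"](send_email)
    await scope["schedule_task"](slow_report)

async def demo():
    server = TaskTrackingServer(demo_app)

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        pass

    await server.handle({"type": "http"}, receive, send)
    await server.shutdown(timeout=0.1)
    return sorted((name, outcome) for name, _, outcome in server.records)
```

With these timings, `send_email` finishes within the grace period and `slow_report` is cancelled, and each outcome is recorded separately from the request that scheduled it.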

andrewgodwin commented 3 years ago

I personally don't think there's value in that, since task scheduling is an area that has such a broad scope of types of scheduling and guarantees that anything we add is likely going to be poorly understood, misused, or both. I think it's best to leave task scheduling to something dedicated to that problem, and more importantly a system that can survive a single machine crash.

a-feld commented 3 years ago

Yes, I tend to agree with that sentiment. I'm actually leaning towards ASGI being more prescriptive here, in that there mustn't be any work after the final response body message is sent for an HTTP request.

Generally speaking, for background task scheduling, resiliency requirements dictate that any tasks required to run as a result of a web request be persisted prior to returning any HTTP response.

This question is semi-related to https://github.com/django/asgiref/issues/140 - the life cycle of an ASGI coroutine is not strictly defined.
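The "persist before responding" rule can be sketched with the standard library's `sqlite3` standing in for a real broker or database (an assumption for illustration; in practice this would be Celery, RQ, a message queue, or an outbox table in the app's main database). The handler commits the task record first and only then sends the response, so a crash after the response cannot lose an acknowledged task. The table layout and all names are hypothetical, and a blocking `sqlite3` call inside an async handler is acceptable only in a sketch:

```python
import asyncio
import json
import sqlite3

def init_queue(db):
    db.execute(
        "CREATE TABLE IF NOT EXISTS task_queue ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " name TEXT NOT NULL,"
        " payload TEXT NOT NULL,"
        " status TEXT NOT NULL DEFAULT 'pending')"
    )
    db.commit()

def enqueue(db, name, payload):
    # `with db` commits on success and rolls back on error.
    with db:
        cur = db.execute(
            "INSERT INTO task_queue (name, payload) VALUES (?, ?)",
            (name, json.dumps(payload)),
        )
    return cur.lastrowid

def make_enqueue_app(db):
    async def app(scope, receive, send):
        # 1. Persist the task; if this raises, no success response is sent.
        task_id = enqueue(db, "send_welcome_email", {"path": scope["path"]})
        # 2. Only then acknowledge the request.
        await send({"type": "http.response.start", "status": 202,
                    "headers": [(b"content-type", b"text/plain")]})
        await send({"type": "http.response.body", "body": f"queued task {task_id}".encode()})
    return app

async def demo():
    db = sqlite3.connect(":memory:")
    init_queue(db)
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await make_enqueue_app(db)({"type": "http", "path": "/signup"}, receive, send)
    rows = db.execute("SELECT name, payload, status FROM task_queue").fetchall()
    return rows, sent
```

A separate worker process would then pick up `pending` rows; that part is left to whatever task system is actually in use.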

andrewgodwin commented 3 years ago

I wouldn't say that there shouldn't be any work, but I feel comfortable saying there is no guarantee that any code after the response body is closed will be executed - though, of course, this is just a more specific case of "all software may crash at any point".

People can and will do whatever they want inside servers, and this has been true since WSGI - I've seen plenty of sneaky background threads shoved into WSGI processes - but I think it's worth making it clear that a) ASGI does not recommend this and b) a better way to do it is to use an actual tasking mechanism.

dholth commented 3 years ago

This reminds me of trio's nurseries. Probably there should be one nursery per ASGI request so that when the request ends all tasks started under it also end. You might pass in a longer-lived nursery to serve as the parent for longer-lived tasks.

graingert commented 3 years ago

how about introducing a new ASGI4Application interface?

it would look like

ASGI4Application = Callable[[], AsyncContextManager[ASGI3Application]]

you'd use it with a taskgroup like this

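import contextlib
from typing import AsyncGenerator

import anyio
import anyio.abc
from asgiref.typing import ASGI3Application

class SomeApp:
    def __init__(self, tg: anyio.abc.TaskGroup):
        self._tg = tg

    async def __call__(self, scope, receive, send) -> None:
        if scope["path"] == "/fire/the/missiles":
            await anyio.sleep(10)
            # anyio >= 3: start_soon() replaces the older `await tg.spawn(...)`;
            # fire_the_missiles is a placeholder coroutine function, not defined here
            self._tg.start_soon(fire_the_missiles)

@Asgi3AppWrapper  # defined below; in a real module it must be defined before this line
@contextlib.asynccontextmanager
async def create_app_v4() -> AsyncGenerator[ASGI3Application, None]:
    """
    asgiref v4 style app
    """
    async with anyio.create_task_group() as tg:
        yield SomeApp(tg)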

and could make a compatibility wrapper like this:

import functools

import anyio

async def _enter(cm):
    # Enter an async context manager by hand; return (exit callable, entered value).
    cls = type(cm)
    return functools.partial(cls.__aexit__, cm), await cls.__aenter__(cm)

class Asgi3AppWrapper:
    def __init__(self, app: ASGI4Application):
        self._app4 = app
        self._lock = anyio.Lock()  # anyio >= 3; older anyio used anyio.create_lock()
        self._app3 = None
        self._exit = None

    async def __call__(self, scope, receive, send):
        if scope["type"] == "lifespan":
            while True:
                message = await receive()
                if message["type"] == "lifespan.startup":
                    async with self._lock:
                        try:
                            self._exit, self._app3 = await _enter(self._app4())
                        except Exception as e:
                            await send({"type": "lifespan.startup.failed", "message": str(e)})
                            return
                        await send({"type": "lifespan.startup.complete"})

                elif message["type"] == "lifespan.shutdown":
                    async with self._lock:
                        if self._exit is None:
                            await send(
                                {
                                    "type": "lifespan.shutdown.failed",
                                    "message": "already closed",
                                }
                            )
                            return
                        try:
                            await self._exit(None, None, None)
                        except Exception as e:
                            # report failure once; do not also send "complete"
                            await send({"type": "lifespan.shutdown.failed", "message": str(e)})
                            return
                        finally:
                            self._exit = self._app3 = self._app4 = None
                        await send({"type": "lifespan.shutdown.complete"})
                        return

        async with self._lock:
            app = self._app3
        if app is None:
            raise RuntimeError("application is not running (startup failed or already shut down)")
        await app(scope, receive, send)

jefer94 commented 8 months ago

Anyone who needs this feature should use Celery's periodic tasks.