firebase / firebase-functions-python

Apache License 2.0

Async support #135


antont commented 9 months ago

Would it make sense to support async functions here, so that the event handlers could be called in an async manner and consequently call async libraries?

This way a single instance could be handling more requests in parallel, for example if the cloud function calls several external APIs in a sequence to complete a process, and spends most of the time just waiting for responses, and uses only a little memory and CPU in general.

It is possible to call async code from the current sync functions, but that does not help with scalability, as the cloud function itself still blocks for the whole operation. It can help run things in parallel within a single invocation, but not with sequential operations, which are common at least in our use case.
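A stdlib-only sketch of this distinction, with `asyncio.sleep` standing in for API latency (all names and delays here are hypothetical, for illustration only):

```python
import asyncio
import time

async def call_api(delay: float) -> str:
    # Stand-in for an async HTTP call to an external API.
    await asyncio.sleep(delay)
    return f"done after {delay}s"

def sync_handler_sequential() -> float:
    # A sync cloud function can wrap async code with asyncio.run(),
    # but dependent, sequential awaits still block the handler
    # for the *sum* of the delays.
    async def work():
        await call_api(0.2)  # "API A"
        await call_api(0.1)  # "API B", needs A's result first
    start = time.monotonic()
    asyncio.run(work())
    return time.monotonic() - start

def sync_handler_parallel() -> float:
    # Independent calls can at least overlap within one invocation,
    # blocking only for the *max* of the delays.
    async def work():
        await asyncio.gather(call_api(0.2), call_api(0.1))
    start = time.monotonic()
    asyncio.run(work())
    return time.monotonic() - start

seq_time = sync_handler_sequential()  # roughly 0.3s: sum of the delays
par_time = sync_handler_parallel()    # roughly 0.2s: max of the delays
```

Either way, the handler itself blocks its instance until `asyncio.run` returns; only a truly async handler would let the instance serve other requests during the waits.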

The original JS runtime is async, like all things Node, but Python also has standard async (asyncio) in version 3. Consequently, firebase-admin-python has worked on adding async support since google-auth added async support in 2021: https://github.com/firebase/firebase-admin-python/issues/104

> Hey Folks, We just released v5.3.0 which added async support for the Cloud Firestore API. This is the first step into fulfilling this FR so please give it a try and let us know what you think!

Starlette is a popular, lightweight ASGI framework that could perhaps be used for this, the same way Flask is used for the sync version? https://www.starlette.io/
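For context, an ASGI application is just an async callable; Starlette and friends wrap this interface. A minimal stdlib-only sketch of the shape:

```python
# An ASGI app is an async callable taking (scope, receive, send).
# While it awaits I/O, the event loop can serve other requests in the
# same process, which is exactly the scalability property discussed above.
async def app(scope, receive, send):
    assert scope["type"] == "http"
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain")],
    })
    await send({"type": "http.response.body", "body": b"done"})
```

A server like Uvicorn drives this callable; the sketch shows only the interface, not a full serving layer.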

exaby73 commented 9 months ago

Hello @antont. I'm curious if using asyncio.run() or loop.run_until_complete() would be a solution here

antont commented 9 months ago

> Hello @antont. I'm curious if using asyncio.run() or loop.run_until_complete() would be a solution here

Not in a user-written cloud function, because it makes the function block the instance even while the underlying async calls are idle, e.g. waiting for responses from external APIs.

The firebase-functions-core would need to call the user functions in an async way, so that the instance could have multiple call handlers running in parallel.

I don't know the details of Cloud Run, i.e. whether using a single instance and process there to handle multiple function calls makes sense, but I think it would in cases where the function is light on resources such as memory and CPU and mostly just waits for network responses. And AFAIK the Node runtime already works like that there.

ekalosak commented 9 months ago

I would also like to see Cloud Run's existing Python asyncio support implemented in FB Functions. It's of course a serious concern for scalability, considering many functions are IO-bound.

antont commented 9 months ago

A very simple example to illustrate the requested feature, a bit simplified but close to real code:

Sync version:

@on_document_created(document="my_collection/{document_id}")
def document_created(event: Event[DocumentSnapshot]):
    document_id = event.params['document_id']
    result_A = requests.post(api_url_A, json={'id': document_id})  # call to API A, takes 10s
    result_B = requests.post(api_url_B, json={'a': result_A})  # call to API B, needs result_A, takes 5s
    # do something with result_B, like store it in Firestore

AFAIK this blocks the instance for the full 15 seconds, so that if more requests come in, Firebase / Cloud Run needs to spin up new instances for the function.

Contrast this with an async version:

@on_document_created(document="my_collection/{document_id}")
async def document_created(event: Event[DocumentSnapshot]):
    document_id = event.params['document_id']
    result_A = await aio_request.post(api_url_A, json={'id': document_id})  # call to API A, takes 10s
    result_B = await aio_request.post(api_url_B, json={'a': result_A})  # call to API B, needs result_A, takes 5s
    # do something with result_B, like store it in Firestore

This could be called from an async dispatcher multiple times, handling maybe hundreds of requests in a single instance, without being any slower to respond to the user.

Even with, say, one new document creation per second, the handler would receive 15 triggers during the processing of a single one, so the sync version ends up starting more instances (I think), while the async one would handle them easily in a single instance, on a single core and thread.
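A toy dispatcher illustrating the claim, with `asyncio.sleep` standing in for the API calls (all names here are hypothetical, not the library's API):

```python
import asyncio
import time

async def fake_api(delay: float) -> str:
    # Stand-in for an awaitable external API call.
    await asyncio.sleep(delay)
    return "ok"

async def document_created(doc_id: int) -> str:
    # Hypothetical async handler: two dependent "API calls" in sequence.
    a = await fake_api(0.05)  # API A
    b = await fake_api(0.05)  # API B, needs A's result
    return f"{doc_id}:{b}"

async def dispatcher(n: int) -> list:
    # One event loop runs n handler invocations concurrently.
    return await asyncio.gather(*(document_created(i) for i in range(n)))

start = time.monotonic()
results = asyncio.run(dispatcher(100))
elapsed = time.monotonic() - start
# 100 handlers, each waiting ~0.1s, complete in roughly 0.1s of wall time
# on a single thread, because the waits overlap on the event loop.
```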

antont commented 9 months ago

FYI, I made an experimental hack to allow concurrent execution of multiple asynchronous functions, triggered by firebase function callbacks, so that they keep running and finish after the callback has returned.

This allows a single instance to serve lots of requests, because after the callback returns, it will get called again in the same instance for new requests.

Maybe @ekalosak knows about caveats here, like whether Cloud Run will still sometimes kill the instance before the functions have completed. I did read in the docs that it's possible to configure Cloud Run so that it always keeps an instance alive, and that seems to be available in this lib via the min_instances param. For us, however, it's currently better that the instance shuts down eventually.

I tested this with a test endpoint that simply prints log info every 10 secs, with total duration of 1 minute. I call the endpoint 100 times in a sequence, each call takes 0.2s over the net from where I am. The instance stays steadily responsive, and completes the background processing of each call. After a few minutes without more calls, it will shut down.

The test endpoint and worker function:

@https_fn.on_request(timeout_sec=100, concurrency=100, max_instances=1)
def test_concurrent(req: https_fn.Request):
    asynctask_thread.concurrent_handler_id += 1
    job = do_concurrent(asynctask_thread.concurrent_handler_id)

    if asynctask_thread.runner is None:  # or not asynctask_thread.runner.is_alive:
        asynctask_thread.init()  # sets a new runner
    asynctask_thread.runner.add_job(job)

    return https_fn.Response("done")

async def do_concurrent(task_id: int):
    """a long running thing to test concurrency"""
    for i in range(6):
        print(f"[do_concurrent] {task_id}: sleeping at {i} in", os.getpid())
        await asyncio.sleep(10)

And the async task runner thread is in https://gist.github.com/antont/237c23b3edf35f1a9861727c52cfb3fe
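The gist has the actual implementation; the core idea of such a runner can be sketched roughly like this (a simplification for illustration, not the gist's code):

```python
import asyncio
import threading

class AsyncTaskRunner:
    """Sketch of a background-thread event loop: sync callbacks hand off
    coroutines and return immediately, while the jobs keep running on the
    loop until they finish."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self.thread.start()

    def add_job(self, coro):
        # Thread-safe handoff from the sync handler to the background loop;
        # returns a concurrent.futures.Future for the job's result.
        return asyncio.run_coroutine_threadsafe(coro, self.loop)
```

Note the daemon thread means pending jobs die with the process, which is one of the caveats around instance shutdown discussed here.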

I do realize that there are caveats to this approach.

So, this works in my tests now, but I'm curious whether there are cases where this is known to break. I'm also considering adding proper async support here, but I suspect that would be more work, while this hack was pretty quick to do.

ekalosak commented 9 months ago

All I'll say re: rolling your own asyncio event loop is that it's a project unto itself, and there are odd failure conditions your preliminary testing likely won't catch. This is a big request for the Firebase team, but a high-value one.

exaby73 commented 8 months ago

I believe that to fully support this, https://github.com/GoogleCloudPlatform/functions-framework-python will have to support async first. Until that happens, I don't think we can implement this here.

antont commented 8 months ago

FYI, as a workaround, I finally ended up using a vanilla Cloud Run instance, with a Python ASGI server, to handle Tasks triggered by Firebase functions.

So for the long-running task handler I'm not using firebase_functions at all; I just defined a Docker container with Uvicorn & Starlette (via FastAPI), following the instructions in https://dev.to/0xnari/deploying-fastapi-app-with-google-cloud-run-13f3

I first created a task handler using firebase_functions, then deployed the vanilla Cloud Run instance and switched the task URL to point there, keeping the same queue that the lib had nicely created.

I reuse the same codebase in both deployments, so it's all async; I just use asyncio.run and friends to run the code from the sync Firebase triggers.

I'm happy, because Tasks are a good way to run these long-running processes anyway, also because the external API calls may fail, so the retry mechanism is very nice. And Starlette runs the async code perfectly, so a single instance & process can easily handle, I think, at least hundreds of such jobs concurrently.

It would be sweet to use the nice helpers in firebase_functions to define and deploy async Cloud Run instances as well, but we can live with these two deployment paths.

I'm curious, though, how hard it would be to also handle the Firebase triggers in the ASGI server. Can I somehow skip the firebase_functions-style deployment and have Firebase call a self-defined URL? Then I would have a single deployment target and native async execution for the whole thing.

For supporting ASGI in firebase_functions itself, it would seem simplest to me if one could optionally skip the Flask parts in internal.serving and provide an alternative 'serving'. It makes sense, though, that the support would come from the functions-framework level, but I didn't really figure out what that would mean there.

ekalosak commented 8 months ago

Toni, nice - thanks for sharing, too.

It seems like the current state of play for async on Firebase Functions is "roll your own server on Cloud Run." This makes sense, and is a really solid solution to "hey I need this now."

Looking forward, just repeating Nabeel here, the underlying functions framework needs async support.

Ultimately, it would be amazing to write something like this:

@firestore_fn.on_document_created('...')
async def calls_an_api(...):
    t1 = asyncio.create_task(...)  # call an API
    t2 = asyncio.create_task(...)  # call a different API
    return do_a_thing(await asyncio.gather(t1, t2))

because at present, while one can run outgoing tasks concurrently (see EDIT below), Firebase functions require scaling out to handle multiple concurrent requests, even when the work is merely IO-bound, i.e. doesn't need the extra CPU that scale-out provides.


Toni, regarding min_instances: from my reading, that will likely only keep a particular instance alive. It wouldn't stop Cloud Run from killing other running instances if they aren't the "lucky" ones selected by Cloud Run's scaler. From a quick read of the docs, Cloud Run instances are considered "not idle" only while they have an open request.

This is where your use-case and mine differ, I believe. I just want to allow a single instance to handle multiple IO-bound requests simultaneously so I don't erroneously scale out due to a slow upstream API. Your case is, I believe, a bit more complex, with long-running jobs using callbacks - that is, without open HTTP connections corresponding to ongoing jobs.


EDIT: well, I solved part of my use case. Unfortunately it still requires a single compute unit (thread, Pod, whatever GCP is using) per request. But using asyncio to execute multiple outgoing HTTP requests is easy:

def my_sync_fb_fn(...):
  import asyncio

  async def gather_requests():
    # to_thread runs the blocking calls in worker threads concurrently
    t1 = asyncio.to_thread(sync_fn1, *args1)
    t2 = asyncio.to_thread(sync_fn2, *args2)
    return await asyncio.gather(t1, t2)

  return asyncio.run(gather_requests())

Still, it would be good to have the option to write async def my_async_fb_fn(...), so that multiple incoming requests that wait on blocking network tasks could be handled concurrently.