PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
16k stars · 1.57k forks

Allow Prefect client to be used from synchronous contexts #9695

Closed · MattDelac closed this 1 month ago

MattDelac commented 1 year ago

First check

Prefect Version

2.x

Describe the current behavior

get_client() from the Python SDK is async and is used as an async context manager. This means that every function using it also has to be async (whether or not the program otherwise runs (a)synchronously).
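
For reference, the intended async usage looks roughly like this (a minimal sketch; read_flow_runs is just one illustrative client method):

import asyncio
from prefect import get_client

async def main():
    # get_client() must be entered as an *async* context manager,
    # so any caller has to be a coroutine as well.
    async with get_client() as client:
        runs = await client.read_flow_runs()
        print(len(runs))

asyncio.run(main())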

In particular, the following does not work, since asyncio.run() cannot be combined with a context manager:

asyncio.run(with get_client()) as client:
    ...

Describe the proposed behavior

client = asyncio.run(get_client())
client.another_method()
# OR
asyncio.run(client.another_method())

Ideally, I could get the client by using asyncio.run() so that I don't need to do any async operations afterwards, and my script could remain entirely synchronous.

Example Use

No response

Additional context

As per our conversation with support:

Is there a way to run the client SDK without async? This just adds complexity to our codebase for no good reason. I would also have to make most of our functions async ... For us there is no gain in using the SDK asynchronously. Our scripts' performance is unimportant, as we use the SDK for light operations only.

I don't think I can use asyncio.run() with a context manager. Basically, the following does not work:

asyncio.run(with get_client()) as client:
   ...

And as soon as I make anything "async" or "await", I would need to mark the calling function as async too ... which would be a big refactor of our codebase and of how our users use our internal SDK.

zanieb commented 1 year ago

This is on our roadmap.

In the meantime, you can use asynchronous tasks from synchronous flows. This would allow you to use the asynchronous client without changing the rest of your code.
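
A minimal sketch of that pattern (assuming Prefect 2.x task-call semantics, where an async task can be invoked from a synchronous flow; read_flow_runs is just an illustrative client call):

from prefect import flow, task, get_client

@task
async def count_flow_runs():
    # All async client usage stays inside this async task.
    async with get_client() as client:
        runs = await client.read_flow_runs()
        return len(runs)

@flow
def my_sync_flow():
    # Prefect runs the async task for us; the flow and its callers stay synchronous.
    print(count_flow_runs())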

this just adds complexity in our codebase for no good reason.

There is a good reason: since our engine is asynchronous, using the client asynchronously will be more performant. That said, we understand that a synchronous interface is still very valuable and convenient.

And as soon as I make anything "async" or "await", I would need to mark the calling function also async

This is not the case. The following pattern is okay to use:

import asyncio
import prefect

def foo():
    asyncio.run(bar())

async def bar():
    async with prefect.get_client() as client:
        await ...

MattDelac commented 1 year ago

There is a good reason — since our engine is asynchronous using the client asynchronously will be more performant

I get the design decision 👍 It just adds complexity for users with simpler use cases, like "have a CLI that uses the Python SDK to perform some operations", where speed is largely unimportant.

This is not the case. The following pattern is okay to use:

Yeah I would need to wrap most of the Prefect logic in async functions and then call them synchronously. Realistically this adds complexity.

Right now, we play with the REST endpoints directly.
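
For illustration, that direct REST usage looks something like the sketch below (the local API URL and the /flow_runs/filter endpoint are assumptions based on the public Prefect REST API, not code from this thread):

import httpx

API_URL = "http://127.0.0.1:4200/api"  # e.g. a local Prefect server

# POST /flow_runs/filter returns flow runs matching the given filter.
resp = httpx.post(f"{API_URL}/flow_runs/filter", json={"limit": 5})
resp.raise_for_status()
for run in resp.json():
    print(run["id"], run["name"])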

MattDelac commented 1 year ago

This is not the case. The following pattern is okay to use:

def foo():
    asyncio.run(bar())

async def bar():
    async with prefect.get_client() as client:
        await ...

Yes, I will most likely need to double the number of methods in order to expose some of them synchronously.

And so instead of

def create_something():
    client = get_client()

    # Boilerplate
    ...

    client.create_flow(...)

I have to do something like the following

async def _sync_prefect_create_something():
    async with get_client():
        ...

def create_something():    
    # Boilerplate
    ...

    asyncio.run(_sync_prefect_create_something())
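
One way to avoid doubling every method is a small wrapper that runs a coroutine function synchronously. This is a hypothetical helper (run_sync is not part of Prefect), and it assumes no event loop is already running in the calling thread:

import asyncio
import functools

from prefect import get_client

def run_sync(async_fn):
    """Wrap an async function so callers can stay synchronous."""
    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        return asyncio.run(async_fn(*args, **kwargs))
    return wrapper

@run_sync
async def create_something(name: str):
    async with get_client() as client:
        # Boilerplate and client calls go here.
        ...

# Callers never see the async plumbing:
# create_something("my-flow")
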
hozn commented 8 months ago

I wanted to add a comment here to urge further consideration of providing a blocking version of PrefectClient.

To provide context, we have a blocking library for performing our database operations (due to significant instability we experienced with the asyncio Postgres drivers used with SQLAlchemy). We use FastAPI but also some blocking tools (e.g. CLI scripts) to invoke these blocking DB APIs. When doing this from FastAPI, we either use blocking routes, which FastAPI executes in a thread pool, or we use a manual thread-pool executor. In certain situations we want Prefect modifications to participate in the database transaction -- e.g. we want creating a specific object to result in a Deployment creation, and to fail (and roll back the transaction) if the HTTP request to the Prefect server fails.

A typical invocation thus looked like:

async event loop -> thread -> async event loop (PrefectClient)

The problem with the async nature of PrefectClient arises because we sometimes call into our API with an event loop already running and sometimes without one. So we cannot consistently use asyncio.run(prefect_client.do_something(...)), since asyncio.run() cannot be used when an event loop is already running. We spent considerable time experimenting with the nest_asyncio library; however, it introduced oddities such as hangs when trying to close out event loops. Other issues with that library led us to conclude that we really should avoid it if we can.
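
One common workaround for the "loop already running" problem is to run the coroutine in a fresh event loop on a worker thread and block on the result; a minimal sketch (not the code described in this comment):

import asyncio
from concurrent.futures import ThreadPoolExecutor

def run_coro_blocking(coro):
    """Run a coroutine to completion from sync code, even if a loop is already running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running loop in this thread: asyncio.run is safe.
        return asyncio.run(coro)
    # A loop is already running here: run the coroutine in its own loop
    # on a worker thread and block until it finishes.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()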

In the end, the simple solution for us was to create a BlockingPrefectClient class covering the subset of the API functionality we needed. This is basically copy/paste, removing await and changing things like AsyncExitStack to ExitStack. It feels kludgy, for sure, though less kludgy than understanding and testing the nuanced asyncio-plus-threads turtles we were traversing.

cicdw commented 1 month ago

This is now achievable:

client = get_client(sync_client=True)
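
For example (a brief sketch; the context-manager usage and the read_flow_runs call reflect our understanding of the sync client and may need adjusting for your Prefect version):

from prefect import get_client

with get_client(sync_client=True) as client:
    runs = client.read_flow_runs()
    print(f"{len(runs)} flow runs")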

There are still many improvements we can make to handling our sync / async interfaces; some discussion on the direction we are planning to take can be found here - feel free to comment.

I'm going to close this issue as complete and ask that any follow-up issues related to the sync client be opened separately.