apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

AIP-81 Implement POST/Insert Variables in FastAPI #43897

Open bugraoz93 opened 1 week ago

bugraoz93 commented 1 week ago

Description

Develop an endpoint in FastAPI that enables POSTing one or multiple Variables from a list. This feature should allow users to import their variables from a set of variables in a single request.

Use case/motivation

AIP-81

Related issues

https://github.com/apache/airflow/issues/42560


jason810496 commented 1 week ago

@bugraoz93 I'll take this issue, thanks!

jason810496 commented 1 day ago

While working on this issue, I came up with two approaches and would appreciate some feedback to decide which one to pursue 🙏

Context for Variable.set

For the post_variable implementation, the Variable.set method is used to add new variables.
Ref: variables.py#L155

The Variable.set method involves four steps:

  1. Check if the secret exists in the custom secrets backend
    For example, if using Vault, AWS, GCP Secret Manager, etc., it will call SecretsBackend.get_variable to verify whether the variable is set in the external secret manager.
  2. Delete the old variable in the metastore
  3. Add the variable with the new value
  4. Set the secret in the cache
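The four steps above can be sketched roughly as follows. This is a simplified illustration, not Airflow's actual code: `FakeSecretsBackend`, `set_variable`, and the plain dicts standing in for the metastore and the cache are all hypothetical, and returning a conflict flag from step 1 is an assumption made only so the check is visible.

```python
from typing import Optional


class FakeSecretsBackend:
    """Hypothetical stand-in for an external secrets manager (Vault, AWS, GCP, ...)."""

    def __init__(self, secrets: dict[str, str]) -> None:
        self._secrets = secrets

    def get_variable(self, key: str) -> Optional[str]:
        # One lookup per key: this is the per-variable cost a bulk insert pays N times.
        return self._secrets.get(key)


def set_variable(
    key: str,
    value: str,
    backend: FakeSecretsBackend,
    metastore: dict[str, str],
    cache: dict[str, str],
) -> bool:
    """Mimic the four steps; return True if the key also exists in the backend."""
    # 1. Check whether the key is already defined in the custom secrets backend.
    conflict = backend.get_variable(key) is not None
    # 2. Delete the old variable from the metastore (a dict here, a DB in reality).
    metastore.pop(key, None)
    # 3. Add the variable with the new value.
    metastore[key] = value
    # 4. Store the value in the cache.
    cache[key] = value
    return conflict


backend = FakeSecretsBackend({"shared": "from-vault"})
metastore = {"a": "old"}
cache: dict[str, str] = {}
first = set_variable("a", "new", backend, metastore, cache)
second = set_variable("shared", "local", backend, metastore, cache)
```

Calling this once per variable means one backend lookup and one delete/insert pair per key, which is why the two approaches below differ in cost.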

Approaches

1. Add a get_variables method to the Variables model and SecretsBackend

This approach would require implementing a get_variables method for both core and external secret managers:

2. Call Variable.set concurrently using asyncio

Instead of creating a get_variables method, this approach would involve calling Variable.set concurrently, and we would have to make sure there are no thread-safety issues.

Pros and Cons

First Approach

Second Approach

My opinion

We can start with the second approach to quickly implement the main requirement (inserting multiple variables in FastAPI).
Afterward, we can gradually refactor to use the first approach for better efficiency.

cc @bugraoz93 @pierrejeambrun

pierrejeambrun commented 1 day ago

I would say the second approach is more straightforward to implement, with an asyncio.gather over all the variables. The issue is that, for this to work, the underlying (called) methods also need to be async and release the loop on blocking I/O.

Which is not the case at the moment. We would need to async await the calls to the secret backend and also to the DB. So I would say that this is too much work and a deeper change.
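To make that point concrete, here is a minimal sketch using only the standard library. `blocking_set` is a hypothetical stand-in for a synchronous call such as Variable.set, with `time.sleep` simulating blocking I/O. Wrapping it in a coroutine and passing the coroutines to `asyncio.gather` still runs them one after another, because nothing ever yields to the event loop.

```python
import asyncio
import time

events: list[str] = []


def blocking_set(key: str) -> None:
    """Hypothetical stand-in for a synchronous Variable.set call."""
    events.append(f"start {key}")
    time.sleep(0.01)  # blocking I/O: never yields to the event loop
    events.append(f"end {key}")


async def set_async(key: str) -> None:
    # An async wrapper alone does not make the underlying call non-blocking.
    blocking_set(key)


async def main() -> None:
    await asyncio.gather(*(set_async(k) for k in ("a", "b", "c")))


asyncio.run(main())
print(events)
```

Each call finishes before the next one starts, so the recorded events are strictly sequential, the same as a plain loop.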

That leaves us with choice 1 only I believe.

Or backup choice: no bulk endpoint, asyncio on the client side (CLI) to generate concurrent requests on the API to create all the different Variables at the same time using only 1 thread on the client. But I think that was already considered and discarded.
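The client-side fallback could look roughly like this. It is a sketch under stated assumptions: `post_variable` is a hypothetical coroutine, and `asyncio.sleep` stands in for the network round trip that a real CLI would perform with an async HTTP client. The in-flight counter is there only to show that the requests overlap on a single thread.

```python
import asyncio

in_flight = 0
max_in_flight = 0


async def post_variable(key: str, value: str) -> dict[str, str]:
    """Hypothetical client call; a real CLI would send an HTTP POST here."""
    global in_flight, max_in_flight
    in_flight += 1
    max_in_flight = max(max_in_flight, in_flight)
    await asyncio.sleep(0.01)  # simulated network I/O that yields to the loop
    in_flight -= 1
    return {"key": key, "value": value}


async def import_variables(variables: dict[str, str]) -> list[dict[str, str]]:
    # One request per variable, all issued concurrently from a single thread.
    return await asyncio.gather(
        *(post_variable(k, v) for k, v in variables.items())
    )


created = asyncio.run(import_variables({"a": "1", "b": "2", "c": "3"}))
```

This works without server changes because the client's I/O is genuinely awaitable, but the server still handles N separate requests rather than one bulk call.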

bugraoz93 commented 21 hours ago

Great findings! Thanks! My main concern was the Variable :)

> Which is not the case at the moment. We would need to async await the calls to the secret backend and also to the DB. So I would say that this is too much work and a deeper change.

I agree this would be too much effort. I would rather not touch those code pieces at all.

> Or backup choice: no bulk endpoint, asyncio on the client side (CLI) to generate concurrent requests on the API to create all the different Variables at the same time using only 1 thread on the client. But I think that was already considered and discarded.

My idea was to have a single, common approach for all the file inserts. Making these changes requires more effort than just handling this case in the CLI. Let's simplify this and aim to deliver the features for v3 rather than investing more effort in a change that can have multiple side effects and would likely cost us operational (bugfix/maintenance) time on top. We can record this as an improvement, a common approach for all file inserts, as a next step, maybe in v3.1, when we would have more time to implement it properly. Let's take a step back and reject this ticket. :)

What do you think?

pierrejeambrun commented 12 hours ago

I agree that consistency would be great. If we feel like this is too much work for now and that our bandwidth needs to be focused on other priorities we can mark it for 3.1 👍.

How will you do the bulk insert for variables while we don't have this implemented? An async loop on the client / CLI as suggested above, or do you have something else in mind?

jason810496 commented 10 hours ago

If we implement this on the FastAPI side, I would add a wrapper async def set_async around the _set method to allow the use of asyncio.gather. Additionally, I have verified that the SQLAlchemy session is thread-safe for this case, as the same session object will be used across all coroutines. Ref: https://github.com/sqlalchemy/sqlalchemy/issues/5828

If implemented on the CLI side, I agree with @pierrejeambrun's approach.
Both work; it depends on the final decision.

pierrejeambrun commented 8 hours ago

> If we implement this on the FastAPI side, I would add a wrapper async def set_async around the _set method to allow the use of asyncio.gather. Additionally, I have verified that the SQLAlchemy session is thread-safe for this case, as the same session object will be used across all coroutines. Ref: https://github.com/sqlalchemy/sqlalchemy/issues/5828

The Variable.set method is synchronous, so it will never yield control and will block the main loop. The way to run sync code from an async route is with run_in_executor from asyncio or run_in_threadpool from starlette. https://sentry.io/answers/fastapi-difference-between-run-in-executor-and-run-in-threadpool/
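A minimal sketch of the `run_in_executor` route, using only the standard library (starlette's `run_in_threadpool` is left out so the example has no dependencies). `blocking_set` is a hypothetical stand-in for the synchronous Variable.set. The `threading.Barrier` is only there to prove that the calls really overlap: it can be passed only if all three calls are running in separate threads at the same time.

```python
import asyncio
import threading

N = 3
barrier = threading.Barrier(N)


def blocking_set(key: str) -> str:
    """Hypothetical stand-in for the synchronous Variable.set."""
    # All N calls must be in flight at once to pass the barrier; sequential
    # execution would time out here and raise BrokenBarrierError.
    barrier.wait(timeout=5)
    return key


async def main() -> list[str]:
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor.
    futures = [loop.run_in_executor(None, blocking_set, k) for k in ("a", "b", "c")]
    return await asyncio.gather(*futures)


results = asyncio.run(main())
```

The event loop stays free while the worker threads block, but any state the calls share (for example a single DB session) is now touched from several threads at once.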

> Additionally, I have verified that the SQLAlchemy session is thread-safe for this case, as the same session object will be used across all coroutines. Ref: https://github.com/sqlalchemy/sqlalchemy/issues/5828

I don't think so. As mentioned above, you will have to use multiple threads; otherwise it would basically be the same as sequential requests (because the main loop is blocked), so you might run into problems with one shared session. There is also a note in the issue you linked saying that AsyncSession is not thread-safe.

Worth trying both, I have no preference at this point. (Maybe inside FastAPI is cleaner because from the CLI point of view, everything is handled the same -> through 1 call to the API, but not sure if this will be super easy to do because of the things highlighted above)