astronomer / astronomer-cosmos

Run your dbt Core projects as Apache Airflow DAGs and Task Groups with a few lines of code
https://astronomer.github.io/astronomer-cosmos/

Increase Task Throughput #1120

Open · pankajkoti opened this issue 2 months ago

pankajkoti commented 2 months ago

Description co-authored by @tatiana and @pankajastro

As of Cosmos 1.5, we are using dbt Core to run transformations in the database. In other words, the Airflow worker node is blocked during the execution of the SQL transformation: if the transformation takes 6h, an Airflow worker node sits idle, just waiting.

One way to overcome this is to use Airflow deferrable operators instead of dbt Core commands to execute the transformation. This approach is not new; Monzo presented it at Airflow Summit 2023, and we (@tatiana) met with Monzo in London afterward to discuss the details.
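For context, the deferrable pattern looks roughly like the sketch below (a minimal sketch, not Cosmos code; class names and the polling API are hypothetical): the operator submits the SQL job and immediately defers to a trigger, which polls asynchronously on the triggerer so the worker slot is freed while the warehouse runs the transformation.

```python
import asyncio

from airflow.models.baseoperator import BaseOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent


class WarehouseJobTrigger(BaseTrigger):
    """Hypothetical trigger that polls a warehouse job without holding a worker."""

    def __init__(self, job_id: str):
        super().__init__()
        self.job_id = job_id

    def serialize(self):
        # Triggers must be serializable so the triggerer process can rehydrate them.
        return ("my_module.WarehouseJobTrigger", {"job_id": self.job_id})

    async def run(self):
        # Poll asynchronously; many deferred tasks can share one triggerer process.
        while not await self._job_is_done():
            await asyncio.sleep(30)
        yield TriggerEvent({"job_id": self.job_id, "status": "success"})

    async def _job_is_done(self) -> bool:
        raise NotImplementedError("call the warehouse's async status API here")


class DeferrableSqlOperator(BaseOperator):
    """Hypothetical operator: submit the compiled SQL, then release the worker."""

    def execute(self, context):
        job_id = self._submit_compiled_sql()  # hand the SQL to the warehouse
        self.defer(
            trigger=WarehouseJobTrigger(job_id=job_id),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event):
        # Resumes in a fresh worker slot only after the trigger fires.
        self.log.info("Job %s finished: %s", event["job_id"], event["status"])

    def _submit_compiled_sql(self) -> str:
        raise NotImplementedError("submit the compiled SQL and return a job id")
```

The throughput win is that hundreds of deferred tasks can wait inside a single triggerer process instead of each occupying a worker slot for the duration of the transformation.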

The overall idea is:

For a first end-to-end, we'll only support:

This will allow us to measure the impact of using deferrable operators, and then we can extend to:

Tasks initially planned:

  1. PoC to build this feature end-to-end (see the sketch after this list):
    • Introduce ExecutionMode.LOCAL_AIRFLOW_ASYNC (think about whether we can find a better name)
    • Check how we are populating the compiled_sql template field in Cosmos. If it is being populated after running the task, then we can simplify the first E2E to use LoadMode.MANIFEST and assume the compiled SQL is available for Cosmos.
    • If the dbt profile is not related to Databricks, raise an error
    • If it is related to Databricks, then attempt to use the DatabricksSubmitRunOperator to run the previously compiled SQL.
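A rough sketch of that dispatch step, assuming a helper that receives the profile type and the node's compiled SQL (the function name and the Databricks `json` payload shape are hypothetical; only the profile check and the use of DatabricksSubmitRunOperator come from the plan above):

```python
from airflow.providers.databricks.operators.databricks import (
    DatabricksSubmitRunOperator,
)


def build_async_databricks_task(task_id: str, profile_type: str, compiled_sql: str):
    """Hypothetical helper: only Databricks profiles are supported in the first E2E."""
    if profile_type != "databricks":
        raise ValueError(
            f"ExecutionMode.LOCAL_AIRFLOW_ASYNC currently supports only "
            f"Databricks profiles, got {profile_type!r}"
        )
    # Submit the pre-compiled SQL as a one-time Databricks run. The exact
    # `json` payload (e.g. wrapping the SQL in a task definition) is an
    # assumption and depends on the workspace setup.
    return DatabricksSubmitRunOperator(
        task_id=task_id,
        json={
            "run_name": task_id,
            # ... task definition that executes `compiled_sql` ...
        },
    )
```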

Once we've done a PoC, we can break the remaining work down into separate tasks so it can be split among contributors.

References for when we support Snowflake: currently (provider 5.6.0), the official Airflow Snowflake SQLExecuteQueryOperator does not support running SQL in deferrable mode. The two alternatives would be either this provider's SnowflakeSqlApiOperator, which would require end users to enable/have access to the Snowflake SQL API, or the SnowflakeOperatorAsync from Astronomer Providers. Some future references for when we decide to follow this path:
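For illustration, the SnowflakeSqlApiOperator route could look like the sketch below (connection ID and SQL are placeholders; the exact parameters, including the deferrable flag, depend on the provider version, and the Snowflake SQL API must be enabled for the account):

```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

run_model = SnowflakeSqlApiOperator(
    task_id="run_compiled_model",
    snowflake_conn_id="snowflake_default",
    sql="-- compiled SQL for the dbt model goes here",
    statement_count=1,  # the SQL API needs the statement count up front
    deferrable=True,    # poll via the triggerer instead of blocking a worker
)
```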

Tasks originally planned as part of this epic (more to come):

tatiana commented 1 month ago

One additional advantage of this feature would be surfacing information to end users that Airflow exposes but dbt does not.

For example, when running through dbt, the BigQuery job ID is not printed in the logs; if we changed Cosmos to use the Airflow BigQuery operator to run the BQ SQL transformations, the logs would include the full job ID. We could also expose this information in the Airflow UI.
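As a sketch of what that could look like (the SQL is a placeholder; BigQueryInsertJobOperator logs the job it starts and also supports deferrable execution):

```python
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
)

run_model = BigQueryInsertJobOperator(
    task_id="run_compiled_model",
    configuration={
        "query": {
            "query": "-- compiled SQL for the dbt model goes here",
            "useLegacySql": False,
        }
    },
    deferrable=True,  # free the worker while BigQuery runs the job
)
```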

This was brought up in a discussion with a customer: https://astronomer.slack.com/archives/C074NP4A8G1/p1722774197657379