Open gauravtanwar03 opened 5 months ago
To handle the dynamic nature of transient EMR clusters with Cosmos, you can programmatically update the Airflow connection within your DAG to reflect the new host information each time the cluster is created. Here’s a streamlined approach:
1. Create or update the Airflow connection programmatically: Use Airflow's Connection model together with an ORM session to create or update the connection record dynamically from within your DAG.
2. Store the dynamic host information: Use Airflow Variables or environment variables to hold the dynamic host details, updated programmatically when the EMR cluster is created (see the sketch after this list).
3. Update the connection in the DAG: Make sure the connection is updated before any task that depends on the EMR cluster runs.
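As a minimal sketch of step 2, assuming the cluster is launched by an upstream task that pushes the job flow ID to XCom (the task id create_emr_cluster and the Variable name emr_master_host are hypothetical, not part of your DAG), the master node's DNS name can be looked up with boto3 and stored in an Airflow Variable:

import boto3
from airflow.decorators import task
from airflow.models import Variable

@task
def store_emr_host(ti=None):
    # Hypothetical upstream task "create_emr_cluster" pushes the job flow ID to XCom.
    cluster_id = ti.xcom_pull(task_ids="create_emr_cluster", key="return_value")
    emr = boto3.client("emr")
    # describe_cluster exposes the master node's public DNS once the cluster is running.
    master_dns = emr.describe_cluster(ClusterId=cluster_id)["Cluster"]["MasterPublicDnsName"]
    # Persist the host so downstream tasks (and the connection update) can read it.
    Variable.set("emr_master_host", master_dns)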
Here’s an example workflow:
import os
from datetime import datetime
from pathlib import Path
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow import settings
from airflow.models import Connection
from cosmos import DbtTaskGroup, ExecutionConfig, ProfileConfig, ProjectConfig
from cosmos.constants import InvocationMode
from cosmos.profiles import get_automatic_profile_mapping
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
execution_config = ExecutionConfig(invocation_mode=InvocationMode.DBT_RUNNER)
def update_airflow_connection(conn_id, host, port, login, password, schema):
    """Create the connection if it does not exist, otherwise update it in place."""
    session = settings.Session()
    conn = session.query(Connection).filter(Connection.conn_id == conn_id).one_or_none()
    if conn is None:
        conn = Connection(conn_id=conn_id, conn_type="postgres")
        session.add(conn)
    conn.host = host
    conn.port = int(port) if port else None
    conn.login = login
    conn.set_password(password)
    conn.schema = schema
    session.commit()
    session.close()
@dag(
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
)
def cosmos_emr_workflow() -> None:
    dynamic_host = os.getenv("DYNAMIC_EMR_HOST")
    dynamic_port = os.getenv("DYNAMIC_EMR_PORT")
    dynamic_login = os.getenv("DYNAMIC_EMR_LOGIN")
    dynamic_password = os.getenv("DYNAMIC_EMR_PASSWORD")
    dynamic_schema = os.getenv("DYNAMIC_EMR_SCHEMA")

    update_airflow_connection(
        conn_id="emr_conn",
        host=dynamic_host,
        port=dynamic_port,
        login=dynamic_login,
        password=dynamic_password,
        schema=dynamic_schema,
    )

    pre_dbt = EmptyOperator(task_id="pre_dbt")

    jaffle_shop = DbtTaskGroup(
        execution_config=execution_config,
        project_config=ProjectConfig(
            DBT_ROOT_PATH / "jaffle_shop",
        ),
        profile_config=ProfileConfig(
            profile_name="default",
            target_name="dev",
            profile_mapping=get_automatic_profile_mapping(
                conn_id="emr_conn",
                profile_args={"schema": dynamic_schema},
            ),
        ),
        operator_args={"install_deps": True},
        default_args={"retries": 2},
    )

    post_dbt = EmptyOperator(task_id="post_dbt")

    pre_dbt >> jaffle_shop >> post_dbt

cosmos_emr_workflow()
This approach ensures that the Airflow connection is always up to date with the latest host information for the transient EMR cluster, allowing Cosmos to interact with the cluster seamlessly.
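Note that in the example above, update_airflow_connection runs at DAG parse time. If the EMR host only becomes known while the DAG is running (for example, after a task that launches the cluster), the same update can be wrapped in a task placed before the dbt task group. A minimal sketch, assuming the host was stored in the hypothetical emr_master_host Variable from the earlier snippet and reusing the update_airflow_connection function defined above (the task name refresh_emr_connection is illustrative):

import os

from airflow.decorators import task
from airflow.models import Variable

@task
def refresh_emr_connection():
    # Read the host stored by the upstream step (hypothetical Variable name),
    # then upsert the "emr_conn" connection via the function defined above.
    host = Variable.get("emr_master_host")
    update_airflow_connection(
        conn_id="emr_conn",
        host=host,
        port=os.getenv("DYNAMIC_EMR_PORT"),
        login=os.getenv("DYNAMIC_EMR_LOGIN"),
        password=os.getenv("DYNAMIC_EMR_PASSWORD"),
        schema=os.getenv("DYNAMIC_EMR_SCHEMA"),
    )

# Inside the DAG body: pre_dbt >> refresh_emr_connection() >> jaffle_shop >> post_dbt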
I want to use an EMR transient cluster along with Cosmos. As per my understanding, Cosmos requires the host to be available in the Airflow connection beforehand, and in the case of a transient cluster the host changes with every run. Is there any existing workflow that I can use to handle this?