kineticadb / kinetica-airflow

Kinetica provider for Apache Airflow.
MIT License

[Proposed fix] Pass conn id from SQL operator to hook #1

Closed by alexmoroz15 4 months ago

alexmoroz15 commented 4 months ago

Noticed that runs using this operator in our dev environment were failing because they were using the kinetica_default connection.

[2024-02-06, 21:14:19 UTC] {base.py:73} INFO - Using connection ID 'kinetica_default' for task execution.

This happened despite us passing a different conn_id to the SQL operator:

ensure_namespace_exists = KineticaSqlOperator(
    task_id="ensure_namespace_exists",
    kinetica_conn_id="kinetica_{{ params.dest_kinetica_cluster_value }}",
    doc_md="Ensure the namespace exists, if not create it.",
    sql=f"CREATE SCHEMA IF NOT EXISTS {prefixed_namespace}",
)
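
The excerpt above does not show the operator's source, so the following is only a minimal sketch of the kind of bug the title suggests and of the proposed fix. It assumes the operator stores `kinetica_conn_id` but builds its hook without forwarding it, so the hook falls back to its default. `SketchHook`, `BuggyOperator`, and `FixedOperator` are hypothetical stand-ins, not the real kinetica-airflow classes.

```python
# Simplified stand-ins for illustration only; these are NOT the real
# kinetica-airflow classes, and the real hook/operator may differ.
DEFAULT_CONN_ID = "kinetica_default"


class SketchHook:
    """Hypothetical hook that remembers which connection ID it was built with."""

    def __init__(self, kinetica_conn_id: str = DEFAULT_CONN_ID) -> None:
        self.kinetica_conn_id = kinetica_conn_id


class BuggyOperator:
    """Hypothetical operator that accepts a conn ID but never passes it on."""

    def __init__(self, sql: str, kinetica_conn_id: str = DEFAULT_CONN_ID) -> None:
        self.sql = sql
        self.kinetica_conn_id = kinetica_conn_id

    def get_hook(self) -> SketchHook:
        # Assumed bug: the hook is created with no arguments, so it always
        # uses DEFAULT_CONN_ID regardless of what the operator was given.
        return SketchHook()


class FixedOperator(BuggyOperator):
    """Same operator, but the conn ID is forwarded to the hook."""

    def get_hook(self) -> SketchHook:
        # Proposed fix: pass the operator's conn ID through to the hook.
        return SketchHook(kinetica_conn_id=self.kinetica_conn_id)


buggy = BuggyOperator("CREATE SCHEMA IF NOT EXISTS demo", kinetica_conn_id="kinetica_dev")
fixed = FixedOperator("CREATE SCHEMA IF NOT EXISTS demo", kinetica_conn_id="kinetica_dev")

print(buggy.get_hook().kinetica_conn_id)  # kinetica_default
print(fixed.get_hook().kinetica_conn_id)  # kinetica_dev
```

With the conn ID forwarded, the hook would connect with the ID given to the operator rather than `kinetica_default`, which is the behaviour the log line above shows is missing.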