apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

TaskGroupContext not removing resources properly if DatabricksWorkflowGroup raises inside __exit__ method #42164

Open fedemgp opened 2 weeks ago

fedemgp commented 2 weeks ago

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

The __exit__ method of DatabricksWorkflowTaskGroup can raise an exception. It does exactly that if you instantiate, inside the group, an operator class that cannot be serialized into JSON.

If anything inside that method raises, the superclass's __exit__ method is never called, and that is the method in charge of popping the group from a class-level attribute of TaskGroupContext.

Any workflow you instantiate afterwards, inside that DAG (or another DAG), then raises RuntimeError: ('Cannot mix TaskGroups from different DAGs: %s and %s').
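To make the failure mode concrete, here is a minimal, Airflow-free sketch of the pattern described above. FakeTaskGroupContext, FakeTaskGroup and FakeWorkflowGroup are hypothetical stand-ins, not Airflow classes: they only model a class-level stack that is pushed in __enter__ and popped by the base class's __exit__. The real TaskGroupContext internals may differ.

```python
# Hypothetical, simplified stand-ins for TaskGroupContext / TaskGroup.
class FakeTaskGroupContext:
    """Class-level stack of the task groups currently entered with `with`."""

    _stack: list = []  # shared by design: this is the global state that leaks

    @classmethod
    def push(cls, group):
        cls._stack.append(group)

    @classmethod
    def pop(cls):
        return cls._stack.pop() if cls._stack else None

    @classmethod
    def current(cls):
        return cls._stack[-1] if cls._stack else None


class FakeTaskGroup:
    def __init__(self, group_id):
        self.group_id = group_id

    def __enter__(self):
        FakeTaskGroupContext.push(self)
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # The base class is the only place that cleans up the shared state.
        FakeTaskGroupContext.pop()


class FakeWorkflowGroup(FakeTaskGroup):
    def __init__(self, group_id, valid=True):
        super().__init__(group_id)
        self.valid = valid  # hypothetical flag standing in for "all tasks convertible"

    def __exit__(self, exc_type, exc_value, tb):
        # Validation that may raise runs BEFORE delegating to the base class,
        # so when it raises, the base __exit__ (and the pop) is skipped.
        if not self.valid:
            raise ValueError(f"{self.group_id}: unsupported task")
        super().__exit__(exc_type, exc_value, tb)


try:
    with FakeWorkflowGroup("tasks", valid=False):
        pass
except ValueError:
    pass

# The group that raised is still registered as the current group, so anything
# created afterwards would be attached to it.
print(FakeTaskGroupContext.current().group_id)  # tasks
```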

What you think should happen instead?

I see two possible solutions, one more feasible than the other.

  1. Move the call to the superclass's __exit__ method to the beginning of this method (I'm not 100% sure this will work, because the code in this method may rely on the group still being set in the context).
  2. Wrap the code in a try/finally block, so that the superclass's __exit__ method is always called at the end, no matter what.
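Option 2 can be sketched as follows. This is a minimal illustration under the same assumptions as before: FakeTaskGroupContext, FakeTaskGroup and FakeWorkflowGroup are hypothetical stand-ins, and the real DatabricksWorkflowTaskGroup.__exit__ body is not reproduced here; only the try/finally shape is.

```python
# Hypothetical, simplified stand-ins for TaskGroupContext / TaskGroup.
class FakeTaskGroupContext:
    _stack: list = []

    @classmethod
    def push(cls, group):
        cls._stack.append(group)

    @classmethod
    def pop(cls):
        return cls._stack.pop() if cls._stack else None

    @classmethod
    def current(cls):
        return cls._stack[-1] if cls._stack else None


class FakeTaskGroup:
    def __init__(self, group_id):
        self.group_id = group_id

    def __enter__(self):
        FakeTaskGroupContext.push(self)
        return self

    def __exit__(self, exc_type, exc_value, tb):
        FakeTaskGroupContext.pop()


class FakeWorkflowGroup(FakeTaskGroup):
    def __init__(self, group_id, valid=True):
        super().__init__(group_id)
        self.valid = valid  # hypothetical flag standing in for "all tasks convertible"

    def __exit__(self, exc_type, exc_value, tb):
        try:
            # Validation/serialization that may raise; the group is still
            # registered in the context while this runs.
            if not self.valid:
                raise ValueError(f"{self.group_id}: unsupported task")
        finally:
            # Always runs, so the shared context is cleaned up even on error;
            # the exception (if any) keeps propagating afterwards.
            super().__exit__(exc_type, exc_value, tb)


try:
    with FakeWorkflowGroup("tasks", valid=False):
        pass
except ValueError as err:
    print(f"raised: {err}")  # raised: tasks: unsupported task

print(FakeTaskGroupContext.current())  # None
```

Compared with option 1, this ordering keeps the group registered while the validation runs and only pops it afterwards, which sidesteps the concern that the code may still need the context attribute.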

How to reproduce

Here is a simple unit test that reproduces the issue:

import pendulum
import pytest

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup
from airflow.utils.task_group import TaskGroupContext


def test_example():
    dag = DAG(
        dag_id="test_workflows",
        start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
        schedule=None,
    )

    workflow = DatabricksWorkflowTaskGroup(databricks_conn_id="default", group_id="tasks", dag=dag)

    with pytest.raises(AirflowException) as e_info:
        with workflow:
            # Force an exception by instantiating an operator that Databricks workflows do not support
            PythonOperator(python_callable=lambda x: x, task_id="foo")

    assert str(e_info.value) == "Task tasks.foo does not support conversion to databricks workflow task."

    # With the current code this assertion fails: the group raised inside __exit__,
    # so TaskGroupContext never popped it and it is still the context-managed group.
    group = TaskGroupContext.pop_context_managed_task_group()
    assert group is None

Operating System

macOS Sonoma 14.6.1

Versions of Apache Airflow Providers

apache-airflow-providers-common-compat==1.1.0
apache-airflow-providers-common-io==1.4.0
apache-airflow-providers-common-sql==1.15.0
apache-airflow-providers-databricks==6.8.0
apache-airflow-providers-fab==1.2.2
apache-airflow-providers-ftp==3.10.1
apache-airflow-providers-google==10.9.0
apache-airflow-providers-http==4.12.0
apache-airflow-providers-imap==3.6.1
apache-airflow-providers-smtp==1.7.1
apache-airflow-providers-sqlite==3.8.2

Deployment

Amazon (AWS) MWAA

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?


boring-cyborg[bot] commented 2 weeks ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.