apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Databricks Provider _get_databricks_task_id only cleanses task id #44250

Open mwoods-familiaris opened 21 hours ago

mwoods-familiaris commented 21 hours ago

Apache Airflow Provider(s)

databricks

Versions of Apache Airflow Providers

apache-airflow-providers-databricks==6.13.*

Apache Airflow version

2.10.2

Operating System

Debian GNU/Linux 12 (bookworm)

Deployment

Astronomer

Deployment details

No response

What happened

_get_databricks_task_id only cleanses the task id, ref: https://github.com/apache/airflow/blob/a9242844706ca117f86d22092109939dd56435ee/providers/src/airflow/providers/databricks/plugins/databricks_workflow.py#L67 https://github.com/apache/airflow/blob/a9242844706ca117f86d22092109939dd56435ee/providers/src/airflow/providers/databricks/operators/databricks.py#L1077

However, the dag_id may also contain `.`, so the replacement of `.` with `__` should be applied to the whole string, not just the task id portion; otherwise, periods in the DAG name result in errors such as:

[2024-11-21, 13:12:42 GMT] {taskinstance.py:3310} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/operators/databricks.py", line 1252, in execute
    self.monitor_databricks_job()
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/operators/databricks.py", line 1203, in monitor_databricks_job
    current_task_run_id = self._get_current_databricks_task()["run_id"]
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/operators/databricks.py", line 1165, in _get_current_databricks_task
    return {task["task_key"]: task for task in sorted_task_runs}[
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyError: 'my.airflow.dag.with.periods__my_airflow_task'

(The invalid characters are silently stripped by Databricks, so the task key on the Databricks side is myairflowdagwithperiods__my_airflow_task rather than my.airflow.dag.with.periods__my_airflow_task.)
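For context, the helper builds the key roughly as in the sketch below (simplified, not the exact provider source): only the task_id portion has its periods replaced, so the key Airflow computes never matches the key Databricks stores once the dag_id contains a `.`.

```python
# Simplified sketch of the current behaviour (illustrative signature, not the
# exact provider code): only task_id has "." replaced; dag_id is used as-is.
def _get_databricks_task_id(dag_id: str, task_id: str) -> str:
    return f"{dag_id}__{task_id.replace('.', '__')}"

# Airflow builds the key with the periods still present...
airflow_key = _get_databricks_task_id("my.airflow.dag.with.periods", "my_airflow_task")
# -> "my.airflow.dag.with.periods__my_airflow_task"

# ...but Databricks strips the invalid "." characters from task keys, so the
# key it reports back is "myairflowdagwithperiods__my_airflow_task", and the
# lookup in _get_current_databricks_task raises the KeyError shown above.
```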

What you think should happen instead

The replacement of `.` with `__` should be applied to the whole task key / run name string, not just the task id portion.
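A minimal sketch of that change, assuming the key is still built as `dag_id__task_id` (parameter names are illustrative; this is not the actual patch):

```python
# Sketch of the proposed behaviour: cleanse the combined string so the key
# Airflow computes matches what Databricks actually stores.
def _get_databricks_task_id(dag_id: str, task_id: str) -> str:
    return f"{dag_id}__{task_id}".replace(".", "__")
```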

How to reproduce

Use one of the affected operators, e.g. DatabricksNotebookOperator, in a DAG whose dag_id contains a `.`
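For example, a minimal DAG along these lines should reproduce the KeyError (the connection id, notebook path, and cluster id below are placeholders):

```python
# Minimal reproduction sketch; assumes a working Databricks connection and an
# existing notebook/cluster. All Databricks-specific values are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator

with DAG(
    dag_id="my.airflow.dag.with.periods",  # periods in the dag_id trigger the bug
    start_date=datetime(2024, 1, 1),
    schedule=None,
) as dag:
    DatabricksNotebookOperator(
        task_id="my_airflow_task",
        notebook_path="/Shared/example_notebook",    # placeholder
        source="WORKSPACE",
        existing_cluster_id="1234-567890-abcdefgh",  # placeholder
        databricks_conn_id="databricks_default",     # placeholder
    )
```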

Anything else

Every time

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 21 hours ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

rawwar commented 3 hours ago

PR: https://github.com/apache/airflow/pull/43106 will also fix this issue