apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
37.13k stars 14.31k forks source link

Problem with "repair_run" option in DatabricksRunNowOperator #44114

Open zerodarkzone opened 2 hours ago

zerodarkzone commented 2 hours ago

Apache Airflow Provider(s)

databricks

Versions of Apache Airflow Providers

apache-airflow-providers-databricks==6.9.0

Apache Airflow version

2.10.1

Operating System

linux

Deployment

Astronomer

Deployment details

No response

What happened

When using the DatabricksRunNowOperator with the option "repair_run=True", if the databricks job fails, airflow should command a repair request to the databricks API, however this is failing with the following error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/airflow/task/task_runner/standard_task_runner.py", line 117, in _start_by_fork
    ret = args.func(args, dag=self.dag)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/cli.py", line 115, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 483, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 256, in _run_task_by_selected_method
    return _run_raw_task(args, ti)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 341, in _run_raw_task
    return ti._run_raw_task(
           ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3004, in _run_raw_task
    return _run_raw_task(
           ^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 273, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3158, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3182, in _execute_task
    return _execute_task(self, context, task_orig)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/providers/databricks/operators/databricks.py", line 868, in execute
    _handle_databricks_operator_execution(self, hook, self.log, context)
  File "/usr/local/lib/python3.12/site-packages/airflow/providers/databricks/operators/databricks.py", line 114, in _handle_databricks_operator_execution
    operator.json["latest_repair_id"] = hook.repair_run(operator, repair_json)
                                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: DatabricksHook.repair_run() takes 2 positional arguments but 3 were given

Checking the operator code, this is the ofending part. image

The repair method only receives 2 arguments but airflow is passing an extra one ("operator").

image

What you think should happen instead

When a Databricks job fails, the job should be repaired but it is failing instead.

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/airflow/task/task_runner/standard_task_runner.py", line 117, in _start_by_fork
    ret = args.func(args, dag=self.dag)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/cli.py", line 115, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 483, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 256, in _run_task_by_selected_method
    return _run_raw_task(args, ti)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 341, in _run_raw_task
    return ti._run_raw_task(
           ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3004, in _run_raw_task
    return _run_raw_task(
           ^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 273, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3158, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3182, in _execute_task
    return _execute_task(self, context, task_orig)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/providers/databricks/operators/databricks.py", line 868, in execute
    _handle_databricks_operator_execution(self, hook, self.log, context)
  File "/usr/local/lib/python3.12/site-packages/airflow/providers/databricks/operators/databricks.py", line 114, in _handle_databricks_operator_execution
    operator.json["latest_repair_id"] = hook.repair_run(operator, repair_json)
                                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: DatabricksHook.repair_run() takes 2 positional arguments but 3 were given

How to reproduce

Use the DatabricksRunNowOperator to run a databricks job with the "repair_run" argument as True. If the job fails, airflow is unable to repair the failed job and is going to try to retry it.

Anything else

The problem occurs with every failed Databricks job.

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 2 hours ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.