apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

DatabricksRunNowOperator failing as named parameters Jinja templating not getting resolved #40788

Closed · vatsrahul1001 closed 3 months ago

vatsrahul1001 commented 3 months ago

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

DatabricksRunNowOperator started failing after upgrading the Databricks provider to 6.7.0, with the error below:


[2024-07-15, 05:29:05 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 563, in _do_api_call
    for attempt in self._get_retry_object():
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 435, in __iter__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 368, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 390, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
                                     ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 573, in _do_api_call
    response.raise_for_status()
  File "/usr/local/lib/python3.11/site-packages/requests/models.py", line 1021, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://adb-2703548196728655.15.azuredatabricks.net/api/2.1/jobs/run-now
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 460, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/operators/databricks.py", line 862, in execute
    self.run_id = hook.run_now(self.json)
                  ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks.py", line 243, in run_now
    response = self._do_api_call(RUN_NOW_ENDPOINT, json)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 580, in _do_api_call
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Response: {"error_code":"INVALID_PARAMETER_VALUE","message":"Job 0 does not exist."}, Status Code: 400

I have verified that it works with provider version 6.6.0.

What you think should happen instead?

No response

How to reproduce

  1. Run the below DAG with Databricks provider 6.7.0:

import json
import os
from datetime import timedelta
from typing import Dict, Optional

from airflow.models.dag import DAG
from airflow.utils.timezone import datetime

from airflow.providers.databricks.operators.databricks import (
    DatabricksRunNowOperator,
    DatabricksSubmitRunOperator,
)

DATABRICKS_CONN_ID = os.getenv("ASTRO_DATABRICKS_CONN_ID", "databricks_default")
# Notebook path as a Json object
notebook_task = '{"notebook_path": "/Users/x/quick_start"}'
NOTEBOOK_TASK = json.loads(os.getenv("DATABRICKS_NOTEBOOK_TASK", notebook_task))
notebook_params: Optional[Dict[str, str]] = {"Variable": "5"}
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}

new_cluster = {
    "num_workers": 1,
    "spark_version": "10.4.x-scala2.12",
    "spark_conf": {},
    "azure_attributes": {
        "availability": "ON_DEMAND_AZURE",
        "spot_bid_max_price": -1,
    },
    "node_type_id": "Standard_D3_v2",
    "ssh_public_keys": [],
    "custom_tags": {},
    "spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"},
    "cluster_source": "JOB",
    "init_scripts": [],
}

with DAG(
    dag_id="example_async_databricks",
    start_date=datetime(2022, 1, 1),
    schedule=None,
    catchup=False,
    default_args=default_args,
    tags=["example", "async", "databricks"],
) as dag:
    # [START howto_operator_databricks_submit_run_async]
    opr_submit_run = DatabricksSubmitRunOperator(
        task_id="submit_run",
        databricks_conn_id=DATABRICKS_CONN_ID,
        new_cluster=new_cluster,
        notebook_task=NOTEBOOK_TASK,
        do_xcom_push=True,
        deferrable=True
    )
    # [END howto_operator_databricks_submit_run_async]

    # [START howto_operator_databricks_run_now_async]
    opr_run_now = DatabricksRunNowOperator(
        task_id="run_now",
        databricks_conn_id=DATABRICKS_CONN_ID,
        job_id="{{ task_instance.xcom_pull(task_ids='submit_run', dag_id='example_async_databricks', key='job_id') }}",
        notebook_params=notebook_params,
        deferrable=True
    )
    # [END howto_operator_databricks_run_now_async]

opr_submit_run >> opr_run_now

Operating System

Linux

Versions of Apache Airflow Providers

databricks 6.7.0

Deployment

Astronomer

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

vatsrahul1001 commented 3 months ago

Looks like it's related to https://github.com/apache/airflow/pull/40471

eladkal commented 3 months ago

cc @boraberke

boraberke commented 3 months ago

Hi @vatsrahul1001,

The job_id field of DatabricksRunNowOperator is not a templated field, which might be the cause of the issue. Before #40471, the constructor added job_id into the json parameter before the templated fields were rendered, which effectively made job_id templated.

A workaround for this could be setting json parameter as

json={"job_id": "{{ task_instance.xcom_pull(task_ids='submit_run', dag_id='example_async_databricks', key='job_id') }}"},

instead of specifying the explicit job_id parameter:

job_id="{{ task_instance.xcom_pull(task_ids='submit_run', dag_id='example_async_databricks', key='job_id') }}",

I do not have a test environment for Databricks to check whether my assumption works. Let me know if this fixes the problem.
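
Applied to the reproduction DAG above, the workaround would look roughly like this (a sketch, not verified against a real workspace; DATABRICKS_CONN_ID and notebook_params come from that DAG):

opr_run_now = DatabricksRunNowOperator(
    task_id="run_now",
    databricks_conn_id=DATABRICKS_CONN_ID,  # from the reproduction DAG
    # `json` is a templated field, so this Jinja expression is rendered
    # before execute() is called.
    json={
        "job_id": "{{ task_instance.xcom_pull(task_ids='submit_run', dag_id='example_async_databricks', key='job_id') }}"
    },
    notebook_params=notebook_params,  # from the reproduction DAG
    deferrable=True,
)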

wolfier commented 3 months ago

Previously, before 6.7.0, even though the named parameters were not templated, they were placed into the templated field json in the init function. By the time execute is called, the templated json field has been resolved.

In 6.7.0, the change made it so that the named parameters are saved to a non-templated field, overridden_json_params, which is only merged back into json in the execute function via a call to _setup_and_validate_json. This means that named parameters that would previously have been templated are no longer resolved.
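
To illustrate the difference, a simplified sketch (illustrative only, not the actual provider code; attribute names follow the description above):

class RunNowBefore670:
    """Pre-6.7.0: named params are merged into `json` in __init__."""

    template_fields = ("json",)

    def __init__(self, json=None, job_id=None, notebook_params=None):
        self.json = json or {}
        # Merged before template rendering, so Jinja inside job_id or
        # notebook_params is rendered together with the templated `json`.
        if job_id is not None:
            self.json["job_id"] = job_id
        if notebook_params is not None:
            self.json["notebook_params"] = notebook_params


class RunNowSince670:
    """6.7.0: named params are kept aside and merged only in execute()."""

    template_fields = ("json",)

    def __init__(self, json=None, job_id=None, notebook_params=None):
        self.json = json or {}
        self.overridden_json_params = {
            "job_id": job_id,
            "notebook_params": notebook_params,
        }

    def execute(self, context):
        # Rendering already happened before execute(), so any Jinja strings
        # in overridden_json_params arrive here unresolved.
        self.json.update(
            {k: v for k, v in self.overridden_json_params.items() if v is not None}
        )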

@boraberke Was this an intended change?

vatsrahul1001 commented 3 months ago

@boraberke

> Hi @vatsrahul1001,
>
> The job_id field of DatabricksRunNowOperator is not a templated field, which might be the cause of the issue. Before #40471, the constructor added job_id into the json parameter before the templated fields were rendered, which effectively made job_id templated.
>
> A workaround for this could be setting json parameter as
>
> json={"job_id": "{{ task_instance.xcom_pull(task_ids='submit_run', dag_id='example_async_databricks', key='job_id') }}"},
>
> instead of specifying the explicit job_id parameter:
>
> job_id="{{ task_instance.xcom_pull(task_ids='submit_run', dag_id='example_async_databricks', key='job_id') }}",
>
> I do not have a test environment for Databricks to check whether my assumption works. Let me know if this fixes the problem.

I tried templating the json param, and it works as expected; however, existing DAGs that use named params with templating should not be broken by this change. As mentioned here, using only named params instead of json is very common.

boraberke commented 3 months ago

> Previously, before 6.7.0, even though the named parameters were not templated, they were placed into the templated field json in the init function. By the time execute is called, the templated json field has been resolved.
>
> In 6.7.0, the change made it so that the named parameters are saved to a non-templated field, overridden_json_params, which is only merged back into json in the execute function via a call to _setup_and_validate_json. This means that named parameters that would previously have been templated are no longer resolved.
>
> @boraberke Was this an intended change?

#40471 was intended to solve #35433, i.e. to make the json parameter properly templated, as it should be. However, as you stated, named parameters that were implicitly templated (i.e., not in template_fields but merged into json) are no longer resolved correctly.

This affected all of the below operators:

> I tried templating the json param, and it works as expected; however, existing DAGs that use named params with templating should not be broken by this change. As mentioned here, using only named params instead of json is very common.

I agree @vatsrahul1001; apparently the docs mention some of the params, including job_id, as templated here, but I had not noticed them before.

Adding the necessary named params to template_fields may be a way to fix it. WDYT @wolfier @vatsrahul1001?
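
For illustration, that fix might look roughly like the sketch below (hypothetical, not an actual patch): the named params join json in template_fields, and the merge into json happens only in execute(), after rendering.

from typing import Sequence

from airflow.models.baseoperator import BaseOperator


class DatabricksRunNowOperatorSketch(BaseOperator):
    # Hypothetical fix: named params listed in template_fields, so Airflow
    # renders any Jinja in them before execute() runs.
    template_fields: Sequence[str] = ("json", "job_id", "notebook_params")

    def __init__(self, *, json=None, job_id=None, notebook_params=None, **kwargs):
        super().__init__(**kwargs)
        self.json = json or {}
        self.job_id = job_id
        self.notebook_params = notebook_params

    def execute(self, context):
        # template_fields are already rendered by now, so the merge sees
        # resolved values instead of raw Jinja strings.
        if self.job_id is not None:
            self.json["job_id"] = self.job_id
        if self.notebook_params is not None:
            self.json["notebook_params"] = self.notebook_params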

Additionally, @potiuk should we revert #40471 or add a new commit that fixes this issue?

vatsrahul1001 commented 3 months ago

@boraberke As per the documentation, template substitution occurs just before the pre_execute function of your operator is called. I don't think adding named params to template_fields will resolve this.

potiuk commented 3 months ago

> Additionally, @potiuk should we revert https://github.com/apache/airflow/pull/40471 or add a new commit that fixes this issue?

A fix will be best.

Stormhand commented 3 months ago

Hello, these changes also broke my code, where I'm using Jinja templating in notebook_params for DatabricksRunNowOperator.

vatsrahul1001 commented 3 months ago

@boraberke are you working on a fix?