apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

`DatabricksNotebookOperator` fails when task_key is longer than 100 characters #41816

Open rawwar opened 2 months ago

rawwar commented 2 months ago

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

According to the Databricks API documentation, task_key has a max length of 100 characters: Link.

When the DAG ID and task ID strings are long enough, we create a task_key with more than 100 characters. However, this limit is not enforced during job creation; the job gets created with the full name. But when fetching the job run details using the get run endpoint, the task_key is truncated, which causes a KeyError at the following line of code: Link
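To make the failure concrete, here is a minimal illustrative sketch (not the provider's exact code) of how a key composed from dag_id and task_id can exceed the limit; the DAG and task names below are invented:

def build_task_key(dag_id: str, task_id: str) -> str:
    # Mirrors the dag_id + task_id composition described above; the real helper
    # in the provider is DatabricksTaskBaseOperator._get_databricks_task_id.
    return f"{dag_id}__{task_id.replace('.', '__')}"

dag_id = "team_analytics_daily_ingestion_pipeline_for_customer_orders_and_returns_v2"
task_id = "reporting_group.databricks_workflow.notebook_task_customer_orders"
key = build_task_key(dag_id, task_id)
print(len(key))  # well over 100, so the key returned by the get run endpoint is truncated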

What you think should happen instead?

The task key should be unique. Hence, we can include a UUID instead of using dag_id+task_id.

How to reproduce

Use a dag_id and task_id whose combined length is longer than 100 characters together with DatabricksNotebookOperator, as in the sketch below.
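A minimal reproduction sketch, assuming a Databricks connection named databricks_default and an existing workspace notebook (both hypothetical):

from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator

with DAG(
    dag_id="an_intentionally_very_long_dag_identifier_used_to_demonstrate_the_databricks_task_key_length_problem",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    DatabricksNotebookOperator(
        task_id="equally_long_task_identifier_that_pushes_the_combined_key_past_one_hundred_characters",
        databricks_conn_id="databricks_default",  # hypothetical connection ID
        notebook_path="/Shared/example_notebook",  # hypothetical notebook path
        source="WORKSPACE",
    )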

Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

apache-airflow-providers-databricks==6.8.0

Deployment

Astronomer

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

eladkal commented 2 months ago

If there is a 100-character limit, why doesn't Databricks raise an error when you create the notebook?

However, this limit is not enforced during job creation

This feels more like a feature request to Databricks. If the API call returns an exception, Airflow will notify users about the problem; if it doesn't, then this is not really an Airflow problem. We should avoid trying to solve services' problems like this within Airflow. If Databricks decides to change the limit to 200 or 50, what then? We would need to release a new version of the provider to accommodate it. What about all the users who run previous versions?

Lee-W commented 2 months ago

I agree with @eladkal. This should be a feature request to Databricks, but we could have a workaround like @rawwar suggests. WDYT?

rawwar commented 2 months ago

@eladkal, that makes sense. But this case still needs to be handled on the Airflow side as well, since we want a way to track the job. I have a few suggestions here.

suggestions:

Lee-W commented 1 month ago

@rawwar I think for now we can at least do "Clearly mention in the documentation and also print logs when the task_key is beyond 100 characters". I'm thinking of making "Instead of dag_id+task_id as the task key, we can use a [5-10] digit UUID that should be sufficiently unique" an optional feature? Some users might expect it to be dag_id+task_id.
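A rough sketch of the "print logs when the task_key is beyond 100 characters" part; the helper and its placement are illustrative, not the provider's actual code:

import logging

log = logging.getLogger(__name__)

DATABRICKS_TASK_KEY_LIMIT = 100  # documented maximum task_key length


def build_task_key_with_warning(dag_id: str, task_id: str) -> str:
    task_key = f"{dag_id}__{task_id.replace('.', '__')}"
    if len(task_key) > DATABRICKS_TASK_KEY_LIMIT:
        log.warning(
            "Generated Databricks task_key is %d characters long, exceeding the "
            "%d-character limit; the Databricks API may truncate or reject it.",
            len(task_key),
            DATABRICKS_TASK_KEY_LIMIT,
        )
    return task_key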

maciej-szuszkiewicz commented 1 month ago

Hey, I've run into the same issue today. In our case, we're using an in-house DAG factory for generating DAGs from configuration files. This can result in both long DAG IDs and task IDs, as the task IDs also contain task group names. For example, I have a DAG ID that's already 81 characters long, and in addition to that, the DatabricksWorkflowTaskGroup is nested in another group. So for me the task key generated by DatabricksTaskBaseOperator._get_databricks_task_id is 125 characters long, and I have no way of shortening it.

When I try to run this dag, DatabricksWorkflowTaskGroup.launch operator fails with:

requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://<redacted>.cloud.databricks.com/api/2.1/jobs/create
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/databricks/operators/databricks_workflow.py", line 201, in execute
    job_id = self._create_or_reset_job(context)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/databricks/operators/databricks_workflow.py", line 178, in _create_or_reset_job
    job_id = self._hook.create_job(job_spec)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/databricks/hooks/databricks.py", line 226, in create_job
    response = self._do_api_call(CREATE_ENDPOINT, json)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 579, in _do_api_call
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Response: {"error_code":"INVALID_PARAMETER_VALUE","message":"The provided task key (of 125 characters) exceeds the maximum allowed length of 100 characters."}, Status Code: 400

I see three options here:

potiuk commented 1 month ago

The warning when it happens is already merged. Should we keep this issue open? Or should we close it after this is addressed better - everyone?

rawwar commented 1 month ago

If we agree on an approach, I can work on this one. So far, I like the idea of the task key being passed by the user. If the user doesn't provide one, we generate a random ID (possibly by using a UUID).
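A sketch of that proposal, with a hypothetical user_task_key parameter; the actual argument name and fallback behaviour would be settled in the PR:

import uuid
from typing import Optional


def resolve_task_key(user_task_key: Optional[str], dag_id: str, task_id: str) -> str:
    # Prefer an explicit, user-supplied key when one is given.
    if user_task_key:
        return user_task_key
    default_key = f"{dag_id}__{task_id.replace('.', '__')}"
    if len(default_key) <= 100:
        return default_key
    # Fall back to a random, collision-resistant key when the default would be too long.
    return uuid.uuid4().hex  # 32 characters, well under the 100-character limit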

potiuk commented 1 month ago

Assigned to you - I like the idea too.

pankajkoti commented 1 month ago

Another idea I have, @rawwar, is to check if we can compute a hash of the task ID built using the current combination of DAG ID and task ID, instead of a completely random UUID. If we generate a random UUID, we would also need to store it against each task so that it can be monitored accordingly.

rawwar commented 1 month ago

Another idea I have, @rawwar, is to check if we can compute a hash of the task ID built using the current combination of DAG ID and task ID, instead of a completely random UUID. If we generate a random UUID, we would also need to store it against each task so that it can be monitored accordingly.

Yeah, I think that's better. I will just hash it instead of encoding it using base64.
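A sketch of that deterministic approach: derive the key from dag_id + task_id so it stays stable across runs and always fits in 100 characters (unlike plain base64, which only re-encodes the string and can still exceed the limit). Names are illustrative:

import hashlib


def hashed_task_key(dag_id: str, task_id: str, limit: int = 100) -> str:
    raw = f"{dag_id}__{task_id.replace('.', '__')}"
    if len(raw) <= limit:
        return raw  # keep the readable key whenever it already fits
    digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:16]
    # Keep a readable prefix and append the stable digest so the key remains unique.
    return f"{raw[: limit - len(digest) - 1]}_{digest}"

The same inputs always produce the same key, so nothing extra needs to be stored to track the job, which addresses the monitoring concern above.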

potiuk commented 1 month ago

Agreed