apache/airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

RuntimeError when retrying DAG run with zero-length mapped tasks #43214

Open · szeswee opened 1 month ago

szeswee commented 1 month ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.9.2

What happened?

The Airflow docs state the following behaviour for zero-length maps in Dynamic Task Mapping:

If the input is empty (zero length), no new tasks will be created and the mapped task will be marked as SKIPPED.

The above-mentioned behaviour is indeed observed when a mapped task is first executed as part of a new DAG run (i.e. try_number = 1).
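
For illustration, here is a minimal DAG that exercises the documented skip behaviour (a sketch using the TaskFlow API; the dag/task names are illustrative):

    from airflow.decorators import dag, task

    @dag(schedule=None)
    def zero_length_map():
        @task
        def make_list() -> list[int]:
            # Zero-length input for the mapped task below.
            return []

        @task
        def consume(x: int) -> None:
            print(x)

        # With an empty input, no mapped task instances are created and
        # "consume" is marked as SKIPPED, per the documented behaviour.
        consume.expand(x=make_list())

    zero_length_map()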

However, on subsequent tries (i.e. try_number > 1), the mapped task instead fails with the following exception:

[2024-10-21, 10:54:51 UTC] {taskinstance.py:2306} INFO - Starting attempt 2 of 2
[2024-10-21, 10:54:51 UTC] {taskinstance.py:2330} INFO - Executing <Mapped(PythonOperator): print_args> on 2024-10-21 10:54:27.291897+00:00
[2024-10-21, 10:54:51 UTC] {standard_task_runner.py:63} INFO - Started process 341 to run task
[2024-10-21, 10:54:51 UTC] {standard_task_runner.py:90} INFO - Running: ['airflow', 'tasks', 'run', 'dtm_failure', 'print_args', 'manual__2024-10-21T10:54:27.291897+00:00', '--job-id', '13517', '--raw', '--subdir', 'DAGS_FOLDER/dtm_failure.py', '--cfg-path', '/tmp/tmpahwek3h8', '--map-index', '0']
[2024-10-21, 10:54:51 UTC] {standard_task_runner.py:91} INFO - Job 13517: Subtask print_args
[2024-10-21, 10:54:51 UTC] {task_command.py:426} INFO - Running <TaskInstance: dtm_failure.print_args manual__2024-10-21T10:54:27.291897+00:00 map_index=0 [running]> on host 172.21.174.13
[2024-10-21, 10:54:52 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2479, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode, session=session)
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2633, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context, jinja_env=jinja_env)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3094, in render_templates
    original_task.render_template_fields(context, jinja_env)
  File "/usr/local/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 829, in render_template_fields
    mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 688, in _expand_mapped_kwargs
    return self._get_specified_expand_input().resolve(context, session)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/expandinput.py", line 202, in resolve
    data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/expandinput.py", line 202, in <dictcomp>
    data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/expandinput.py", line 182, in _expand_mapped_field
    found_index = _find_index_for_this_field(map_index)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/expandinput.py", line 176, in _find_index_for_this_field
    raise RuntimeError(f"cannot expand field mapped to length {mapped_length!r}")
RuntimeError: cannot expand field mapped to length 0
[2024-10-21, 10:54:52 UTC] {taskinstance.py:2953} ERROR - Unable to unmap task to determine if we need to send an alert email
[2024-10-21, 10:54:52 UTC] {taskinstance.py:1206} INFO - Marking task as FAILED. dag_id=dtm_failure, task_id=print_args, run_id=manual__2024-10-21T10:54:27.291897+00:00, map_index=0, execution_date=20241021T105427, start_date=20241021T105451, end_date=20241021T105452
[2024-10-21, 10:54:52 UTC] {standard_task_runner.py:110} ERROR - Failed to execute job 13517 for task print_args (cannot expand field mapped to length 0; 341)

To emphasise: this issue only occurs when a zero-length map is encountered on try_number > 1. On try_number = 1, a zero-length map is handled correctly.
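
A plausible reading of the traceback and the retry log above (note the --map-index 0 argument on the second attempt): the five mapped task instances created on the first try are cleared and re-run with their existing map indexes, but on the retry the upstream XCom resolves to an empty list, so resolving map_index 0 against a length-0 expansion fails. A simplified model of the failing lookup (illustrative only, not the actual Airflow implementation):

    def find_index_for_field(map_index: int, mapped_length: int) -> int:
        # Resolve a task instance's map_index against the length of the
        # re-resolved upstream value (simplified).
        if mapped_length < 1:
            # On retry the upstream returns [], so mapped_length == 0 and the
            # pre-existing map_index=0 task instance cannot be resolved.
            raise RuntimeError(f"cannot expand field mapped to length {mapped_length!r}")
        return map_index % mapped_length

    find_index_for_field(map_index=0, mapped_length=5)  # first try: resolves fine
    find_index_for_field(map_index=0, mapped_length=0)  # retry: RuntimeError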

What you think should happen instead?

There shouldn't be an exception raised; the mapped task should be handled the same way as on the first try. Judging from the traceback, the bug is in how the mapped task's expand input is resolved during template rendering; the "Unable to unmap task to determine if we need to send an alert email" error is a downstream symptom rather than the cause.

How to reproduce

  1. Create a DAG with the following code:

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    
    def generate_args() -> list[dict]:
      from airflow.operators.python import get_current_context
    
      # Return five kwarg dicts on the first try and an empty list on any
      # retry, so a zero-length map only appears when try_number > 1.
      context = get_current_context()
      is_first_try = context["ti"].try_number == 1
    
      return [{"foo": f"bar_{idx}"} for idx in range(5)] if is_first_try else []
    
    with DAG(
      "dtm_failure",
      description="Demonstrate DAG failure with zero-length mapped tasks on subsequent tries",
      schedule=None,
    ):
      task_generate_args = PythonOperator(
          task_id="generate_args",
          python_callable=generate_args,
      )
    
      task_print_args = PythonOperator.partial(
          task_id="print_args",
          python_callable=lambda foo: print(f"Arg: {foo}"),
      ).expand(op_kwargs=task_generate_args.output)
  2. On your Airflow deployment, manually trigger a run of the dtm_failure DAG.
  3. The first try will succeed (screenshot of the successful run omitted).
  4. Select Clear > Clear existing tasks to retry the entire DAG run.
  5. The second try will fail (screenshot of the failed run omitted).
  6. Select the failed mapped task to view its logs.

Operating System

Debian 11

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 1 month ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

romsharon98 commented 1 month ago

Able to reproduce it on the main branch as well.

manojgosavi commented 1 month ago

Hi @romsharon98, @potiuk, can I work on this issue? Please assign it to me. Thanks.