when running a dag created with partial_subset, the id of sentinel values are not properly maintained for decorated python mapped tasks, specifically EXPAND_INPUT_EMPTY that is set as the default value for expand_input for any mapped @task. this causes all calls to the mapped, decorated tasks to fail with following error: AssertionError: unexpected expand_input: DictOfListsExpandInput(value={}) raises here as the comparison is done using is not; i have added a minimally reproducible example below.
in debugging, adding EXPAND_INPUT_EMPTY to deepcopy's memo arg fixed the issue within partial_subset like
as I imagine the comparison to a sentinel value by id instead of by value will want to continue. If that is the desired path forward, I can open a PR to that effect.
What you think should happen instead?
the mapped task should run without failure.
How to reproduce
import datetime
import os
from airflow import settings
from airflow.utils.state import DagRunState
from airflow.utils.db import initdb, resetdb
from airflow.decorators import dag
from airflow.decorators import task
@task
def generate_input() -> list[int]:
return [1, 2, 3, 4, 5, 6, 7]
@task
def mapped(multiplier: int, value: int) -> int:
return multiplier * value
@task
def validate_output(multiplier: int, results: list[int]) -> None:
for result in results:
if result % multiplier != 0:
raise ValueError("Oops!")
@dag(
dag_id="reproducible_dag",
default_args={
"start_date": datetime.date.today().strftime("%Y-%m-%d"),
"retries": 0,
},
schedule=None,
catchup=False,
render_template_as_native_obj=True,
)
def build_reproducible_dag() -> None:
mapped_input = generate_input()
mapped_output = mapped.partial(multiplier=3).expand(value=mapped_input)
validate_output(multiplier=3, results=mapped_output)
reproducible_dag = build_reproducible_dag()
if __name__ == "__main__":
from recidiviz.tools.postgres import local_postgres_helpers
# set up
temp_dir = local_postgres_helpers.start_on_disk_postgresql_database()
os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = (
local_postgres_helpers.on_disk_postgres_db_url().render_as_string()
)
settings.initialize()
initdb(load_connections=False)
dag_run = reproducible_dag.test() # this run will succeeed
assert dag_run.get_state() == DagRunState.SUCCESS
# reset for test
resetdb()
reproducible_subset = reproducible_dag.partial_subset(
task_ids_or_regex=[
"generate_input",
"mapped",
"validate_output",
]
)
dag_run = reproducible_subset.test() # this run will fail
assert dag_run.get_state() == DagRunState.SUCCESS
# tear down
resetdb(skip_init=True)
local_postgres_helpers.stop_and_clear_on_disk_postgresql_database(temp_dir)
[2024-07-31T12:01:54.270-0400] {environment.py:213} INFO - Test environment, proceeding.
[2024-07-31T12:01:54.274-0400] {environment.py:213} INFO - Test environment, proceeding.
Created database `recidiviz_test_db0` on postgres instance bound to port 54300
[2024-07-31T12:01:55.179-0400] {environment.py:213} INFO - Test environment, proceeding.
To query data, connect with `psql postgresql://recidiviz_test_usr@localhost:54300/recidiviz_test_db0`
[2024-07-31T12:01:55.180-0400] {environment.py:213} INFO - Test environment, proceeding.
[2024-07-31T12:01:55.309-0400] {migration.py:216} INFO - Context impl PostgresqlImpl.
[2024-07-31T12:01:55.309-0400] {migration.py:219} INFO - Will assume transactional DDL.
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [alembic.runtime.migration] Running stamp_revision -> 405de8318b3a
[2024-07-31 12:01:55,790] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='reproducible_dag' AIRFLOW_CTX_TASK_ID='generate_input' AIRFLOW_CTX_EXECUTION_DATE='2024-07-31T16:01:55.743454+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-07-31T16:01:55.743454+00:00'
INFO [airflow.task] Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='reproducible_dag' AIRFLOW_CTX_TASK_ID='generate_input' AIRFLOW_CTX_EXECUTION_DATE='2024-07-31T16:01:55.743454+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-07-31T16:01:55.743454+00:00'
[2024-07-31 12:01:55,791] {python.py:194} INFO - Done. Returned value was: [1, 2, 3, 4, 5, 6, 7]
INFO [airflow.task.operators] Done. Returned value was: [1, 2, 3, 4, 5, 6, 7]
[2024-07-31 12:01:55,795] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=reproducible_dag, task_id=generate_input, execution_date=20240731T160155, start_date=, end_date=20240731T160155
INFO [airflow.task] Marking task as SUCCESS. dag_id=reproducible_dag, task_id=generate_input, execution_date=20240731T160155, start_date=, end_date=20240731T160155
[2024-07-31 12:01:55,816] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
ERROR [airflow.task] Task failed with exception
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
[2024-07-31 12:01:55,818] {taskinstance.py:1983} ERROR - Unable to unmap task to determine if we need to send an alert email
ERROR [airflow.task] Unable to unmap task to determine if we need to send an alert email
[2024-07-31 12:01:55,818] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=reproducible_dag, task_id=mapped, map_index=0, execution_date=20240731T160155, start_date=, end_date=20240731T160155
INFO [airflow.task] Marking task as FAILED. dag_id=reproducible_dag, task_id=mapped, map_index=0, execution_date=20240731T160155, start_date=, end_date=20240731T160155
ERROR [airflow.models.dag.DAG] Task failed; ti=<TaskInstance: reproducible_dag.mapped manual__2024-07-31T16:01:55.743454+00:00 map_index=0 [failed]>
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/dag.py", line 2776, in test
ret = _run_task(ti, session=session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/dag.py", line 3938, in _run_task
ret = ti._run_raw_task(session=session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
[2024-07-31 12:01:55,825] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
ERROR [airflow.task] Task failed with exception
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
[2024-07-31 12:01:55,825] {taskinstance.py:1983} ERROR - Unable to unmap task to determine if we need to send an alert email
ERROR [airflow.task] Unable to unmap task to determine if we need to send an alert email
[2024-07-31 12:01:55,826] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=reproducible_dag, task_id=mapped, map_index=1, execution_date=20240731T160155, start_date=, end_date=20240731T160155
INFO [airflow.task] Marking task as FAILED. dag_id=reproducible_dag, task_id=mapped, map_index=1, execution_date=20240731T160155, start_date=, end_date=20240731T160155
ERROR [airflow.models.dag.DAG] Task failed; ti=<TaskInstance: reproducible_dag.mapped manual__2024-07-31T16:01:55.743454+00:00 map_index=1 [failed]>
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/dag.py", line 2776, in test
ret = _run_task(ti, session=session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/dag.py", line 3938, in _run_task
ret = ti._run_raw_task(session=session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
[2024-07-31 12:01:55,831] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
ERROR [airflow.task] Task failed with exception
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
[2024-07-31 12:01:55,832] {taskinstance.py:1983} ERROR - Unable to unmap task to determine if we need to send an alert email
ERROR [airflow.task] Unable to unmap task to determine if we need to send an alert email
[2024-07-31 12:01:55,832] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=reproducible_dag, task_id=mapped, map_index=2, execution_date=20240731T160155, start_date=, end_date=20240731T160155
INFO [airflow.task] Marking task as FAILED. dag_id=reproducible_dag, task_id=mapped, map_index=2, execution_date=20240731T160155, start_date=, end_date=20240731T160155
ERROR [airflow.models.dag.DAG] Task failed; ti=<TaskInstance: reproducible_dag.mapped manual__2024-07-31T16:01:55.743454+00:00 map_index=2 [failed]>
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/dag.py", line 2776, in test
ret = _run_task(ti, session=session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/dag.py", line 3938, in _run_task
ret = ti._run_raw_task(session=session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
[2024-07-31 12:01:55,838] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
ERROR [airflow.task] Task failed with exception
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
[2024-07-31 12:01:55,840] {taskinstance.py:1983} ERROR - Unable to unmap task to determine if we need to send an alert email
ERROR [airflow.task] Unable to unmap task to determine if we need to send an alert email
[2024-07-31 12:01:55,840] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=reproducible_dag, task_id=mapped, map_index=3, execution_date=20240731T160155, start_date=, end_date=20240731T160155
INFO [airflow.task] Marking task as FAILED. dag_id=reproducible_dag, task_id=mapped, map_index=3, execution_date=20240731T160155, start_date=, end_date=20240731T160155
ERROR [airflow.models.dag.DAG] Task failed; ti=<TaskInstance: reproducible_dag.mapped manual__2024-07-31T16:01:55.743454+00:00 map_index=3 [failed]>
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/dag.py", line 2776, in test
ret = _run_task(ti, session=session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/dag.py", line 3938, in _run_task
ret = ti._run_raw_task(session=session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
[2024-07-31 12:01:55,847] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
ERROR [airflow.task] Task failed with exception
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
[2024-07-31 12:01:55,848] {taskinstance.py:1983} ERROR - Unable to unmap task to determine if we need to send an alert email
ERROR [airflow.task] Unable to unmap task to determine if we need to send an alert email
[2024-07-31 12:01:55,848] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=reproducible_dag, task_id=mapped, map_index=4, execution_date=20240731T160155, start_date=, end_date=20240731T160155
INFO [airflow.task] Marking task as FAILED. dag_id=reproducible_dag, task_id=mapped, map_index=4, execution_date=20240731T160155, start_date=, end_date=20240731T160155
ERROR [airflow.models.dag.DAG] Task failed; ti=<TaskInstance: reproducible_dag.mapped manual__2024-07-31T16:01:55.743454+00:00 map_index=4 [failed]>
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/dag.py", line 2776, in test
ret = _run_task(ti, session=session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/dag.py", line 3938, in _run_task
ret = ti._run_raw_task(session=session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
[2024-07-31 12:01:55,853] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
ERROR [airflow.task] Task failed with exception
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
[2024-07-31 12:01:55,854] {taskinstance.py:1983} ERROR - Unable to unmap task to determine if we need to send an alert email
ERROR [airflow.task] Unable to unmap task to determine if we need to send an alert email
[2024-07-31 12:01:55,854] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=reproducible_dag, task_id=mapped, map_index=5, execution_date=20240731T160155, start_date=, end_date=20240731T160155
INFO [airflow.task] Marking task as FAILED. dag_id=reproducible_dag, task_id=mapped, map_index=5, execution_date=20240731T160155, start_date=, end_date=20240731T160155
ERROR [airflow.models.dag.DAG] Task failed; ti=<TaskInstance: reproducible_dag.mapped manual__2024-07-31T16:01:55.743454+00:00 map_index=5 [failed]>
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/dag.py", line 2776, in test
ret = _run_task(ti, session=session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/dag.py", line 3938, in _run_task
ret = ti._run_raw_task(session=session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
[2024-07-31 12:01:55,860] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
ERROR [airflow.task] Task failed with exception
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
[2024-07-31 12:01:55,862] {taskinstance.py:1983} ERROR - Unable to unmap task to determine if we need to send an alert email
ERROR [airflow.task] Unable to unmap task to determine if we need to send an alert email
[2024-07-31 12:01:55,862] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=reproducible_dag, task_id=mapped, map_index=6, execution_date=20240731T160155, start_date=, end_date=20240731T160155
INFO [airflow.task] Marking task as FAILED. dag_id=reproducible_dag, task_id=mapped, map_index=6, execution_date=20240731T160155, start_date=, end_date=20240731T160155
ERROR [airflow.models.dag.DAG] Task failed; ti=<TaskInstance: reproducible_dag.mapped manual__2024-07-31T16:01:55.743454+00:00 map_index=6 [failed]>
Traceback (most recent call last):
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/dag.py", line 2776, in test
ret = _run_task(ti, session=session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/dag.py", line 3938, in _run_task
ret = ti._run_raw_task(session=session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
original_task.render_template_fields(context)
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oro/.local/share/virtualenvs/airflow-JBf5o6AR/lib/python3.11/site-packages/airflow/decorators/base.py", line 527, in _expand_mapped_kwargs
raise AssertionError(f"unexpected expand_input: {self.expand_input}")
AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
ERROR [airflow.models.dagrun.DagRun] Marking run <DagRun reproducible_dag @ 2024-07-31T16:01:55.743454+00:00: manual__2024-07-31T16:01:55.743454+00:00, state:running, queued_at: None. externally triggered: False> failed
Traceback (most recent call last):
File "/Users/oro/Library/Application Support/Code/User/globalStorage/buenon.scratchpads/scratchpads/1a4f2b73c916c9af3926773a674c6453/scratch107.py", line 74, in <module>
assert dag_run.get_state() == DagRunState.SUCCESS
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.7.3
What happened?
when running a dag created with
partial_subset
, theid
of sentinel values are not properly maintained for decorated python mapped tasks, specificallyEXPAND_INPUT_EMPTY
that is set as the default value forexpand_input
for any mapped@task
. this causes all calls to the mapped, decorated tasks to fail with following error:AssertionError: unexpected expand_input: DictOfListsExpandInput(value={})
raises here as the comparison is done usingis not
; i have added a minimally reproducible example below.in debugging, adding
EXPAND_INPUT_EMPTY
to deepcopy'smemo
arg fixed the issue within partial_subset likeas I imagine the comparison to a sentinel value by
id
instead of by value will want to continue. If that is the desired path forward, I can open a PR to that effect.What you think should happen instead?
the mapped task should run without failure.
How to reproduce
for
local_postgres_helpers
code see, pulse-datatask failure trace
Operating System
macOS::14.1
Versions of Apache Airflow Providers
Deployment
Google Cloud Composer
Deployment details
pipfile for env, drawn from https://cloud.google.com/composer/docs/concepts/versioning/composer-versions for
composer-2.7.1-airflow-2.7.3
Anything else?
here are full logs from failed dag run
Are you willing to submit PR?
Code of Conduct