apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

mini-scheduler raises AttributeError: 'NoneType' object has no attribute 'keys' #24525

Closed: jochenott closed this issue 2 years ago

jochenott commented 2 years ago

Apache Airflow version

2.3.2 (latest released)

What happened

The mini-scheduler, which runs after a task finishes, sometimes fails with the error "AttributeError: 'NoneType' object has no attribute 'keys'"; see the full traceback below.

What you think should happen instead

No response

How to reproduce

The smallest reproducing example I could find is:

import pendulum
from airflow.models import BaseOperator
from airflow.utils.task_group import TaskGroup
from airflow.decorators import task
from airflow import DAG

@task
def task0():
    pass

class Op0(BaseOperator):
    template_fields = ["some_input"]

    def __init__(self, some_input, **kwargs):
        super().__init__(**kwargs)
        self.some_input = some_input

if __name__ == "__main__":
    with DAG("dag0", start_date=pendulum.now()) as dag:
        with TaskGroup(group_id="tg1"):
            Op0(task_id="task1", some_input=task0())
    dag.partial_subset("tg1.task1")

Running this script with Airflow 2.3.2 produces the following traceback:

Traceback (most recent call last):
  File "/app/airflow-bug-minimal.py", line 22, in <module>
    dag.partial_subset("tg1.task1")
  File "/venv/lib/python3.10/site-packages/airflow/models/dag.py", line 2013, in partial_subset
    dag.task_dict = {
  File "/venv/lib/python3.10/site-packages/airflow/models/dag.py", line 2014, in <dictcomp>
    t.task_id: _deepcopy_task(t)
  File "/venv/lib/python3.10/site-packages/airflow/models/dag.py", line 2011, in _deepcopy_task
    return copy.deepcopy(t, memo)
  File "/usr/local/lib/python3.10/copy.py", line 153, in deepcopy
    y = copier(memo)
  File "/venv/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 1156, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/venv/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 1000, in __setattr__
    self.set_xcomargs_dependencies()
  File "/venv/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 1107, in set_xcomargs_dependencies
    XComArg.apply_upstream_relationship(self, arg)
  File "/venv/lib/python3.10/site-packages/airflow/models/xcom_arg.py", line 186, in apply_upstream_relationship
    op.set_upstream(ref.operator)
  File "/venv/lib/python3.10/site-packages/airflow/models/taskmixin.py", line 241, in set_upstream
    self._set_relatives(task_or_task_list, upstream=True, edge_modifier=edge_modifier)
  File "/venv/lib/python3.10/site-packages/airflow/models/taskmixin.py", line 185, in _set_relatives
    dags: Set["DAG"] = {task.dag for task in [*self.roots, *task_list] if task.has_dag() and task.dag}
  File "/venv/lib/python3.10/site-packages/airflow/models/taskmixin.py", line 185, in <setcomp>
    dags: Set["DAG"] = {task.dag for task in [*self.roots, *task_list] if task.has_dag() and task.dag}
  File "/venv/lib/python3.10/site-packages/airflow/models/dag.py", line 508, in __hash__
    val = tuple(self.task_dict.keys())
AttributeError: 'NoneType' object has no attribute 'keys'

Note that the call to dag.partial_subset usually happens in the mini-scheduler: https://github.com/apache/airflow/blob/2.3.2/airflow/jobs/local_task_job.py#L253
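The traceback chains three steps: while `partial_subset` rebuilds `dag.task_dict`, each task is deep-copied; the `setattr` loop in `BaseOperator.__deepcopy__` re-assigns every attribute; and `__setattr__` on a template field holding an XComArg calls `set_upstream`, which collects DAGs into a set and therefore hashes a DAG whose `task_dict` is `None` at that moment. The following is a minimal pure-Python model of that chain under stated assumptions, not Airflow code: `MiniDAG`, `MiniOperator` and `partial_subset` are hypothetical stand-ins, and an upstream operator stored directly in the template field plays the role of the XComArg.

```python
import copy


class MiniDAG:
    """Hypothetical stand-in for DAG: its hash is derived from task_dict."""

    def __init__(self):
        self.task_dict = {}

    def __hash__(self):
        # Same expression as the failing line in the traceback.
        return hash(tuple(self.task_dict.keys()))


class MiniOperator:
    """Hypothetical stand-in for an operator with one template field."""

    template_fields = ("some_input",)

    def __init__(self, task_id, dag, some_input=None):
        self.task_id = task_id
        self.dag = dag
        self.some_input = some_input

    def __setattr__(self, key, value):
        super().__setattr__(key, value)
        # Assigning an upstream reference to a template field registers the
        # dependency; like _set_relatives, this collects DAGs into a set,
        # which calls MiniDAG.__hash__.
        if key in self.template_fields and isinstance(value, MiniOperator):
            _dags = {op.dag for op in (self, value) if getattr(op, "dag", None)}

    def __deepcopy__(self, memo):
        # Like the setattr loop visible in the traceback: start from an empty
        # instance and copy each attribute through setattr, so the hook above
        # fires in the middle of the copy.
        result = self.__class__.__new__(self.__class__)
        memo[id(self)] = result
        for key, value in self.__dict__.items():
            setattr(result, key, copy.deepcopy(value, memo))
        return result


def partial_subset(dag, task_ids):
    # Assumption: reproduce the state seen in the traceback, where the new
    # DAG's task_dict is still None while its tasks are being copied.
    new_dag = MiniDAG()
    new_dag.task_dict = None
    memo = {id(dag): new_dag}  # copied tasks will point at new_dag
    new_dag.task_dict = {tid: copy.deepcopy(dag.task_dict[tid], memo) for tid in task_ids}
    return new_dag


dag = MiniDAG()
upstream = MiniOperator("task0", dag)
downstream = MiniOperator("tg1.task1", dag, some_input=upstream)
dag.task_dict = {op.task_id: op for op in (upstream, downstream)}

try:
    partial_subset(dag, ["tg1.task1"])
except AttributeError as exc:
    print(exc)  # 'NoneType' object has no attribute 'keys'
```

In this model, `partial_subset(dag, ["task0"])` succeeds, because `task0` has no template field pointing at another task; only the task that takes another task's output as a templated argument triggers the hash, which matches the shape of the reproducer above.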

Operating System

Linux (Debian 9)

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

No response

Anything else

No response


boring-cyborg[bot] commented 2 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

potiuk commented 2 years ago

Isn't this the same class of problem as #23838, @pingzh @ashb @uranusjr?

uranusjr commented 2 years ago

I don’t think it’s the same. The final error looks similar, but this one is triggered by task_dict, which lives on the DAG object, not on the operator. As far as I know, the DAG structure has been stable and mostly unchanged for a long while, so this is intriguing.
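The failing object is the DAG: `DAG.__hash__` reads `task_dict.keys()`, and the traceback shows `task_dict` is `None` at that moment. One standard-library way a deep copy ends up in that state is a pre-seeded `memo` argument to `copy.deepcopy`: deepcopy looks objects up by `id()` in the memo and returns the stored value instead of copying. Whether `partial_subset` in 2.3.2 does exactly this is an assumption, not something stated in this thread; `ToyDAG` and `copy_without_tasks` are hypothetical names.

```python
import copy


class ToyDAG:
    """Hypothetical stand-in whose hash depends on task_dict, like DAG.__hash__ in the traceback."""

    def __init__(self, dag_id, task_ids):
        self.dag_id = dag_id
        self.task_dict = dict.fromkeys(task_ids)

    def __hash__(self):
        return hash((self.dag_id, tuple(self.task_dict.keys())))


def copy_without_tasks(dag):
    # Pre-seeding memo tells deepcopy that task_dict has "already been copied"
    # as None, so the copy keeps every other attribute but has no task_dict.
    memo = {id(dag.task_dict): None}
    return copy.deepcopy(dag, memo)


original = ToyDAG("dag0", ["task0", "tg1.task1"])
shell = copy_without_tasks(original)
print(shell.dag_id, shell.task_dict)  # dag0 None
hash(original)  # works
# hash(shell) raises AttributeError until shell.task_dict is filled in again
```

Any code path that hashes such a half-built copy (for example by adding it to a set) before `task_dict` is reassigned fails with the same message as in the issue.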

c-thiel commented 2 years ago

We are seeing the same issue with the KubernetesExecutor:

Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/__main__.py", line 38, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 51, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/cli.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 376, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 182, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 240, in _run_task_by_local_task_job
    run_job.run()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 244, in run
    self._execute()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/local_task_job.py", line 133, in _execute
    self.handle_task_exit(return_code)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/local_task_job.py", line 171, in handle_task_exit
    self._run_mini_scheduler_on_child_tasks()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/local_task_job.py", line 253, in _run_mini_scheduler_on_child_tasks
    partial_dag = task.dag.partial_subset(
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dag.py", line 2013, in partial_subset
    dag.task_dict = {
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dag.py", line 2014, in <dictcomp>
    t.task_id: _deepcopy_task(t)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dag.py", line 2011, in _deepcopy_task
    return copy.deepcopy(t, memo)
  File "/usr/local/lib/python3.9/copy.py", line 153, in deepcopy
    y = copier(memo)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 1156, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/local/lib/python3.9/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.9/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.9/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.9/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.9/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.9/copy.py", line 270, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.9/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.9/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.9/copy.py", line 153, in deepcopy
    y = copier(memo)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 1156, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 1000, in __setattr__
    self.set_xcomargs_dependencies()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 1107, in set_xcomargs_dependencies
    XComArg.apply_upstream_relationship(self, arg)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/xcom_arg.py", line 186, in apply_upstream_relationship
    op.set_upstream(ref.operator)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskmixin.py", line 241, in set_upstream
    self._set_relatives(task_or_task_list, upstream=True, edge_modifier=edge_modifier)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskmixin.py", line 185, in _set_relatives
    dags: Set["DAG"] = {task.dag for task in [*self.roots, *task_list] if task.has_dag() and task.dag}
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskmixin.py", line 185, in <setcomp>
    dags: Set["DAG"] = {task.dag for task in [*self.roots, *task_list] if task.has_dag() and task.dag}
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dag.py", line 508, in __hash__
    val = tuple(self.task_dict.keys())
AttributeError: 'NoneType' object has no attribute 'keys'

Note that dag.partial_subset appears in this traceback as well.

Since this no longer affects only the mini-scheduler (if I am right that the issues are related), is this something we should schedule for 2.3.x instead of 2.4?

uranusjr commented 2 years ago

Thinking about it, this is likely related to AIP-45 in the same way it affected task mapping. Now that the mini-scheduler runs against serialised DAGs (instead of actual DAGs from Python code), something in the serialisation-copy-etc. chain likely went wrong and populated task_dict incorrectly.

ashb commented 2 years ago

I've just run a git bisect, and unsurprisingly the change at fault was #19965 (which shipped in 2.3.0).
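The thread does not show how the bisect was driven. One common approach is `git bisect run`, which treats exit code 0 as good, 125 as "cannot test, skip", and other codes from 1 to 127 as bad. Below is a hypothetical helper script (the name `bisect_check.py` is an assumption) that runs the reproducer from this issue and maps the outcome to those codes. It assumes the interpreter imports Airflow from the checkout being bisected, for example via an editable install.

```python
"""Hypothetical helper for `git bisect run python bisect_check.py REPRODUCER`."""
import subprocess
import sys

# The error line the reproducer prints on a bad commit.
SIGNATURE = "AttributeError: 'NoneType' object has no attribute 'keys'"

GOOD, BAD, SKIP = 0, 1, 125  # exit codes understood by `git bisect run`


def classify(returncode: int, stderr: str) -> int:
    """Map one reproducer run to a git-bisect verdict."""
    if returncode == 0:
        return GOOD
    if SIGNATURE in stderr:
        return BAD
    # Any other failure (e.g. the commit does not import) is not this bug.
    return SKIP


def main(reproducer: str) -> int:
    proc = subprocess.run(
        [sys.executable, reproducer], capture_output=True, text=True
    )
    return classify(proc.returncode, proc.stderr)


# Only act when given a reproducer path, e.g. when invoked by `git bisect run`.
if __name__ == "__main__" and len(sys.argv) > 1:
    sys.exit(main(sys.argv[1]))
```

Usage from the Airflow checkout: `git bisect start <bad-commit> <good-commit>`, then `git bisect run python bisect_check.py /path/to/airflow-bug-minimal.py`. Returning 125 for unrelated failures keeps commits that cannot even import the reproducer from being misclassified as bad.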

ayotomiwasalau commented 1 year ago

Hello, how do I fix this error: "AttributeError: 'NoneType' object has no attribute 'keys'"? I am using Airflow 2.3.0.