apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

"RuntimeError: dictionary changed size during iteration" happens in scheduler after running for a few hours #23609

Closed. kaojunsong closed this issue 2 years ago

kaojunsong commented 2 years ago

Apache Airflow version

2.2.5

What happened

The following error occurs in the Airflow scheduler:

[2022-05-09 11:22:44,122] [INFO] base_executor.py:85 - Adding to queue: ['airflow', 'tasks', 'run', 'stress-ingest', 'ingest', 'manual__2022-05-09T10:31:12.256131+00:00', '--local', '--subdir', 'DAGS_FOLDER/stress_ingest.py']
Exception in thread Thread-7940:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.9/concurrent/futures/process.py", line 317, in run
    result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
  File "/usr/local/lib/python3.9/concurrent/futures/process.py", line 376, in wait_result_broken_or_wakeup
    worker_sentinels = [p.sentinel for p in self.processes.values()]
  File "/usr/local/lib/python3.9/concurrent/futures/process.py", line 376, in <listcomp>
    worker_sentinels = [p.sentinel for p in self.processes.values()]
RuntimeError: dictionary changed size during iteration
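
For context, this RuntimeError is the generic CPython error raised when a dict is resized while an iterator is walking it. Below is a minimal sketch (not Airflow code, purely an illustration) that typically reproduces the same message by having one thread iterate a plain dict while another thread keeps adding keys:

import threading

shared = {}
stop = threading.Event()

def writer():
    # Keep growing the dict so its size changes while the main thread iterates it.
    i = 0
    while not stop.is_set():
        shared[i] = object()
        i += 1

t = threading.Thread(target=writer, daemon=True)
t.start()
try:
    while True:
        # Iterating a live dict that another thread is resizing usually raises
        # "RuntimeError: dictionary changed size during iteration" within a moment.
        for _ in shared.values():
            pass
except RuntimeError as exc:
    print(exc)
finally:
    stop.set()
    t.join()

The traceback above shows the same pattern inside concurrent.futures: the executor manager thread iterates self.processes.values(), presumably while another thread modifies that dict.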

What you think should happen instead

No error should happen.

How to reproduce

I have created two DAGs: the first (stress-test) triggers the second (stress-ingest, defined in stress_ingest.py) thousands of times via a delegator HTTP service:

# First DAG (stress-test): triggers the second DAG repeatedly via the delegator service
from datetime import datetime, timedelta

from common.utils import httpclient

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.python import PythonOperator

DELEGATOR_END_POINT = 'http://space-common-delegator-clusteripsvc:9049/api/pipelines/dags/stress-ingest/dagRuns/'

def trigger(**kwargs):
    conf = kwargs.get('dag_run').conf
    loop_number = conf["loop"]
    print("Loop: " + str(loop_number))
    for i in range(loop_number):
        httpclient.post(DELEGATOR_END_POINT, conf)
        print("Trigger: " + str(i))

with DAG(
    'stress-test',
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': False,
        'email': ['songkaojun2017@gmail.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='A Stress Test DAG',
    start_date=datetime(2022, 1, 1),
    catchup=False,
    schedule_interval=None,
    tags=['stress','test'],
) as dag:

    trigger_via_delegator = PythonOperator(
        task_id='trigger_via_delegator',
        python_callable=trigger,
        dag=dag,
        do_xcom_push=True,
        retries=0
    )

    trigger_via_delegator

# Second DAG (stress-ingest, stress_ingest.py): triggered repeatedly by the DAG above
from datetime import datetime, timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.python import PythonOperator
from common.utils.httpclient import post
from common.constants import INGEST_PULL_END_POINT

def ingest(**kwargs):
    conf = kwargs.get('dag_run').conf
    body = conf['body']
    post(INGEST_PULL_END_POINT, body, 'stress test')

with DAG(
    'stress-ingest',
    default_args={
        'depends_on_past': False,
        'email': ['songkaojun2017@gmail.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='A Stress Test DAG',
    start_date=datetime(2022, 1, 1),
    catchup=False,
    schedule_interval=None,
    tags=['stress','test'],
) as dag:

    trigger_via_delegator = PythonOperator(
        task_id='ingest',
        python_callable=ingest,
        dag=dag,
        do_xcom_push=True,
        retries=0
    )

    trigger_via_delegator

Operating System

PRETTY_NAME="Debian GNU/Linux 9 (stretch)" NAME="Debian GNU/Linux" VERSION_ID="9" VERSION="9 (stretch)" VERSION_CODENAME=stretch ID=debian HOME_URL="https://www.debian.org/" SUPPORT_URL="https://www.debian.org/support" BUG_REPORT_URL="https://bugs.debian.org/"

Versions of Apache Airflow Providers

apache-airflow[kubernetes,postgres,celery,redis,trino,ldap,elasticsearch,amazon,crypto,oracle,jdbc,microsoft-mssql]==2.2.5 \
     --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.5/constraints-3.9.txt"

Deployment

Other Docker-based deployment

Deployment details

We build Airflow using the following Dockerfile:

FROM openjdk-python:jdk8-python3.9

# install deps
RUN apt-get update -y && apt-get install -y \
    libczmq-dev \
    python3-dev \
    python2.7-dev \
    libldap2-dev \
    libsasl2-dev \
    ldap-utils \
    tox \
    lcov \
    valgrind \
    libssl-dev \
    inetutils-telnet \
    bind9utils \
    gcc \
    alien \
    && apt-get clean

RUN pip3 install --upgrade pip

RUN pip3 install apache-airflow[kubernetes,postgres,celery,redis,trino,ldap,elasticsearch,amazon,crypto,oracle,jdbc,microsoft-mssql]==2.2.5 \
     --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.5/constraints-3.9.txt"
RUN pip3 install trino psycopg2-binary python-ldap matplotlib retrying authlib cx_Oracle

RUN wget https://download.oracle.com/otn_software/linux/instantclient/214000/oracle-instantclient-basic-21.4.0.0.0-1.el8.x86_64.rpm && \
    alien -i oracle-instantclient-basic-21.4.0.0.0-1.el8.x86_64.rpm

RUN apt-get install -y libaio1
ENV LD_LIBRARY_PATH /usr/lib/oracle/21/client64/lib/${LD_LIBRARY_PATH:+:$LD_LIBRARY_PATH}
ENV ORACLE_HOME /usr/lib/oracle/21/client64
ENV PATH $PATH:$ORACLE_HOME/bin

Then we run it on Kubernetes v1.19.

Anything else

No response


tirkarthi commented 2 years ago

This looks like a problem in Python itself; it appears to have been fixed upstream in https://github.com/python/cpython/pull/24868

Python 3.9 backport : https://github.com/python/cpython/commit/3b9d886567c4fc6279c2198b6711f0590dbf3336
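
As a general note, application code that shares a dict between threads usually avoids this class of error by copying the dict under a lock and iterating the copy. The sketch below only illustrates that pattern; the class and names are hypothetical, and this is not necessarily how the linked CPython patch fixes the executor internals:

import threading

class WorkerRegistry:
    # Hypothetical registry of worker processes shared between threads.
    def __init__(self):
        self._lock = threading.Lock()
        self._processes = {}  # pid -> multiprocessing.Process

    def add(self, pid, proc):
        with self._lock:
            self._processes[pid] = proc

    def remove(self, pid):
        with self._lock:
            self._processes.pop(pid, None)

    def sentinels(self):
        # Take a consistent snapshot under the lock, then iterate the copy;
        # other threads can keep mutating the live dict without breaking this loop.
        with self._lock:
            snapshot = list(self._processes.values())
        return [p.sentinel for p in snapshot]

In this case the race lives inside the standard library's concurrent.futures, so the practical remedy is the interpreter upgrade discussed below.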

kaojunsong commented 2 years ago

@tirkarthi Thanks a lot, I will try to upgrade the Python version to 3.9.12.

potiuk commented 2 years ago

@kaojunsong - let us know if it fixed the problem (thanks @tirkarthi for the investigation!)

kaojunsong commented 2 years ago

@potiuk OK, sure.

kaojunsong commented 2 years ago

@potiuk @tirkarthi Airflow has been running for a week and the error has not happened again, so I think the issue is solved. Thanks a lot for your help.

potiuk commented 2 years ago

Thanks @kaojunsong !