apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Cannot update non-DAGs packages imported from DAGs using git-sync #39203

Open NBardelot opened 7 months ago

NBardelot commented 7 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.3

What happened?

We use a structure of git submodules synchronized by git-sync sidecars, following the Typical Structure of Packages section of the documentation.

Some git submodules contain DAG files. Other git submodules contain utility libraries imported by the DAGs. The directory containing the utility libraries is added to the PYTHONPATH so that it is available to the DAGs. At startup everything works fine.

But we hit an issue when a new module is added and a DAG is modified to import it (both modifications pushed to git, with submodule updates, and the git-sync sidecar synchronizing the files). The DAG Processor component of Airflow then reprocesses the DAG Bag, but it seems the importlib cache is not invalidated, and the new module is not found.

We see the following error in the DAG Processor logs, in the import errors DB table, and thus in the UI:

ModuleNotFoundError: No module named 'ournewlib'

Note that this is not an issue with the PYTHONPATH, as restarting the dag-processor and worker containers fixes the error.

What you think should happen instead?

The documentation of importlib mentions that invalidate_caches() might be used:

If you are dynamically importing a module that was created since the interpreter began execution (e.g., created a Python source file), you may need to call invalidate_caches() in order for the new module to be noticed by the import system.

It seems like the Airflow processes should call invalidate_caches() whenever the git ref synchronized by git-sync changes (meaning the content of the code might have changed and should be reprocessed with fresh imports).
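
A rough sketch of what we have in mind, assuming the usual git-sync layout where each revision is checked out into its own directory and a symlink (here /opt/airflow/dags/repo; the helper name is made up) is atomically repointed at it:

import importlib
import os

_last_target = None

def invalidate_if_ref_changed(repo_link="/opt/airflow/dags/repo"):
    # git-sync checks out each revision into its own directory and
    # atomically repoints repo_link at it, so the resolved target
    # changes exactly when the synced git ref changes.
    global _last_target
    target = os.path.realpath(repo_link)
    if target != _last_target:
        _last_target = target
        # Fresh ref: flush importlib's finder caches so newly synced
        # modules become visible to subsequent imports.
        importlib.invalidate_caches()

The DAG processor could run such a check before each parsing round.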

How to reproduce
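
The race is timing-dependent, so it is hard to reproduce deterministically inside Airflow. Below is a standalone sketch of the importlib behaviour we believe is at play (the module name ournewlib is taken from the error above; everything else is a throwaway temp directory):

import importlib
import pathlib
import sys
import tempfile

# Stand-in for the git-sync'd libs directory that is on the PYTHONPATH.
libs = pathlib.Path(tempfile.mkdtemp()) / "libs"
libs.mkdir()
sys.path.insert(0, str(libs))

# A first (failed) import caches a FileFinder for the still-empty
# directory in sys.path_importer_cache.
try:
    import ournewlib
except ModuleNotFoundError:
    print("not found yet, as expected")

# Simulate git-sync dropping the new module into the directory. If this
# happens within the mtime granularity of the first search, the cached
# FileFinder may still consider its directory listing fresh...
(libs / "ournewlib.py").write_text("VALUE = 42\n")

# ...so, per the importlib docs, invalidate the caches before retrying.
importlib.invalidate_caches()

import ournewlib
print(ournewlib.VALUE)  # prints 42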

Operating System

Kubernetes

Versions of Apache Airflow Providers

apache-airflow[celery,kubernetes,statsd,password,ldap,otel]

apache-airflow-providers-amazon
apache-airflow-providers-common-sql
apache-airflow-providers-elasticsearch
apache-airflow-providers-hashicorp
apache-airflow-providers-http
apache-airflow-providers-microsoft-winrm
apache-airflow-providers-microsoft-azure
apache-airflow-providers-opsgenie
apache-airflow-providers-postgres
apache-airflow-providers-redis
apache-airflow-providers-sftp
apache-airflow-providers-smtp
apache-airflow-providers-ssh

We use the constraints file from Apache Airflow for Python 3.10 and version 2.8.3.

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

NBardelot commented 7 months ago

See also #118 (and especially this reply).

NBardelot commented 7 months ago

After further analysis, we think it may be a race condition, as described in the importlib documentation of FileFinder.

The finder will cache the directory contents as necessary, making stat calls for each module search to verify the cache is not outdated. Because cache staleness relies upon the granularity of the operating system’s state information of the file system, there is a potential race condition of searching for a module, creating a new file, and then searching for the module the new file represents. If the operations happen fast enough to fit within the granularity of stat calls, then the module search will fail. To prevent this from happening, when you create a module dynamically, make sure to call importlib.invalidate_caches().

Here is the status of our cache:

$ python
Python 3.10.13 (main, Mar 12 2024, 12:22:40) [GCC 12.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import sys
>>> sys.path_importer_cache
{'/opt/airflow/dags/repo/src/libs': FileFinder('/opt/airflow/dags/repo/src/libs'), '/usr/local/lib/python310.zip': None, '/usr/local/lib/python3.10': FileFinder('/usr/local/lib/python3.10'), '/usr/local/lib/python3.10/encodings': FileFinder('/usr/local/lib/python3.10/encodings'), '/usr/local/lib/python3.10/importlib': FileFinder('/usr/local/lib/python3.10/importlib'), '/home/airflow/.local/lib/python3.10/site-packages': FileFinder('/home/airflow/.local/lib/python3.10/site-packages'), '/usr/local/lib/python3.10/lib-dynload': FileFinder('/usr/local/lib/python3.10/lib-dynload'), '/usr/local/lib/python3.10/site-packages': FileFinder('/usr/local/lib/python3.10/site-packages'), '/opt/airflow': FileFinder('/opt/airflow'), '/usr/local/lib/python3.10/collections': FileFinder('/usr/local/lib/python3.10/collections')}
>>>

... where /opt/airflow/dags/repo/src/libs contains our utility modules. As git-sync synchronizes both the DAG and the imported module in quick succession, the race condition mentioned in the importlib documentation might be the cause of our issue. Here again, the documentation mentions importlib.invalidate_caches() as the correct way to prevent the issue.
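
For completeness, importlib.invalidate_caches() is what flushes the finder shown above; in the same interpreter session, something like:

import importlib
import sys

# The finder cached for the libs directory, as in the dump above.
finder = sys.path_importer_cache.get("/opt/airflow/dags/repo/src/libs")
print(finder)  # FileFinder('/opt/airflow/dags/repo/src/libs')

# invalidate_caches() asks every cached finder to drop its directory
# contents cache, so the next import re-reads the directory.
importlib.invalidate_caches()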

potiuk commented 7 months ago

I would be surprised if invalidate_caches were the solution. When a DAG is parsed in Airflow, it is always parsed in a fresh fork of the parent process, and any imports done inside the fork are lost together with the fork when parsing completes.

But maybe that's a side effect of https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#parsing-pre-import-modules. This configuration (True by default starting from 2.6.0) pre-imports the "airflow" module in the parent process before the fork happens. And it might well be that your local settings, imported along with it, pull in other imports when the "airflow" module is imported.

An easy way to check is to set the flag to False; I'd recommend you do that. And if your hypothesis is right, you should also be able to test it by adding importlib.invalidate_caches() as the first thing in your DAG files, right? Can you run such tests?
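
For instance, a minimal test DAG along those lines could look like this (a sketch only; ournewlib is the module from the issue description, the DAG name is made up):

# Invalidate importlib caches before anything else, so a module freshly
# synced by git-sync is visible to this very parse.
import importlib

importlib.invalidate_caches()

import ournewlib  # the utility module synced by git-sync

import pendulum
from airflow.decorators import dag, task

@dag(schedule=None, start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def test_invalidate_caches():
    @task
    def use_lib():
        print(ournewlib.__name__)

    use_lib()

test_invalidate_caches()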