apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Passing a DagRun to a PythonVirtualenvOperator gives NameError: name 'timedelta' is not defined #35483

Closed Felix-neko closed 6 months ago

Felix-neko commented 10 months ago

Apache Airflow version

2.7.3

What happened

I have a simple DAG with the render_template_as_native_obj option enabled. I'm trying to pass {{ dag_run }} to a PythonVirtualenvOperator, but when the task starts executing it fails with NameError: name 'timedelta' is not defined.

I'm running it on Python 3.11 with airflow==2.7.3 and dill==0.3.7.

Here's my DAG:

import datetime
from pathlib import Path
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
import dill

dag = DAG(
    dag_id='strange_pickling_error_dag',
    schedule_interval='0 5 * * 1',
    start_date=datetime.datetime(2020, 1, 1),
    catchup=False,
    render_template_as_native_obj=True,
)

context = {"ts": "{{ ts }}", "dag_run": "{{ dag_run }}"}
op_args = [context, Path(__file__).parent.absolute()]

def make_foo(*args, **kwargs):
    print("---> making foo!")
    print("make foo(...): args")
    print(args)
    print("make foo(...): kwargs")
    print(kwargs)

make_foo_task = PythonVirtualenvOperator(
    task_id='make_foo',
    python_callable=make_foo,
    use_dill=True,
    system_site_packages=False,
    op_args=op_args,
    requirements=[f"dill=={dill.__version__}", f"apache-airflow=={airflow.__version__}"],
    dag=dag)

And here's my error:

[2023-11-06, 18:34:16 UTC] {process_utils.py:182} INFO - Executing cmd: /tmp/venvp8tw4qco/bin/python /tmp/venvp8tw4qco/script.py /tmp/venvp8tw4qco/script.in /tmp/venvp8tw4qco/script.out /tmp/venvp8tw4qco/string_args.txt /tmp/venvp8tw4qco/termination.log
[2023-11-06, 18:34:16 UTC] {process_utils.py:186} INFO - Output:
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO - Traceback (most recent call last):
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -   File "/tmp/venvp8tw4qco/script.py", line 17, in <module>
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -     arg_dict = dill.load(file)
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -                ^^^^^^^^^^^^^^^
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -   File "/tmp/venvp8tw4qco/lib/python3.11/site-packages/dill/_dill.py", line 287, in load
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -     return Unpickler(file, ignore=ignore, **kwds).load()
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -   File "/tmp/venvp8tw4qco/lib/python3.11/site-packages/dill/_dill.py", line 442, in load
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -     obj = StockUnpickler.load(self)
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -           ^^^^^^^^^^^^^^^^^^^^^^^^^
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -   File "/home/felix/Projects/fs_etl/venv_py3.11/lib/python3.11/site-packages/pendulum/tz/timezone.py", line 312, in __init__
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -     self._utcoffset = timedelta(seconds=offset)
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -                       ^^^^^^^^^
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO - NameError: name 'timedelta' is not defined
[2023-11-06, 18:34:17 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/felix/Projects/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 395, in execute
    return super().execute(context=serializable_context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/felix/Projects/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 192, in execute
    return_value = self.execute_callable()
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/felix/Projects/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 609, in execute_callable
    result = self._execute_python_callable_in_subprocess(python_path, tmp_path)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/felix/Projects/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 446, in _execute_python_callable_in_subprocess
    execute_in_subprocess(
  File "/home/felix/Projects/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/utils/process_utils.py", line 171, in execute_in_subprocess
    execute_in_subprocess_with_kwargs(cmd, cwd=cwd)
  File "/home/felix/Projects/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/utils/process_utils.py", line 194, in execute_in_subprocess_with_kwargs
    raise subprocess.CalledProcessError(exit_code, cmd)
subprocess.CalledProcessError: Command '['/tmp/venvp8tw4qco/bin/python', '/tmp/venvp8tw4qco/script.py', '/tmp/venvp8tw4qco/script.in', '/tmp/venvp8tw4qco/script.out', '/tmp/venvp8tw4qco/string_args.txt', '/tmp/venvp8tw4qco/termination.log']' returned non-zero exit status 1.

What you think should happen instead

I think the DagRun object should be unpickled correctly.

How to reproduce

I'm running airflow with simple airflow standalone command in my virtualenv shell. I'm using the default SequentialExecutor.

Operating System

Ubuntu 22.04

Versions of Apache Airflow Providers

apache-airflow-providers-common-sql==1.8.0
apache-airflow-providers-ftp==3.6.0
apache-airflow-providers-http==4.6.0
apache-airflow-providers-imap==3.4.0
apache-airflow-providers-sqlite==3.5.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

I'm using a Python 3.11 virtualenv. Here's my pip3 freeze:

aiohttp==3.8.6
aiosignal==1.3.1
alembic==1.12.1
annotated-types==0.6.0
anyio==4.0.0
apache-airflow==2.7.3
apache-airflow-providers-common-sql==1.8.0
apache-airflow-providers-ftp==3.6.0
apache-airflow-providers-http==4.6.0
apache-airflow-providers-imap==3.4.0
apache-airflow-providers-sqlite==3.5.0
apispec==6.3.0
argcomplete==3.1.4
asgiref==3.7.2
async-timeout==4.0.3
attrs==23.1.0
Babel==2.13.1
backoff==2.2.1
blinker==1.7.0
cachelib==0.9.0
cattrs==23.1.2
certifi==2023.7.22
cffi==1.16.0
charset-normalizer==3.3.2
click==8.1.7
clickclick==20.10.2
colorama==0.4.6
colorlog==4.8.0
ConfigUpdater==3.1.1
connexion==2.14.2
cron-descriptor==1.4.0
croniter==2.0.1
cryptography==41.0.5
Deprecated==1.2.14
dill==0.3.7
distlib==0.3.7
dnspython==2.4.2
docutils==0.20.1
email-validator==1.3.1
filelock==3.13.1
Flask==2.2.5
Flask-AppBuilder==4.3.6
Flask-Babel==2.0.0
Flask-Caching==2.1.0
Flask-JWT-Extended==4.5.3
Flask-Limiter==3.5.0
Flask-Login==0.6.3
Flask-Session==0.5.0
Flask-SQLAlchemy==2.5.1
Flask-WTF==1.2.1
frozenlist==1.4.0
google-re2==1.1
googleapis-common-protos==1.61.0
graphviz==0.20.1
greenlet==3.0.1
grpcio==1.59.2
gunicorn==21.2.0
h11==0.14.0
httpcore==1.0.1
httpx==0.25.1
idna==3.4
importlib-metadata==6.8.0
importlib-resources==6.1.0
inflection==0.5.1
itsdangerous==2.1.2
Jinja2==3.1.2
jsonschema==4.19.2
jsonschema-specifications==2023.7.1
lazy-object-proxy==1.9.0
limits==3.6.0
linkify-it-py==2.0.2
lockfile==0.12.2
Mako==1.2.4
Markdown==3.5.1
markdown-it-py==3.0.0
MarkupSafe==2.1.3
marshmallow==3.20.1
marshmallow-oneofschema==3.0.1
marshmallow-sqlalchemy==0.26.1
mdit-py-plugins==0.4.0
mdurl==0.1.2
multidict==6.0.4
numpy==1.26.1
opentelemetry-api==1.20.0
opentelemetry-exporter-otlp==1.20.0
opentelemetry-exporter-otlp-proto-common==1.20.0
opentelemetry-exporter-otlp-proto-grpc==1.20.0
opentelemetry-exporter-otlp-proto-http==1.20.0
opentelemetry-proto==1.20.0
opentelemetry-sdk==1.20.0
opentelemetry-semantic-conventions==0.41b0
ordered-set==4.1.0
packaging==23.2
pathspec==0.11.2
pendulum==2.1.2
platformdirs==3.11.0
pluggy==1.3.0
prison==0.2.1
protobuf==4.25.0
psutil==5.9.6
pycparser==2.21
pydantic==2.4.2
pydantic_core==2.10.1
Pygments==2.16.1
PyJWT==2.8.0
python-daemon==3.0.1
python-dateutil==2.8.2
python-nvd3==0.15.0
python-slugify==8.0.1
pytz==2023.3.post1
pytzdata==2020.1
PyYAML==6.0.1
referencing==0.30.2
requests==2.31.0
requests-toolbelt==1.0.0
rfc3339-validator==0.1.4
rich==13.6.0
rich-argparse==1.4.0
rpds-py==0.12.0
setproctitle==1.3.3
six==1.16.0
sniffio==1.3.0
SQLAlchemy==1.4.50
SQLAlchemy-JSONField==1.0.1.post0
SQLAlchemy-Utils==0.41.1
sqlparse==0.4.4
tabulate==0.9.0
tenacity==8.2.3
termcolor==2.3.0
text-unidecode==1.3
typing_extensions==4.8.0
uc-micro-py==1.0.2
unicodecsv==0.14.1
urllib3==2.0.7
virtualenv==20.24.6
Werkzeug==2.2.3
wrapt==1.15.0
WTForms==3.0.1
yarl==1.9.2
zipp==3.17.0

Anything else

No response

Felix-neko commented 10 months ago

UPD:

If I simply add these two lines

serialized_dag = dill.dumps(dag)
dill.loads(serialized_dag)

to the end of my DAG's code and execute it in PyCharm (in the same virtualenv that my Airflow executor runs in), I get the same NameError: name 'timedelta' is not defined.
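
The same round trip can be applied to each value separately to see which one breaks. This is only a minimal sketch: find_unpicklable and BrokenOnLoad are hypothetical names, and BrokenOnLoad is a stand-in that imitates the failure rather than the real pendulum object. The sketch defaults to the standard pickle module so it runs anywhere; to match the setup in this issue you would pass dill.dumps and dill.loads instead.

```python
import pickle


def find_unpicklable(values, dumps=pickle.dumps, loads=pickle.loads):
    """Return {key: error text} for every entry that fails a dump/load round trip."""
    failures = {}
    for key, value in values.items():
        try:
            loads(dumps(value))
        except Exception as exc:  # collect any failure, whatever its type
            failures[key] = f"{type(exc).__name__}: {exc}"
    return failures


class BrokenOnLoad:
    """Hypothetical stand-in: serializes fine, but fails while being restored."""

    def __reduce__(self):
        # On load, pickle calls eval(...) with a name that is not defined,
        # which imitates the NameError seen in the traceback above.
        return (eval, ("timedelta(seconds=0)",))


# Assumption: in a real check the dict would hold the rendered op_args values.
report = find_unpicklable({"ts": "2023-11-06T18:34:16+00:00", "dag_run": BrokenOnLoad()})
print(report)
```

Only the keys that fail end up in the report, so a clean value such as the ts string is not listed.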

Taragolis commented 10 months ago

I assume this might have the same cause as described here: https://github.com/apache/airflow/issues/35307

jscheffl commented 10 months ago

Good catch @Taragolis :-D I assumed the same when I read this.

@Felix-neko Is there a specific reason to pass the full DagRun object into the context? Would it help, as a workaround, to pass in only specific details of the DagRun? Which details of the DagRun do you need for the execution? (Just thinking about a workaround to unblock you.)
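
A rough sketch of what such a workaround might look like, assuming only a few fields are needed. dag_run_summary and fake_dag_run are hypothetical names used for illustration; dag_id, run_id and logical_date are existing DagRun attributes. The stand-in object lets the sketch run without Airflow installed. Whether the templated variant avoids the error in this particular setup has not been verified here.

```python
import pickle
from datetime import datetime, timezone
from types import SimpleNamespace


def dag_run_summary(dag_run):
    """Copy a few DagRun attributes into a dict of plain built-in values."""
    return {
        "dag_id": dag_run.dag_id,
        "run_id": dag_run.run_id,
        "logical_date": dag_run.logical_date.isoformat(),
    }


# Hypothetical stand-in for a real DagRun, so the sketch runs without Airflow.
fake_dag_run = SimpleNamespace(
    dag_id="strange_pickling_error_dag",
    run_id="manual__2023-11-06T18:34:16+00:00",
    logical_date=datetime(2023, 11, 6, 18, 34, 16, tzinfo=timezone.utc),
)

summary = dag_run_summary(fake_dag_run)

# Strings survive serialization without needing pendulum or Airflow classes.
restored = pickle.loads(pickle.dumps(summary))

# The same idea expressed as templates, in the style of the DAG in this issue.
context = {
    "ts": "{{ ts }}",
    "run_id": "{{ dag_run.run_id }}",
    "logical_date": "{{ dag_run.logical_date.isoformat() }}",
}
```

The trade-off is that the callable receives plain strings rather than objects, so any code that calls methods on DagRun would need to change.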

Felix-neko commented 10 months ago

> @Felix-neko Is there a specific reason to pass the full DagRun object into the context? Would it help as workaround to only pass in specific details of the full DagRun? Which details would you need in the DagRun for the execution? (Just thinking about a workaround to un-block you)

@jscheffl: alas, yes, we have such reasons. Many of our DAGs use DagRun, Dag and TaskInstance instances as objects and extract data from them inside operators.

We also have a homemade library that uses Airflow and likewise extracts data from DagRun and TaskInstance objects inside operators. This feature is really helpful for us.

Taragolis commented 7 months ago

Hey @Felix-neko, a lot of time has passed since the last activity on this issue. Any chance you have checked it on Airflow 2.8.1 with pendulum 3?

github-actions[bot] commented 6 months ago

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

github-actions[bot] commented 6 months ago

This issue has been closed because it has not received response from the issue author.