apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
36.14k stars 14.03k forks source link

`DateTimeSensor()` does not work with `datetime`s rendered via `render_template_as_native_obj=True` #38336

Open dwreeves opened 5 months ago

dwreeves commented 5 months ago

Apache Airflow version

2.8.3

If "Other Airflow 2 version" selected, which one?

No response

What happened?

Running the following code produces an error:

with DAG(
        dag_id="test_date_time_sensor",
        schedule="@monthly",
        start_date=datetime(2024, 1, 1),
        render_template_as_native_obj=True
) as dag:

    wait = DateTimeSensorAsync(
        task_id="wait",
        target_time="{{ (data_interval_end + macros.timedelta(minutes=1)) }}"
    )

Here's the full traceback:

  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/sensors/date_time.py", line 94, in execute
    trigger = DateTimeTrigger(moment=timezone.parse(self.target_time))
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/timezone.py", line 213, in parse
    return pendulum.parse(string, tz=timezone or TIMEZONE, strict=strict)  # type: ignore
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pendulum/parser.py", line 30, in parse
    return _parse(text, **options)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pendulum/parser.py", line 43, in _parse
    parsed = base_parse(text, **options)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pendulum/parsing/__init__.py", line 78, in parse
    return _normalize(_parse(text, **_options), **_options)
                      ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pendulum/parsing/__init__.py", line 113, in _parse
    return parse_iso8601(text)
           ^^^^^^^^^^^^^^^^^^^
TypeError: argument 'input': 'DateTime' object cannot be converted to 'PyString'

The way to fix it is to add a | string filter to the Jinja:

target_time="{{ (data_interval_end + macros.timedelta(minutes=1)) | string }}"

What you think should happen instead?

The operator should handle typing for the timestamp in execute() instead of in __init__, so that runtime conversions can be properly handled.

How to reproduce

See code example above.

Operating System

Not relevant

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

Deployment is not relevant.

Anything else?

This can also probably be removed this the code for DateTimeSensorAsync while I/we are at it:

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)

Are you willing to submit PR?

Code of Conduct

pierregillet commented 1 month ago

I encountered the same error, exactly the same use case (wait some time after data_interval_end before continuing). Thanks for the temporary fix!