apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Airflow assumes task context is serialized with Pydantic models #42485

Status: Open. wolfier opened this issue 1 week ago.

wolfier commented 1 week ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.10.2

What happened?

The triggerer terminates when it attempts to deserialize a trigger whose kwargs contain the task context dictionary.

[2024-09-25T13:25:57.492-0500] {triggerer_job_runner.py:338} INFO - Starting the triggerer
[2024-09-25T13:25:57.629-0500] {triggerer_job_runner.py:348} ERROR - Exception when executing TriggererJobRunner._run_trigger_loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 346, in _execute
    self._run_trigger_loop()
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 374, in _run_trigger_loop
    self.load_triggers()
  File "/usr/local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 400, in load_triggers
    self.trigger_runner.update_triggers(set(ids))
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 719, in update_triggers
    new_trigger_instance = trigger_class(**new_trigger_orm.kwargs)
                                           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/trigger.py", line 94, in kwargs
    return self._decrypt_kwargs(self.encrypted_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/trigger.py", line 130, in _decrypt_kwargs
    return BaseSerialization.deserialize(decrypted_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 831, in deserialize
    return {k: cls.deserialize(v, use_pydantic_models) for k, v in var.items()}
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 821, in deserialize
    d[k] = cls.deserialize(v, use_pydantic_models=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 803, in deserialize
    raise RuntimeError(
RuntimeError: Setting use_pydantic_models = True requires AIP-44 (in progress) feature flag to be true. This parameter will be removed eventually when new serialization is used by AIP-44

The issue started appearing when I upgraded from Airflow 2.9.3 to Airflow 2.10.2.

Note that I am using a custom trigger that serializes the task context:

from typing import Any

from airflow.triggers.base import BaseTrigger


class CustomTrigger(BaseTrigger):
    ...

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serializes the trigger's arguments and classpath.

        :return: A tuple containing the classpath and a dictionary of arguments.
        """
        return (
            "CustomTrigger",
            {
                # The whole task context is stored as a trigger kwarg.
                "context": self.context,
            },
        )

What do you think should happen instead?

I believe the deserialize operation should not force use_pydantic_models to True when it recurses into the task context.

Instead, it should forward the value that was passed in as a parameter:

# airflow/serialization/serialized_objects.py, deserialize(), line 821 in the traceback
d[k] = cls.deserialize(v, use_pydantic_models)

Also, when the task context is serialized, the value of use_pydantic_models passed to the serialize function is respected, so deserialization should behave the same way.
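To make the failure mode concrete, here is a minimal, self-contained model of the recursion shown in the traceback. It is not Airflow's code: `toy_deserialize`, the `{"__type": "context", "__var": {...}}` encoding, and the `AIP_44_ENABLED` flag are simplified stand-ins assumed for illustration. With `force_pydantic=True` the model hard-codes the flag to True when it reaches the nested context, as line 821 does in the traceback; otherwise it forwards the caller's value, as proposed above.

```python
from typing import Any

# Hypothetical stand-in for the AIP-44 feature flag; off, as in a default install.
AIP_44_ENABLED = False


def toy_deserialize(var: Any, use_pydantic_models: bool = False, *, force_pydantic: bool = False) -> Any:
    """Decode a simplified encoding in which {"__type": "context", "__var": {...}} marks a task context.

    force_pydantic=True reproduces the reported behaviour: the flag is hard-coded
    to True when recursing into the context instead of being forwarded.
    """
    # Same kind of guard as the RuntimeError in the traceback, checked on every call.
    if use_pydantic_models and not AIP_44_ENABLED:
        raise RuntimeError("use_pydantic_models=True requires the AIP-44 feature flag")
    if isinstance(var, dict) and var.get("__type") == "context":
        # Buggy path hard-codes True; fixed path forwards the caller's value.
        flag = True if force_pydantic else use_pydantic_models
        return {k: toy_deserialize(v, flag, force_pydantic=force_pydantic) for k, v in var["__var"].items()}
    if isinstance(var, dict):
        # Plain dicts (such as the trigger kwargs) forward the flag unchanged.
        return {k: toy_deserialize(v, use_pydantic_models, force_pydantic=force_pydantic) for k, v in var.items()}
    return var


trigger_kwargs = {"context": {"__type": "context", "__var": {"ds": "2024-09-25", "run_id": "manual__1"}}}

print(toy_deserialize(trigger_kwargs))  # forwarding the flag: decodes normally
try:
    toy_deserialize(trigger_kwargs, force_pydantic=True)  # hard-coded flag: fails
except RuntimeError as err:
    print(f"RuntimeError: {err}")
```

The sketch only models how the flag propagates. Airflow's real context branch does more than this toy recursion, so forwarding the flag might not, on its own, make a serialized context usable in the triggerer.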

How to reproduce

  1. Create a custom Trigger that serializes the task context
  2. Create a deferrable operator that uses the custom trigger
  3. Run the task

Operating System

n/a

Versions of Apache Airflow Providers

No response

Deployment

Astronomer

Deployment details

No response

Anything else?

No response


gopidesupavan commented 20 hours ago

Currently, the task context is not supported in triggers. If you need values from the context object, extract them and pass them to the trigger as key-value pairs.
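The workaround suggested above can be sketched as follows, assuming only plain string values such as `ds` and `run_id` are needed inside the trigger. `extract_trigger_kwargs`, `CustomTriggerSketch`, and the classpath `my_package.triggers.CustomTriggerSketch` are hypothetical names. A real trigger would subclass `airflow.triggers.base.BaseTrigger`; that base class is left out here so the sketch runs without Airflow installed. The JSON check is deliberately stricter than Airflow's own serializer (it rejects datetimes, for instance), so treat it as a conservative guard, not Airflow's actual rule.

```python
import json
from typing import Any


def extract_trigger_kwargs(context: dict[str, Any], keys: list[str]) -> dict[str, Any]:
    """Copy only the requested context entries, rejecting anything that is not JSON-serializable."""
    kwargs: dict[str, Any] = {}
    for key in keys:
        value = context[key]
        try:
            json.dumps(value)
        except TypeError as err:
            raise TypeError(f"context[{key!r}] is not a plain value; extract a field from it instead") from err
        kwargs[key] = value
    return kwargs


class CustomTriggerSketch:
    """Stand-in for a BaseTrigger subclass that receives plain values instead of the whole context."""

    def __init__(self, ds: str, run_id: str) -> None:
        self.ds = ds
        self.run_id = run_id

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # Hypothetical dotted path; it should point at wherever the trigger class is importable.
        return ("my_package.triggers.CustomTriggerSketch", {"ds": self.ds, "run_id": self.run_id})


# Fake context: "ti" stands in for a non-serializable object such as the task instance.
fake_context = {"ds": "2024-09-25", "run_id": "manual__2024-09-25", "ti": object()}
trigger = CustomTriggerSketch(**extract_trigger_kwargs(fake_context, ["ds", "run_id"]))
print(trigger.serialize())
```

Because the trigger only ever sees strings, its kwargs stay independent of how (or whether) the task context itself can be serialized.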

potiuk commented 9 hours ago

Yeah. We might want to handle it better: a clearer error message should be printed in this case. Marked it as good-first-issue.
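One possible shape for the clearer error, sketched under assumptions: `check_trigger_kwargs` is a hypothetical helper, not an Airflow API, and it detects a task context heuristically by looking for a mapping that holds both the `task_instance` and `dag_run` keys (both are standard task-context keys). Where such a check would live in Airflow, and how it would really recognize the context, is left open.

```python
from collections.abc import Mapping
from typing import Any

# Heuristic assumption: a mapping holding both of these keys is treated as a task context.
CONTEXT_MARKER_KEYS = frozenset({"task_instance", "dag_run"})


def check_trigger_kwargs(classpath: str, kwargs: Mapping[str, Any]) -> None:
    """Raise a descriptive error if any trigger kwarg looks like the task context."""
    for name, value in kwargs.items():
        if isinstance(value, Mapping) and CONTEXT_MARKER_KEYS.issubset(value.keys()):
            raise ValueError(
                f"Trigger {classpath!r} got the task context in kwarg {name!r}. "
                "Passing the task context to a trigger is not supported; extract the "
                "values you need (for example ds or run_id) and pass them as separate kwargs."
            )


check_trigger_kwargs("my_package.triggers.CustomTrigger", {"ds": "2024-09-25"})  # no error raised
try:
    check_trigger_kwargs(
        "my_package.triggers.CustomTrigger",
        {"context": {"task_instance": object(), "dag_run": object(), "ds": "2024-09-25"}},
    )
except ValueError as err:
    print(err)
```

Failing early with a message that names the offending kwarg and points at the workaround would be friendlier than the generic AIP-44 RuntimeError the triggerer raises today.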