apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
36.42k stars 14.11k forks source link

Type hinted `@task` decorated functions to create typed XComs #19884

Open MatrixManAtYrService opened 2 years ago

MatrixManAtYrService commented 2 years ago

Description

I wish that Airflow would look at type hints on @task decorated functions to determine:

  1. Do the hinted types provide serialize/deserialize functions (like Foo)? or are they callable (like int)?
class Foo:
    ....
    def to_json(self) -> str:
        pass 

@task
def do_thing(some_num: int, some_obj: Foo) -> Bar:
    pass
  1. Do upstream tasks write return_values to XCom that conflict with the parameter types of downstream tasks?

If 1 I'd like Airflow to initialize the desired type for me If 2 I'd like Airflow to warn me about the type conflicts at parse time

Use case/motivation

I usually don't find it to be burdensome to manipulate jumbles of Tuples/Dicts/Lists. Because of this, I don't write a lot of classes.

But I've been using the Taskflow API lately, and there's something about working with it that makes me want to type-hint everything that becomes an XComArg. Maybe the part of my brain that used to keep track of the Tuple/Dict/List soup is now keeping track of whether this is task-code or dag-definition-code, it's hard to say.

Whatever the reason, this has lead me to write dags that look something like this:

from typing import List
from dataclasses import dataclass
from dataclasses_json import dataclass_json

@dataclass_json
@dataclass
class Foo:
    bar: str

@dataclass_json
@dataclass
class Baz:
    foos: List[Foo]

@task
def get_baz() -> Baz:
    foos = [Foo(x) for x in ["wakka", "bang"]]
    return Baz(foos).to_dict(). # could be inferred

@task
def whats_a_baz(_baz: Baz): # could warn on type mismatch
    baz = Baz.from_dict(_baz)  # could be inferred
    print(baz)

@task
def whats_are_bazzes(_bazzes: List[Baz]):  # could warn on type mismatch
    bazzes = [Baz.from_dict(x) for x in _bazzes]. # could be inferred
    print(bazzes)

with DAG(
    dag_id="request",
    schedule_interval=None,
    start_date=days_ago(1),
) as dag:

    one_baz = get_baz()
    two_baz = get_baz()

    whats_a_baz(one_baz)
    # Baz(foos=[Foo(bar='wakka'), Foo(bar='bang')])

    whats_are_bazzes([one_baz, two_baz])
    # [Baz(foos=[Foo(bar='wakka'), Foo(bar='bang')]), Baz(foos=[Foo(bar='wakka'), Foo(bar='bang')])]

I like this because if I'm wrong about the shape of my data in an early task, I notice it when that task fails to convert the data into custom objects. Without these conversions, mistakes show up when they cause problems downstream, not where they were introduced.

I dislike this because all of those to/from calls are ugly and easy to get wrong.

This raised two questions:

  1. Since the hinted types have from_json() and to_json() functions, could airflow handle the conversions for me?
  2. Airflow knows which XComs are generated as task outputs, and which are later used as task inputs--so could it inform me of task I/O type conflicts at parse time?

If so, I'd be able to iterate faster since a whole category of bug would be catchable in a tighter debug loop (i.e. before even running the task).

I realize that this is a nontrivial change. Thanks for at least considering it.

Related issues

No response

Are you willing to submit a PR?

Code of Conduct

uranusjr commented 2 years ago

This may be possible in the future, but I wouldn’t recommend doing this right now. Python core devs’ official stance on type hints, at least at the current time, is that they are merely hints and should not be relied on by runtime logic. The last people that do this kind of thing extensively—Pydantic and FastAPI—caused a big drama when PEP 563 was almost released in 3.10 and needed to be reverted at the last minute. I have a lot of thoughts on all the things related to this, but the short version is I would hate Airflow to become such projects. At the very leat, we should wait until the runtime aspects of type hints are discussed, decided on, and implemented, before making an effort toward related features.

hterik commented 2 years ago

I don't think you have to rely on runtime type hints to provide such feature.

As long as the types passed between tasks provide some kind of from_json and to_json functions, you can rely on normal mypy to provide the validation of whats_a_baz(get_baz()) having correct return value vs input argument. Right now this is constrained by only allowing scalars or dicts to be returned from tasks. Then inside airflow internals, the serialization (eg the # could be inferred" in example above) should be possible to do runtime without the type hints, by calling the from_json/to_json if the returned value has such functions.

potiuk commented 2 years ago

I am with @uranusjr on that one. I think there are a number of cases that we did not realise it might cause if we validate XCom structure at runtime. Python value is that while simple thing are nice, you can (if you want) tap into the power of metaclasses, dynamic attributes and the likes and even if we add runtime warnings, we are limiting ourselves to just the "obvious" cases. I can very easily imagine a case where an implementation of Operator would push to Xcom an object with different internal structure but dynamic get_atrs() that would make it works with specific structure. This is not verifiable at runtime almost by definition.

However I think validating cross-operator xcoms type hints in such situation is possible (for example with PEP 484 and stub files). And since DAGs are Python, we should be able to simply (when we implement it) run mypy on the DAGs and there type hints should help DAG writer to develop the DAG.

MatrixManAtYrService commented 2 years ago

Thanks for your thoughts on this. I didn't realize that runtime logic based on type hints was frowned upon, but seeing as it's a newer feature I understand the desire to proceed with caution.

@hterik I'm trying to visualize your strategy, specifically when an xcom_pull happens--without relying on the type hint how do we know where to look for those functions? Do we requireENABLE_XCOM_PICKLING and look on the object itself, or do we expect a special field in the json that says "here's my class, go look there for conversion functions"?


re: type checking, this sounds nice:

we should be able to simply (when we implement it) run mypy on the DAGs and there type hints should help DAG writer to develop the DAG.

From my naive point view it seems like that would only require something like this around the decorators, but it looks like we're already doing something similar.

Could it be that all we need is minor tweaks to how we already handle hinting around decorators, or would it be more invasive than what I'm thinking? If it's just the former, I might take a shot at it.

hterik commented 2 years ago

@MatrixManAtYrService Sorry, i didn't think of the de-serialization scenario. It's only the to_json that would be as simple as i imagined at first. Deserializing would need either the serialized data to contain the class name itself, which can be a security risk, or registering a list of valid deserializers, or relying on the type hints.

One more suggestion is to add the type information in the @task-annotation, something along the lines of @task(arg_types=Baz) could work.