flyteorg / flyte

Scalable and flexible workflow orchestration platform that seamlessly unifies data, ML and analytics stacks.
https://flyte.org
Apache License 2.0
5.42k stars 581 forks

[BUG] flytekit 1.6.0 task function calls do not type check #3682

Open tekumara opened 1 year ago

tekumara commented 1 year ago

Describe the bug

In https://github.com/flyteorg/flytekit/pull/1631 the task return type was changed to effectively Tuple[Promise] | Promise | VoidPromise | Tuple[Unknown, ...] | None.

This breaks usage of tasks in type checkers like pyright / vscode.

Expected behavior

Task function calls still type check correctly, as they did in flytekit 1.5.0

Additional context to reproduce

This type checks without errors in flytekit 1.5.0:

from typing import Dict, List, NamedTuple

import pandas as pd
from flytekit import task, workflow

class OpenFlightsData(NamedTuple):
    routes: List[Dict[str, str]]
    airlines: Dict[str, str]
    airports: Dict[str, Dict[str, str]]

@task()
def extract_reference_data() -> OpenFlightsData:
    return OpenFlightsData(
        routes=[{"MAO": "CIZ"}], airlines={"0B": "Blue Air"}, airports={"AAA": {"lat": "1", "lon": "2"}}
    )

@task()
def extract_live_data() -> pd.DataFrame:
    return pd.DataFrame()

@task()
def transform(raw_aircraft_data: pd.DataFrame, airlines: Dict[str, str]) -> pd.DataFrame:
    # do stuff ..
    return pd.DataFrame()

@workflow
def main() -> None:
    reference_data = extract_reference_data()
    reveal_type(reference_data)   # expecting OpenFlightsData
    live_data = extract_live_data()
    reveal_type(live_data)        # expecting DataFrame

    transform(raw_aircraft_data=live_data, airlines=reference_data.airlines)

if __name__ == "__main__":
    main()

With flytekit 1.6.0 and pyright / vscode:

❯ pyright test.py
test.py
  test.py:34:17 - information: Type of "reference_data" is "Tuple[Promise] | Promise | VoidPromise | Tuple[Unknown, ...] | None"
  test.py:36:17 - information: Type of "live_data" is "Tuple[Promise] | Promise | VoidPromise | Tuple[Unknown, ...] | None"
  test.py:38:68 - error: Cannot access member "airlines" for type "Tuple[Promise]"
    Member "airlines" is unknown (reportGeneralTypeIssues)
  test.py:38:68 - error: Cannot access member "airlines" for type "Promise"
    Member "airlines" is unknown (reportGeneralTypeIssues)
  test.py:38:68 - error: Cannot access member "airlines" for type "VoidPromise"
    Member "airlines" is unknown (reportGeneralTypeIssues)
  test.py:38:68 - error: Cannot access member "airlines" for type "Tuple[Unknown, ...]"
    Member "airlines" is unknown (reportGeneralTypeIssues)
  test.py:38:68 - error: "airlines" is not a known member of "None" (reportOptionalMemberAccess)
5 errors, 0 warnings, 2 informations

dennisobrien commented 9 months ago

Verifying that this is still an issue with flytekit==1.10.2. Here's a simple example workflow:

from flytekit import task, workflow

@task
def task_get_a() -> int:
    return 1

@workflow
def my_workflow() -> int:
    a: int = task_get_a()
    return a

And here's the error from pyright:

$ pyright workflow_test.py
workflow_test.py
  workflow_test.py:10:14 - error: Expression of type "Tuple[Promise] | Promise | VoidPromise | Tuple[Unknown, ...] | int | None" cannot be assigned to declared type "int"
    Type "Tuple[Promise] | Promise | VoidPromise | Tuple[Unknown, ...] | int | None" cannot be assigned to type "int"
      "Promise" is incompatible with "int" (reportGeneralTypeIssues)
1 error, 0 warnings, 0 informations

Trying this with mypy gives a different error:

$ mypy workflow_test.py
workflow_test.py:1: error: Skipping analyzing "flytekit": module is installed, but missing library stubs or py.typed marker  [import-untyped]
workflow_test.py:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports
Found 1 error in 1 file (checked 1 source file)

dennisobrien commented 9 months ago

Returning a tuple of values also causes pyright errors.

Here's an example workflow and task:

from typing import Tuple

from flytekit import task, workflow

@task
def task_get_a_b() -> Tuple[int, int]:
    return 1, 2

@workflow
def my_workflow() -> int:
    a: int
    b: int
    a, b = task_get_a_b()
    return a

And here are the errors reported by pyright:

$ pyright workflow_test.py
workflow_test.py
  workflow_test.py:15:12 - error: "VoidPromise" is not iterable
    "__iter__" method not defined (reportGeneralTypeIssues)
  workflow_test.py:15:12 - error: "None" is not iterable
    "__iter__" method not defined (reportGeneralTypeIssues)
  workflow_test.py:15:5 - error: Expression with type "Tuple[Promise] | Promise | VoidPromise | Tuple[Unknown, ...] | Tuple[int, int] | None" cannot be assigned to target tuple
    Type "Tuple[Promise]" is incompatible with target tuple
      Tuple size mismatch; expected 2 but received 1 (reportGeneralTypeIssues)
  workflow_test.py:15:12 - error: Expression of type "Promise | Unknown | int" cannot be assigned to declared type "int"
    Type "Promise | Unknown | int" cannot be assigned to type "int"
      "Promise" is incompatible with "int" (reportGeneralTypeIssues)
4 errors, 0 warnings, 0 informations

dennisobrien commented 9 months ago

And one more example using a dataclass:

from dataclasses import dataclass

from dataclasses_json import DataClassJsonMixin
from flytekit import task, workflow

@dataclass
class MyDataClass(DataClassJsonMixin):
    a: int
    b: int

@task
def task_get_mydataclass() -> MyDataClass:
    return MyDataClass(a=1, b=2)

@workflow
def my_workflow() -> bool:
    my_dataclass: MyDataClass = task_get_mydataclass()
    if my_dataclass.a == 1 and my_dataclass.b == 2:
        return True
    return False

And the error from pyright:

$ pyright workflow_test.py

/workflow_test.py
  workflow_test.py:18:33 - error: Expression of type "Tuple[Promise] | Promise | VoidPromise | Tuple[Unknown, ...] | MyDataClass | None" cannot be assigned to declared type "MyDataClass"
    Type "Tuple[Promise] | Promise | VoidPromise | Tuple[Unknown, ...] | MyDataClass | None" cannot be assigned to type "MyDataClass"
      "Promise" is incompatible with "MyDataClass" (reportGeneralTypeIssues)
1 error, 0 warnings, 0 informations

fellhorn commented 7 months ago

We also ran into this issue, and it blocks us from type checking the Flyte parts of our code base. It seems that the flytekit types generally describe the static workflow graph at compilation time, with nodes returning promises. This runs contrary to our efforts to check the execution types, which are what a developer is usually interested in and writes the type annotations for.

I don't really have a good idea of how to solve this, as the Promise type is equally valid and seems to be required for manipulating the graph. Does somebody have a better idea than maintaining our own type stubs or completely ignoring the flytekit types?

fellhorn commented 5 months ago

After digging deeper into this, I assume there is currently no way to solve this in flytekit. Internally we use a workaround, which I describe at the end.

Example

"""A simple example on the typing issues in flytekit."""
from typing import Callable, ParamSpec, TypeVar, Union
from flytekit import task, workflow
from flytekit.core.promise import Promise

@task
def add_one(a: int) -> int:
    return a + 1

@workflow
def my_workflow() -> int:
    # add_one should be able to receive an int
    start: int = 0
    res = add_one(a=start)

    # The return type of the decorated task is no longer
    # an int but a Promise
    assert not isinstance(res, int) 

    # Thus the decorated add_one takes either a Promise or an int
    add_one(a=res)

    return res

Problem analysis

To me it seems that types covering both the runtime view and the registration-time view of the workflow can't be expressed in the Python type system, as it has no map operator on types. A task decorator would need to change a function signature from

T = TypeVar("T")
U = TypeVar("U")
V = TypeVar("V")
def python_func(arg1: T, arg2: U) -> V: ...

to something like

def decorated_python_func(arg1: Promise[T] | T, arg2: Promise[U] | U) -> Promise[V]: ...

To annotate the types in the flytekit.task decorator one would need to express this mapping for a generic

FuncIn = ParamSpec("FuncIn")
FuncOut = TypeVar("FuncOut")
ToDecorateFunction = Callable[FuncIn, FuncOut]

where each type T in FuncIn.kwargs is mapped to Promise[T] | T.

This is not yet supported in Python's type system: https://github.com/python/typing/discussions/1163

Workaround

Our company had already replaced flytekit.task internally with our own custom decorator. This also allowed us to override the flytekit types so that only the runtime types are visible:

from typing import Callable, Optional, ParamSpec, TypeVar, Union, overload
from flytekit.core.task import task as flytekit_task

FuncOut = TypeVar("FuncOut")
FuncIn = ParamSpec("FuncIn")

@overload
def our_task(
    _task_function: None = ...,
    **kwargs,
) -> Callable[[Callable[FuncIn, FuncOut]], Callable[FuncIn, FuncOut]]: ...

@overload
def our_task(
    _task_function: Callable[FuncIn, FuncOut],
    **kwargs,
) -> Callable[FuncIn, FuncOut]: ...

def our_task(
    _task_function: Optional[Callable[FuncIn, FuncOut]] = None,
    **kwargs,
) -> Union[Callable[FuncIn, FuncOut], Callable[[Callable[FuncIn, FuncOut]], Callable[FuncIn, FuncOut]]]:
    # Do some other stuff here
    # ...

    # Then actually call the flytekit task
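    def wrapped(_func: Callable[FuncIn, FuncOut]) -> Callable[FuncIn, FuncOut]:
        # We want to type check the runtime view, so flytekit's own return type is ignored here.
        # NOTE: target_fn is not defined in this snippet; it presumably belongs to the
        # elided "other stuff" above.
        return flytekit_task(_task_function=target_fn(_func), **kwargs)  # type: ignore

    if _task_function is not None:
        return wrapped(_task_function)
    return wrapped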

This can be done in a similar way for other flytekit decorators like workflow or dynamic.

It allows us to type check workflows for their runtime view but also causes issues one needs to be aware of:

from flytekit import workflow, Resources

@our_task
def add_one(a: int) -> int:
    return a + 1

@workflow
def my_workflow() -> int:
    start: int = 0

    # res will be an int in pyright as we use `our_task`
    res = add_one(a=start)

    # IMPORTANT!
    # The pyright types will be wrong but allow type checking, i.e.
    # this assertion would fail:
    # assert isinstance(res, int)
    # as res is actually still a Promise:
    print(type(res)) # <class 'flytekit.core.promise.Promise'>

    # When using anything specific to flytekit Promises, e.g. `with_overrides`, we need a small workaround:
    res2 = add_one(a=res) # res2 will still be an int in pyright

    # `with_overrides` is called separately and needs `type: ignore`, as pyright sees res2 as an int and not a Promise
    res2.with_overrides(requests=Resources(cpu="0.1", mem="500Mi"))  # type: ignore

    return res2

eapolinario commented 3 months ago

I wasn't aware that we can use TYPE_CHECKING inside class definitions, e.g. https://github.com/zen-xu/sunray/blob/c164537614d050a4acd67318913b6092ada799c8/sunray/_internal/dag.py#L49.

We could use this trick to provide a different type signature at type-checking time.

fellhorn commented 3 months ago

Indeed, we could offer the runtime types with this trick. I think this would not fully solve the issue, though, as the flytekit graph types could not be expressed at the same time. Graph node functions used in the workflow, like .with_overrides, would then be missing at type-checking time, as described above.

It might still be a better developer experience, though?