flyteorg / flyte


[BUG] nested dynamic won't bind pydantic models or dictionaries as inputs #5473

Open LinoJaime opened 2 weeks ago

LinoJaime commented 2 weeks ago

Describe the bug

If a nested dynamic task takes a pydantic model or a dict as an input, the input fails to bind to the task.

The error when using pydantic is:

[User] malformed dynamic workflow, caused by: Collected Errors: 10
    Error 0: Code: MismatchingBindings, Node Id: dn0, Description: Input [l] on node [dn0] expects bindings of type [simple:STRUCT].  Received []

When the pydantic model is converted to a dict, the error is:

TypeError: Failed to convert inputs of task 'flyte.workflows.test_wf.level2':
  'NoneType' object is not subscriptable

Expected behavior

Pydantic models and dictionaries should bind properly to the nested dynamic task.

Additional context to reproduce

Pydantic:

from typing import List

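from flytekit import ImageSpec, dynamic, workflow
from pydantic.v1 import BaseModel

# Assumption: pydantic_image was not defined in the original report.
# A minimal ImageSpec with pydantic installed is used here as a placeholder.
pydantic_image = ImageSpec(packages=["pydantic"])
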
class SModel(BaseModel):
    name: str

@dynamic(container_image=pydantic_image)
def level2(l: SModel) -> List[int]:
    return [1, 2, 3]

@dynamic(container_image=pydantic_image)
def level1(l: List[int]) -> str:
    v = []
    for i in l:
        v.append(level2(l=SModel(name="a")))
    return "test"

@workflow
def multi_wf(l: List[int] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) -> str:
    return level1(l=l)

Dict:


from typing import List

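from flytekit import ImageSpec, dynamic, workflow
from pydantic.v1 import BaseModel

# Assumption: pydantic_image was not defined in the original report.
# A minimal ImageSpec with pydantic installed is used here as a placeholder.
pydantic_image = ImageSpec(packages=["pydantic"])
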
class SModel(BaseModel):
    name: str

@dynamic(container_image=pydantic_image)
def level2(l: dict) -> List[int]:
    return [1, 2, 3]

@dynamic(container_image=pydantic_image)
def level1(l: List[int]) -> str:
    v = []
    for i in l:
        v.append(level2(l=SModel(name="a").dict()))
    return "test"

@workflow
def multi_wf(l: List[int] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) -> str:
    return level1(l=l)



runllm[bot] commented 2 weeks ago

Nested dynamic tasks fail to bind Pydantic models and dictionaries as inputs, as the two errors in the report show. The likely cause is the way Flyte transforms and binds these values when a dynamic workflow is compiled: its type system and serialization mechanisms can struggle with complex types such as Pydantic models and dictionaries when one dynamic task calls another.

Pydantic Model Issue

The error message for Pydantic models reports a binding mismatch:

[User] malformed dynamic workflow, caused by: Collected Errors: 10
    Error 0: Code: MismatchingBindings, Node Id: dn0, Description: Input [l] on node [dn0] expects bindings of type [simple:STRUCT].  Received []

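Node dn0 expects its input l to be bound to a value of type STRUCT, but the list of received bindings is empty (Received []). In other words, the model is not bound at all, rather than bound with the wrong type. The count of 10 collected errors matches the 10 elements in the default input list, presumably one error per level2 call in the loop.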

Dictionary Issue

The error message for dictionaries indicates a NoneType issue:

TypeError: Failed to convert inputs of task 'flyte.workflows.test_wf.level2':
  'NoneType' object is not subscriptable

This suggests that the dictionary is lost during serialization or deserialization, so that None is indexed where a dictionary is expected.
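
As a point of reference, the message in this traceback is the generic one Python raises when code indexes a value that is None. The snippet below is only an illustration in plain Python: read_name and describe_failure are hypothetical helpers, not flytekit code, and where exactly the None arises inside flytekit is not established in this thread.

```python
from typing import Optional


def read_name(payload: Optional[dict]) -> str:
    # Hypothetical helper: indexes the payload without checking for None.
    return payload["name"]


def describe_failure(payload: Optional[dict]) -> str:
    # Return the looked-up value on success, or the TypeError message on failure.
    try:
        return read_name(payload)
    except TypeError as exc:
        return str(exc)
```

Calling describe_failure with a real dictionary returns the stored name, while calling it with None returns the same "not subscriptable" message seen in the report.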

Workaround

One potential workaround is to serialize the Pydantic model or dictionary to a JSON string before passing it to the nested dynamic task and then deserialize it within the task. This can help bypass the type transformation issues.

Here is an example of how you can modify your code to use JSON serialization:

Pydantic Model with JSON Serialization

from typing import List

from flytekit import dynamic, workflow
from pydantic.v1 import BaseModel

class SModel(BaseModel):
    name: str

@dynamic
def level2(l: str) -> List[int]:
    # Rebuild the model from the JSON string passed by level1.
    model = SModel.parse_raw(l)
    return [1, 2, 3]

@dynamic
def level1(l: List[int]) -> str:
    v = []
    for i in l:
        v.append(level2(l=SModel(name="a").json()))
    return "test"

@workflow
def multi_wf(l: List[int] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) -> str:
    return level1(l=l)

Dictionary with JSON Serialization

from typing import List
from flytekit import dynamic, workflow
import json

@dynamic
def level2(l: str) -> List[int]:
    d = json.loads(l)
    return [1, 2, 3]

@dynamic
def level1(l: List[int]) -> str:
    v = []
    for i in l:
        v.append(level2(l=json.dumps({"name": "a"})))
    return "test"

@workflow
def multi_wf(l: List[int] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) -> str:
    return level1(l=l)
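
The JSON round trip that both workaround examples rely on can be checked outside Flyte. The sketch below uses only the standard library. The Payload dataclass is a stand-in for SModel (an assumption, so that the snippet runs without pydantic or flytekit), and encode and decode are hypothetical helper names.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Payload:
    # Stand-in for the SModel pydantic model from the report.
    name: str


def encode(payload: Payload) -> str:
    # Serialize to a JSON string, the type passed to the nested dynamic task.
    return json.dumps(asdict(payload))


def decode(raw: str) -> Payload:
    # Rebuild the object from the JSON string, as the nested task would.
    return Payload(**json.loads(raw))
```

In the workaround, level1 would play the role of encode and level2 the role of decode, so the only value crossing the task boundary is a plain string.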

