flyteorg / flyte

Scalable and flexible workflow orchestration platform that seamlessly unifies data, ML and analytics stacks.
https://flyte.org
Apache License 2.0

[BUG] Accessing Dataclass fields fail on local execute #4581

Open. EngHabu opened this issue 10 months ago.

EngHabu commented 10 months ago

Describe the bug

```python
from dataclasses import dataclass
import typing
from dataclasses_json import dataclass_json
from flytekit import task, workflow

@dataclass_json
@dataclass
class Foo:
    a: typing.List[str]

@task
def my_task(a: typing.List[str]) -> int:
    return len(a)

@task
def gen_foo() -> Foo:
    return Foo(a=["hello", "world"])

@workflow
def my_wf() -> int:
    f = gen_foo()
    return my_task(a=f.a)
```

Run:

```shell
pyflyte run my_wf.py my_wf
```

Error:

```
Running Execution on local.
2023-12-12 08:55:30,747011 ERROR    {"asctime": "2023-12-12 08:55:30,747", "name": "flytekit", "levelname": "ERROR", "message": "Failed to convert inputs of task 'test_bug.my_task':\n  Failed argument 'a': Failed to convert to generic protobuf struct"}    base_task.py:263
Failed with Unknown Exception <class 'TypeError'> Reason: Encountered error while executing workflow 'test_bug.my_wf':
  Error encountered while executing 'my_wf':
  Failed to convert inputs of task 'test_bug.my_task':
  Failed argument 'a': Failed to convert to generic protobuf struct
Encountered error while executing workflow 'test_bug.my_wf':
  Error encountered while executing 'my_wf':
  Failed to convert inputs of task 'test_bug.my_task':
  Failed argument 'a': Failed to convert to generic protobuf struct
```

Expected behavior

The workflow should run locally and return 2, the length of `f.a` (`["hello", "world"]`).

Additional context to reproduce

No response

Screenshots

No response

dosubot[bot] commented 10 months ago

🤖

Hi @EngHabu, I'm Dosu, a friendly bot here to assist you while our human maintainers are away. I'm here to help you solve bugs, answer questions, and guide you through becoming a contributor. Let's get started on addressing your issue!

Based on the error message you provided, it seems like the issue is related to the conversion of the 'a' argument from the Foo dataclass to a generic protobuf struct. This conversion is necessary for Flyte to pass the data between tasks in a workflow.

The dataclass_json decorator you're using is supposed to handle the conversion of the dataclass to a JSON-compatible format, which should then be compatible with the protobuf struct. However, it seems like this conversion is not happening correctly.

One possible reason for this could be a mismatch in the data types of the 'a' argument in the workflow and the task. In the workflow 'my_wf', the 'a' argument is defined as a simple type 1, which corresponds to an integer in Flyte's type system. However, in the task 't2' where 'a' is used as an input, it is defined as a simple type 3, which corresponds to a string in Flyte's type system. This discrepancy in data types could be causing the TypeError.

To resolve this issue, you should ensure that the data types of the 'a' argument in both the workflow and the task match. You can refer to the YAML files in the Flyte repository for the correct data types for each argument.

If the data types are already matching and the error still persists, it could be a bug in the Flyte codebase or in the dataclass_json library. In that case, it would be helpful if you could provide more information about your environment, such as the versions of Flyte and dataclass_json you're using, and any other relevant details.

For more information on how Flyte handles the conversion of Python data types to protobuf structs, you can refer to the ProtobufStruct class in the Flyte codebase. This class handles the conversion process in its to_dict method.

I hope this helps! If you have any more questions or need further clarification, feel free to ask.

This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.

JackUrb commented 4 months ago

Ran into this bug as well. I did some digging, since Flyte looks like a perfect fit for us, but this list issue is a clear blocker.

The underlying issue appears to be that `_MessageToDict`, as used by `ProtobufTransformer`, does not always return a dict, so when its result is fed straight into the protobuf `Struct`, the whole conversion fails. In particular, if a dataclass has an attribute that is a simple list, `_MessageToDict` is eventually called on that attribute (since it is not a simple type) and returns a `list`, which blows up when passed to `struct.update`.
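The failure described above can be reproduced with protobuf alone, without flytekit. This is a minimal sketch, assuming that flytekit's `_MessageToDict` is protobuf's `json_format.MessageToDict` and that the dataclass field reaches the transformer as a `google.protobuf.struct_pb2.ListValue`; the helper `struct_update_fails` is hypothetical.

```python
from google.protobuf.json_format import MessageToDict
from google.protobuf.struct_pb2 import Struct


def struct_update_fails(value) -> bool:
    """Hypothetical helper: True if Struct.update() rejects `value`."""
    try:
        Struct().update(value)
    except Exception:  # broad catch, like the one in ProtobufTransformer.to_literal
        return True
    return False


# Foo(a=["hello", "world"]) represented as a generic Struct.
foo_struct = Struct()
foo_struct.update({"a": ["hello", "world"]})

# Accessing the field `a` yields a ListValue message rather than a Struct.
list_msg = foo_struct.fields["a"].list_value

# MessageToDict maps a ListValue to a plain Python list, not a dict.
converted = MessageToDict(list_msg)
print(type(converted).__name__, converted)

# Struct.update() expects a mapping, so the list is rejected...
print("list rejected:", struct_update_fails(converted))
# ...while the dict produced from the whole Struct is accepted.
print("dict rejected:", struct_update_fails(MessageToDict(foo_struct)))
```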

With very little knowledge of what is going on under the surface, the closest I got to a fix was to make `ProtobufTransformer` behave more like `ListTransformer` in this specific case:

```python
class ProtobufTransformer(TypeTransformer[Message]):
    ...

    def _handle_list_literal(self, ctx: FlyteContext, elems: list) -> Literal:
        if len(elems) == 0:
            return Literal(collection=LiteralCollection(literals=[]))
        st = type(elems[0])
        lt = TypeEngine.to_literal_type(st)
        lits = [TypeEngine.to_literal(ctx, x, st, lt) for x in elems]
        return Literal(collection=LiteralCollection(literals=lits))

    def to_literal(self, ctx: FlyteContext, python_val: T, python_type: Type[T], expected: LiteralType) -> Literal:
        struct = Struct()
        try:
            message_dict = _MessageToDict(cast(Message, python_val))
            if isinstance(message_dict, list):
                # _MessageToDict will return a `list` on ListValue protobufs
                return self._handle_list_literal(ctx, message_dict)
            struct.update(message_dict)
        except Exception:
            raise TypeTransformerFailedError("Failed to convert to generic protobuf struct")
        return Literal(scalar=Scalar(generic=struct))

    ...
```
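Stripped of the flytekit types, the patch's control flow is a type check on whatever `MessageToDict` returns. The sketch below is hypothetical: `to_generic` and its `"collection"`/`"scalar"` tags stand in for flytekit's `Literal(collection=...)` and `Literal(scalar=Scalar(generic=...))`, and it leaves the per-element conversion that `_handle_list_literal` performs to the caller.

```python
from typing import Any, List, Tuple, Union

from google.protobuf.json_format import MessageToDict
from google.protobuf.message import Message
from google.protobuf.struct_pb2 import Struct


def to_generic(msg: Message) -> Tuple[str, Union[Struct, List[Any]]]:
    """Hypothetical stand-in for the patched to_literal branching."""
    as_json = MessageToDict(msg)
    if isinstance(as_json, list):
        # ListValue case: hand back the elements so each can be converted on its own.
        return "collection", as_json
    # Otherwise assume a dict and build a generic Struct, as before the patch
    # (a bare Value message would still fail here, since update() needs a mapping).
    struct = Struct()
    struct.update(as_json)
    return "scalar", struct


source = Struct()
source.update({"a": ["hello", "world"]})

print(to_generic(source.fields["a"].list_value))  # the field access from the issue
print(to_generic(source)[0])                      # the whole Foo record
```

The design keeps the pre-patch path untouched for dict results and only diverts the list case, which is why the original example stops failing.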

Now this appears to work, both for the case shared by @EngHabu and for the slightly more complicated nested-dataclass example below:

```python
from dataclasses import dataclass
import typing
from dataclasses_json import dataclass_json
from flytekit import task, workflow

@dataclass_json
@dataclass
class Foo:
    a: typing.List[str]

@dataclass_json
@dataclass
class Bar:
    b: typing.List[Foo]

@task
def my_task(a: typing.List[Foo]) -> tuple[int, list[str]]:
    joined_list = []
    for f in a:
        joined_list += f.a
    return len(joined_list), joined_list

@task
def gen_bar() -> Bar:
    return Bar(b=[Foo(a=["hello", "world"])])

@workflow
def my_wf() -> tuple[int, list[str]]:
    b = gen_bar()
    return my_task(a=b.b)
```

```
>> pyflyte run workflows/graphs/test_case.py my_wf
Running Execution on local.
DefaultNamedTupleOutput(o0=2, o1=['hello', 'world'])
```

I hesitate to open this as a PR, though, because it doesn't seem to hold up to the same typing rigor as the rest of Flyte: the type resolution in `_handle_list_literal` is pretty ad hoc.
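One way to make that resolution less ad hoc is to take the element type from the declared annotation (the task in the issue declares `a: typing.List[str]`) and to fall back to `type(elems[0])` only when no annotation is available, rejecting mixed lists instead of silently using the first element's type. The helper `element_type` below is a hypothetical sketch built on the standard `typing` module only; it is not a flytekit API, and using it inside `_handle_list_literal` would require the declared Python type to reach that code path, which this sketch does not assume. Note also that, if the nested `Bar` list arrives as a `ListValue`, `MessageToDict` turns each `Foo` into a plain `dict`, so `type(elems[0])` there is `dict` rather than `Foo`.

```python
import typing
from typing import Any, List


def element_type(declared: Any, elems: List[Any]) -> Any:
    """Hypothetical helper: pick a list's element type, preferring the annotation."""
    # 1. Use the declared List[X] annotation when there is one.
    if typing.get_origin(declared) is list:
        args = typing.get_args(declared)
        if args:
            return args[0]
    # 2. Nothing to infer from an empty list.
    if not elems:
        return None
    # 3. Fall back to the values, but only if they all share one type.
    first = type(elems[0])
    if any(type(e) is not first for e in elems):
        names = sorted({type(e).__name__ for e in elems})
        raise TypeError(f"mixed element types: {names}")
    return first


print(element_type(typing.List[str], []))      # annotation wins, even for []
print(element_type(None, ["hello", "world"]))  # inferred from the values
try:
    element_type(None, ["hello", 1])
except TypeError as exc:
    print(exc)
```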

@eapolinario, do you have suggestions on how this could be taken further?