flyteorg / flyte

Scalable and flexible workflow orchestration platform that seamlessly unifies data, ML and analytics stacks.
https://flyte.org
Apache License 2.0

[BUG] Task config should be used when computing the task version hash #5364

Open ddl-rliu opened 1 month ago

ddl-rliu commented 1 month ago

Describe the bug

  1. Run pyflyte run --remote on this file, twice.
from flytekit import task, workflow
from flytekit.core.base_task import PythonTask
from flytekit.core.interface import Interface
import rich_click as click
from dataclasses import dataclass
from typing import Optional
import datetime

@dataclass
class MyConfig(object):
    MyField: Optional[str] = None

class MyTask(PythonTask[MyConfig]):
    def __init__(
        self,
        name: str,
        my_config: MyConfig,
        **kwargs,
    ):
        my_config.MyField = str(datetime.datetime.now())
        click.secho(f"Resolved my_config: {my_config}", fg="cyan")
        super().__init__(
            name=name,
            task_type="my_task",
            interface=Interface(),
            task_config=my_config,
            **kwargs,
        )

@workflow
def wf2():
    job = MyTask(
        name="test",
        my_config=MyConfig(),
    )

    return job()
  2. The output looks similar to the following. The version string for each job is identical.

    $ pyflyte run --remote wf2.py wf2
    Running Execution on Remote.
    Resolved my_config: MyConfig(MyField='2024-05-14 22:03:53.646933')

    [✔] Go to https://.../f6562c3c440124ee3a76 to see execution in the console.

    $ pyflyte run --remote wf2.py wf2
    Running Execution on Remote.
    Resolved my_config: MyConfig(MyField='2024-05-14 22:09:07.348125')

    [✔] Go to https://.../f72aa991349d041f499b to see execution in the console.


Expected behavior

Task version strings should be different because the task configs are different.

This matters especially in a custom Flyte agent with more complex logic, where it leads to an error like:

RPC Failed, with Status: StatusCode.INVALID_ARGUMENT details: task with different structure already exists:

Additional context to reproduce

No response

Screenshots

No response

welcome[bot] commented 1 month ago

Thank you for opening your first issue here! 🛠

runllm[bot] commented 1 month ago

The issue you're encountering is due to Flyte not considering changes in task configuration when computing the task version hash. This can lead to errors when the task configuration changes but the version remains the same.

To address this, you can manually specify a version when registering the task to ensure that changes in the configuration are recognized. Here's a modified version of your script that includes a version parameter:

from flytekit import task, workflow
from flytekit.core.base_task import PythonTask
from flytekit.core.interface import Interface
import rich_click as click
from dataclasses import dataclass
from typing import Optional
import datetime

@dataclass
class MyConfig(object):
    MyField: Optional[str] = None

class MyTask(PythonTask[MyConfig]):
    def __init__(
        self,
        name: str,
        my_config: MyConfig,
        version: str,
        **kwargs,
    ):
        my_config.MyField = str(datetime.datetime.now())
        click.secho(f"Resolved my_config: {my_config}", fg="cyan")
        super().__init__(
            name=name,
            task_type="my_task",
            interface=Interface(),
            task_config=my_config,
            version=version,
            **kwargs,
        )

@workflow
def wf2():
    job = MyTask(
        name="test",
        my_config=MyConfig(),
        version="v1.0.0"  # Change this version manually to reflect changes
    )

    return job()

Would you like more details on how to handle task versioning in Flyte?

This is an AI-generated response and your feedback is appreciated! Please leave a 👍 if this is helpful and 👎 if it is not.

kumare3 commented 1 month ago

@ddl-rliu - this is a controversial topic. Maybe we need to add that as a config? Folks change mem/cpu but often still want the result to be cached. I do think this might be interesting, and in this case I would simply hash the entire TaskTemplate spec.
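
As a rough illustration of the "hash the entire spec, but make it configurable" idea, here is a minimal standard-library sketch. The dict layout, the `spec_version` helper, and its `ignore` parameter are hypothetical and are not flytekit APIs; a real implementation would presumably hash the serialized task template instead of a plain dict.

```python
import hashlib
import json
from typing import Any, Dict, Iterable


def spec_version(spec: Dict[str, Any], ignore: Iterable[str] = ()) -> str:
    """Return a deterministic hex digest for a task spec.

    `spec` is a plain dict standing in for a serialized task template
    (hypothetical layout). Top-level keys listed in `ignore` are left out
    of the hash, which is one way to make the behaviour configurable.
    """
    skipped = set(ignore)
    filtered = {k: v for k, v in spec.items() if k not in skipped}
    # sort_keys makes the JSON text independent of dict insertion order.
    payload = json.dumps(filtered, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


base = {
    "type": "my_task",
    "custom": {"MyField": "a"},
    "resources": {"cpu": "1", "mem": "1Gi"},
}
changed_config = {**base, "custom": {"MyField": "b"}}
changed_resources = {**base, "resources": {"cpu": "2", "mem": "4Gi"}}

# A change to the custom config always produces a different digest.
print(spec_version(base) != spec_version(changed_config))

# A resource change is only visible when resources are part of the hash.
print(spec_version(base) != spec_version(changed_resources))
print(
    spec_version(base, ignore=["resources"])
    == spec_version(changed_resources, ignore=["resources"])
)
```

With `ignore=["resources"]`, users who only tweak mem/cpu keep the same version, while a change under `custom` still yields a new one.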

kumare3 commented 1 month ago

cc @EngHabu / @eapolinario / @wild-endeavor / @cosmicBboy what do you folks think?

ddl-rliu commented 1 month ago

Right, that makes sense. Since values like mem/cpu are variable inputs, it's actually desirable for them not to change the version string. What might be better, in our case, is to move these kinds of fields out of the custom field and into another field that's suited to variable, input-type values, rather than using the task config to compute the version hash.
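
One way to picture that separation, sketched with standard-library dataclasses only. This tags fields instead of physically moving them to another field, so it is a stand-in for the idea and not the proposal itself. The `hashed` metadata key, the `image` and `submitted_at` fields, and `stable_version` are all hypothetical names.

```python
import hashlib
import json
from dataclasses import dataclass, field, fields
from typing import Optional


@dataclass
class MyConfig:
    # Structural field: a change here should produce a new version.
    image: str = "default"
    # Variable, input-like field: tagged so that it stays out of the hash.
    submitted_at: Optional[str] = field(default=None, metadata={"hashed": False})


def stable_version(config) -> str:
    """Hash only the dataclass fields that are not tagged hashed=False."""
    stable = {
        f.name: getattr(config, f.name)
        for f in fields(config)
        if f.metadata.get("hashed", True)
    }
    payload = json.dumps(stable, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


first = MyConfig(submitted_at="2024-05-14 22:03:53")
second = MyConfig(submitted_at="2024-05-14 22:09:07")
rebuilt = MyConfig(image="other")

# Differing timestamps do not affect the digest.
print(stable_version(first) == stable_version(second))
# A structural change does.
print(stable_version(first) != stable_version(rebuilt))
```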

ddl-rliu commented 1 month ago

"Maybe we need to add that as a config?"

Another idea I thought of is to build on "Enable flytekit to be pluggable" and make the version hash calculation pluggable as well. Then interested parties can write a plugin that adds the task config to the version hash logic.