flyteorg / flyte

Scalable and flexible workflow orchestration platform that seamlessly unifies data, ML and analytics stacks.
https://flyte.org
Apache License 2.0

[BUG] New flytekit SDK versions fail when registering a script #5706

Open zshareef opened 2 weeks ago

zshareef commented 2 weeks ago

Describe the bug

If we use flytekit 1.13.0 or higher, workflow compilation fails with a TaskReferenceNotFound error. Up to flytekit==1.11.0 everything worked fine, but with a new clean installation of flytekit==1.13.0 or higher we get the following error:

debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"failed to compile workflow for [resource_type:WORKFLOW project:\"playground\" domain:\"dev\" name:\"hello.hello_wf\" version:\"c2d8jj\"] with err failed to compile workflow with err Collected Errors: 1\n\tError 0: Code: TaskReferenceNotFound, Node Id: start-node, Description: Referenced Task [resource_type:TASK name:\"hello.hello_world\" version:\"c2d8jj\"] not found.\n", grpc_status:3, created_time:"2024-08-28T14:11:52.293396625+00:00"}"
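The two identifiers in this error can be compared field by field. The workflow identifier carries a project and a domain, while the referenced task identifier prints only resource_type, name, and version; because proto3 text output omits string fields that are empty, the task reference appears to have an empty project and domain. A minimal Python sketch that extracts both identifiers from the message and lists the fields present on the workflow but absent from the task reference, assuming the error keeps the key:"value" layout shown above (the regular expressions and helper names are illustrative, not part of flytekit or flyteadmin):

```python
import re

# Excerpt of the gRPC error above; in the original message the quotes are escaped as \".
ERROR = (
    r'failed to compile workflow for [resource_type:WORKFLOW project:\"playground\" '
    r'domain:\"dev\" name:\"hello.hello_wf\" version:\"c2d8jj\"] with err ... '
    r'Referenced Task [resource_type:TASK name:\"hello.hello_world\" version:\"c2d8jj\"] not found.'
)

# key:"value" pairs, tolerating the escaped \" quoting used in the message.
FIELD_RE = re.compile(r'(\w+):\\?"([^"\\]*)\\?"')


def parse_identifier(fragment: str) -> dict[str, str]:
    """Parse an identifier fragment (key:"value" pairs plus resource_type) into a dict."""
    fields = dict(FIELD_RE.findall(fragment))
    kind = re.search(r"resource_type:(\w+)", fragment)  # enum value, printed unquoted
    if kind:
        fields["resource_type"] = kind.group(1)
    return fields


def identifiers(error: str) -> tuple[dict[str, str], dict[str, str]]:
    """Return (workflow identifier, referenced task identifier) parsed from the error text."""
    wf = re.search(r"failed to compile workflow for \[([^\]]*)\]", error)
    task = re.search(r"Referenced Task \[([^\]]*)\]", error)
    if wf is None or task is None:
        raise ValueError("error text does not contain both identifiers")
    return parse_identifier(wf.group(1)), parse_identifier(task.group(1))


workflow_id, task_id = identifiers(ERROR)
# Fields set on the workflow identifier but missing from the task reference.
missing = [key for key in ("project", "domain") if key in workflow_id and key not in task_id]
print(missing)  # ['project', 'domain']
```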

Expected behavior

The workflow should compile, the task reference should be found, and the workflow should execute without errors.
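Based on the versions reported in this issue (1.11.0 works, 1.13.0 and later fail) and the bisect later in this thread (the regression lands between v1.11.0 and v1.12.0), one quick check is to compare the installed flytekit version against 1.11.0. A minimal sketch, assuming every release after 1.11.0 may be affected, since no fixed version is named in this thread; `LAST_KNOWN_GOOD`, `parse_release`, and `possibly_affected` are illustrative names:

```python
import re
from importlib.metadata import PackageNotFoundError, version

# Last release reported as working in this issue. Assumption: every later
# release may be affected, since no fixed version is named here.
LAST_KNOWN_GOOD = (1, 11, 0)


def parse_release(v: str) -> tuple[int, int, int]:
    """Extract (major, minor, patch) from a string such as '1.13.0' or '1.13.0b2'."""
    m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", v)
    if m is None:
        raise ValueError(f"unrecognized version: {v!r}")
    major, minor, patch = m.groups()
    return int(major), int(minor), int(patch or 0)


def possibly_affected(v: str) -> bool:
    """True if v is newer than the last release reported to work."""
    return parse_release(v) > LAST_KNOWN_GOOD


try:
    installed = version("flytekit")
except PackageNotFoundError:
    print("flytekit is not installed")
else:
    label = "possibly affected" if possibly_affected(installed) else "predates the regression"
    print(f"flytekit {installed}: {label}")
```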

Additional context to reproduce

Flytekit Issues

There is a bug in Flytekit SDK when registering a script.

Reproduce issue

Required software

Steps-to-reproduce

Save the following workflow as hello_world.py:

from flytekit import task, workflow

@task
def say_hello() -> str:
    return "Hello, World!"

@workflow
def hello_world_wf() -> str:
    res = say_hello()
    return res

Then run the following runner script from the directory that contains hello_world.py:

"""Runs a Flyte workflow called directly from Python."""

import logging
import os
import random
import string
import sys
from importlib import import_module
from pathlib import Path
from typing import Any

from flytekit.configuration import Config as FlyteConfig
from flytekit.configuration import ImageConfig, PlatformConfig
from flytekit.remote.remote import FlyteRemote
from flytekit.tools.translator import Options

def _create_flyte_remote(project: str, domain: str, url: str) -> FlyteRemote:
    """Creates a FlyteRemote client for the given endpoint, project, and domain."""
    return FlyteRemote(
        config=FlyteConfig(platform=PlatformConfig(endpoint=url, insecure=True)),
        default_domain=domain,
        default_project=project,
    )

def load_function(python_file: Path, function: str) -> Any:
    """Loads a function by name."""
    module_path = python_file.parent.resolve()
    module_name = os.path.basename(python_file).replace(".py", "")

    # Make the workflow file's directory importable, then import it as a module.
    sys.path.append(str(module_path))
    module = import_module(module_name)

    return getattr(module, function)

def run_workflow(
    project: str,
    domain: str,
    url: str,
    workflow_file: str,
    workflow: str,
):
    """Runs a Flyte workflow."""
    try:
        options = Options()

        workflow_file_path = Path(workflow_file)

        flyte_remote = _create_flyte_remote(project=project, domain=domain, url=url)
        entity_wf = load_function(workflow_file_path, workflow)
        random_suffix = "".join(random.choice(string.ascii_lowercase + string.digits) for _ in range(6))  # noqa: S311
        execution_id = f"{workflow.replace('_', '-')}-{random_suffix}"

        flyte_remote.register_script(
            entity=entity_wf,
            source_path=workflow_file_path.parent.resolve(),
            options=options,
            version=random_suffix,
            copy_all=True,
        )

        flyte_remote.execute(
            entity=entity_wf,
            image_config=ImageConfig.auto_default_image(),
            options=options,
            version=random_suffix,
            wait=False,
            inputs={},  # hello_world_wf takes no inputs
            execution_name=execution_id,
        )

        logging.info(f"Flyte execution ID: {execution_id}")
    except Exception as e:
        logging.error(f"Error in Flyte Runner: {e}")
        sys.exit(1)

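if __name__ == "__main__":
    # Without this, logging.info() output (the execution ID) is not shown.
    logging.basicConfig(level=logging.INFO)
    run_workflow(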
        project="flytesnacks",
        domain="development",
        url="localhost:30080",
        workflow_file="hello_world.py",
        workflow="hello_world_wf",
    )

Screenshots


Are you sure this issue hasn't been raised already?

Have you read the Code of Conduct?

welcome[bot] commented 2 weeks ago

Thank you for opening your first issue here! 🛠

runllm[bot] commented 2 weeks ago

The error you're encountering (TaskReferenceNotFound) when using flytekit version 1.13.0 or higher is likely due to changes in how tasks and workflows are registered and referenced in newer versions. This issue can be related to the module resolution and task registration process.

According to this GitHub issue, Flytekit versions after 0.32.0 handle module names differently, which might be causing the TaskReferenceNotFound error. Additionally, this discussion suggests that the task resolver might be confused about the task's location.

Would you like more detailed steps or further assistance?


This is an AI-generated response and your feedback is appreciated! Please leave a 👍 if this is helpful and 👎 if it is not.


zshareef commented 1 week ago

Hi @eapolinario

Any update on this issue?

kumare3 commented 5 days ago

@zshareef I was able to reproduce this and am trying to figure out what the problem is. Why are you not using pyflyte run? @eapolinario, did you take a look?
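For comparison, the same workflow can be launched with `pyflyte run --remote`, which registers and executes in one step. A small Python helper that builds and optionally runs that command, assuming pyflyte is on PATH and already configured (for example through its config file) to reach the same cluster; `pyflyte_run_command` and `launch` are illustrative names:

```python
import shlex
import subprocess


def pyflyte_run_command(script: str, workflow: str, project: str, domain: str) -> list[str]:
    """Build a `pyflyte run --remote` invocation for one workflow in a script."""
    return [
        "pyflyte", "run", "--remote",
        "--project", project,
        "--domain", domain,
        script, workflow,
    ]


def launch(cmd: list[str]) -> None:
    """Run the command, raising CalledProcessError if pyflyte exits non-zero."""
    print("Running:", shlex.join(cmd))
    subprocess.run(cmd, check=True)


cmd = pyflyte_run_command("hello_world.py", "hello_world_wf", "flytesnacks", "development")
# launch(cmd)  # uncomment to actually run against the configured cluster
```

Keeping the command as a list (rather than one shell string) avoids quoting problems when paths contain spaces.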

eapolinario commented 4 days ago

Bisecting points to https://github.com/flyteorg/flytekit/pull/2367 as the culprit.

We know that something broke after 1.11.0. Here are the git bisect details:

❯ git bisect start $(git show -s --format=%H v1.12.0) $(git show -s --format=%H v1.11.0)
Bisecting: 54 revisions left to test after this (roughly 6 steps)
[e121d35009b6b91112c1ba1a007edcce2e18d7d7] fix databricks job request serialization (#2286)

~/repos/flytekit @e121d350 *2 bisect
❯ git bisect good
Bisecting: 27 revisions left to test after this (roughly 5 steps)
[824353d784f16ce6c4330e91fb9fada15191c754] Feature/array node workflow parallelism (#2268)

~/repos/flytekit @824353d7 *2 bisect
❯ git bisect good
Bisecting: 13 revisions left to test after this (roughly 4 steps)
[93f690c2124d1c15c6c65cd98d384f881dba41fb] Query by default when missing (#2379)

~/repos/flytekit @93f690c2 *2 bisect
❯ git bisect bad
Bisecting: 6 revisions left to test after this (roughly 3 steps)
[f1689b7dbd3e872dbb48e92d968c87826ffc6bde] Remove redundant error message and update exit strategy (#2374)

~/repos/flytekit @f1689b7d *2 bisect
❯ git bisect good
Bisecting: 3 revisions left to test after this (roughly 2 steps)
[da0485faf763de9b0db2b452ea1c97c24f2d213b] Fix use default inputs with remote LaunchPlan (#2372)

~/repos/flytekit @da0485fa *2 bisect
❯ git bisect bad
Bisecting: 0 revisions left to test after this (roughly 1 step)
[069f87b064dabe918b192214c83649f8da5f5056] Use default serialization settings if serialization_settings is None (#2367)

Reverting that commit fixes the issue. The original PR is https://github.com/flyteorg/flytekit/pull/2367.
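git bisect is a binary search over the commits between a known-good and a known-bad revision: each verdict halves the remaining range, which is why git estimated only roughly 6 steps for the initial range in the log above. A minimal Python sketch of that search, assuming a linear history in which every commit after the first bad one is also bad (real git bisect also handles merges and skipped commits); the commit names below are hypothetical:

```python
from typing import Callable, Sequence


def first_bad(commits: Sequence[str], is_bad: Callable[[str], bool]) -> str:
    """Return the earliest commit for which is_bad() is True.

    Assumes commits are ordered oldest to newest, commits[0] is known good,
    commits[-1] is known bad, and badness is monotonic along the list.
    """
    good, bad = 0, len(commits) - 1
    while bad - good > 1:
        mid = (good + bad) // 2  # test the midpoint of the remaining range
        if is_bad(commits[mid]):
            bad = mid  # the first bad commit is at mid or earlier
        else:
            good = mid  # the first bad commit is after mid
    return commits[bad]


# Hypothetical history: c0 plays the role of v1.11.0, c55 of v1.12.0,
# and the regression lands at c37.
history = [f"c{i}" for i in range(56)]
tested: list[str] = []


def is_bad(commit: str) -> bool:
    tested.append(commit)  # record each commit we had to test
    return int(commit[1:]) >= 37


print(first_bad(history, is_bad), len(tested))  # prints: c37 6
```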

zshareef commented 3 days ago

Hi @eapolinario

Thank you very much for your reply. Please let me know once this bug is fixed so we can test it.