flyteorg / flyte

Scalable and flexible workflow orchestration platform that seamlessly unifies data, ML and analytics stacks.
https://flyte.org
Apache License 2.0

[BUG] Task registration fails if `node_dependency_hints` includes a workflow #5989

Open Tom-Newton opened 4 days ago

Tom-Newton commented 4 days ago

Describe the bug

With the following workflows:

from flytekit import dynamic, task, workflow

@task
def task0():
    return None

@workflow
def workflow0():
    return task0()

@dynamic(node_dependency_hints=[workflow0])
def dynamic0():
    return workflow0()

@workflow
def workflow1():
    return dynamic0()

registering the dynamic task directly with

bazel run example -- remote register dynamic0

fails with

RPC Failed, with Status: StatusCode.NOT_FOUND
        Details: missing entity of type TASK with identifier project:"flyteexamples"  domain:"development"  name:"<redacted>.workflow.workflow0"  version:"L6mEsnIJmUVT84b059kVJA"

However

bazel run example -- remote register workflow1

works as expected

Expected behavior

bazel run example -- remote register dynamic0

should register the dynamic task successfully.

Additional context to reproduce

The cause

The registration code assumes that the last entity registered is the top-level one (it returns `identifiers_or_exceptions[-1]`):

        # concurrent register
        cp_task_entity_map = OrderedDict(filter(lambda x: isinstance(x[1], task_models.TaskSpec), m.items()))
        tasks = []
        loop = asyncio.get_event_loop()
        for entity, cp_entity in cp_task_entity_map.items():
            tasks.append(
                loop.run_in_executor(
                    None,
                    functools.partial(self.raw_register, cp_entity, serialization_settings, version, og_entity=entity),
                )
            )

        identifiers_or_exceptions = []
        identifiers_or_exceptions.extend(await asyncio.gather(*tasks, return_exceptions=True))
        # Check to make sure any exceptions are just registration skipped exceptions
        for ie in identifiers_or_exceptions:
            if isinstance(ie, RegistrationSkipped):
                logger.info(f"Skipping registration... {ie}")
                continue
            if isinstance(ie, Exception):
                raise ie
        # serial register
        cp_other_entities = OrderedDict(filter(lambda x: not isinstance(x[1], task_models.TaskSpec), m.items()))
        for entity, cp_entity in cp_other_entities.items():
            identifiers_or_exceptions.append(
                self.raw_register(cp_entity, serialization_settings, version, og_entity=entity)
            )
        return identifiers_or_exceptions[-1]

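https://github.com/flyteorg/flytekit/blob/8fdd0c68f10b9f08cd65aa74ef90f0d9af564dd2/flytekit/remote/remote.py#L890

However, this assumption no longer holds when registering a task whose `node_dependency_hints` include a workflow. The hinted workflow is registered in the serial phase, after all tasks, so the last identifier belongs to that workflow rather than to the task being registered. I guess we need to switch to retrieving the top-level registered entity by ID instead of assuming it is the last one.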
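To illustrate the ordering problem and the suggested direction, here is a minimal toy sketch. It does not use flytekit: `TaskSpec`, `WorkflowSpec`, `raw_register` and `register_all` are hypothetical stand-ins, and the order of entries in `m` is assumed. It only mirrors the two-phase order of the excerpt above (tasks first, everything else afterwards) and compares "take the last identifier" with "look up the top-level entity".

```python
from collections import OrderedDict
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskSpec:
    """Hypothetical stand-in for a serialized task."""
    name: str


@dataclass(frozen=True)
class WorkflowSpec:
    """Hypothetical stand-in for a serialized workflow."""
    name: str


def raw_register(spec):
    """Toy registration: returns a (kind, name) identifier."""
    kind = "TASK" if isinstance(spec, TaskSpec) else "WORKFLOW"
    return (kind, spec.name)


def register_all(m, top_level_entity):
    """Register tasks first, then all other entities, as in the excerpt above."""
    ids = OrderedDict()
    for entity, spec in m.items():
        if isinstance(spec, TaskSpec):
            ids[entity] = raw_register(spec)
    for entity, spec in m.items():
        if not isinstance(spec, TaskSpec):
            ids[entity] = raw_register(spec)
    last_registered = list(ids.values())[-1]  # what the current code returns
    by_lookup = ids[top_level_entity]         # sketch of the suggested fix
    return last_registered, by_lookup


# Assumed serialization result for dynamic0 with a dependency hint on workflow0.
m = OrderedDict([
    ("task0", TaskSpec("task0")),
    ("workflow0", WorkflowSpec("workflow0")),
    ("dynamic0", TaskSpec("dynamic0")),
])
last_registered, by_lookup = register_all(m, "dynamic0")
print(last_registered)  # ('WORKFLOW', 'workflow0')
print(by_lookup)        # ('TASK', 'dynamic0')
```

In this toy model the last identifier carries the name of `workflow0`, which is consistent with the NOT_FOUND error above if the caller then fetches it as a task. Looking up the identifier by the top-level entity does not depend on registration order.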

eapolinario commented 1 day ago

@Tom-Newton, is this something that you could contribute?

Tom-Newton commented 1 day ago

> @Tom-Newton, is this something that you could contribute?

I don't know when, but I probably can.

eapolinario commented 1 day ago

Thank you. I'm going to assign this to you, but in case you notice that you won't have time, please comment again and we can try to find someone to take a look, ok?