kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[bug] Artifact resolution failure in Conditional branches #11275

Open Alexandre-Ramtoula opened 1 month ago

Alexandre-Ramtoula commented 1 month ago

I'm migrating pipelines from KFP SDK 1.8.6 to 2.9.0 and hit an error when passing an artifact produced by a component outside a dsl.If / dsl.Elif / dsl.Else block to a component inside one. The simplified example below reproduces the error.

Environment

Steps to reproduce

Use the following code to create a pipeline:

from kfp import dsl
from kfp.dsl import Artifact, Input, Output

@dsl.component
def start_comp(choose_txt_artifact: Output[Artifact]):
    message = 'I choose the pokemon '
    # Create artifact file
    with open(choose_txt_artifact.path, 'w') as f:
        f.write(message)

@dsl.component
def choose_element_starter() -> str:
    import random
    return random.choice(['Grass', 'Fire', 'Water'])

@dsl.component
def print_pokemon_message(pokemon: str, choose_txt_artifact: Input[Artifact]):
    # Read artifact file
    with open(choose_txt_artifact.path) as f:  
        base_message = f.read() 
    print(f"{base_message} {pokemon}!")

@dsl.pipeline
def pokemon_pipeline():
    start_task = start_comp()
    element_task = choose_element_starter()

    # Conditional branches
    with dsl.If(element_task.output == 'Grass'):
        print_pokemon_message(pokemon='Bulbasaur', choose_txt_artifact=start_task.outputs['choose_txt_artifact']) 
    with dsl.Elif(element_task.output == 'Fire'):
        print_pokemon_message(pokemon='Charmander', choose_txt_artifact=start_task.outputs['choose_txt_artifact'])
    with dsl.Else():
        print_pokemon_message(pokemon='Squirtle', choose_txt_artifact=start_task.outputs['choose_txt_artifact'])

Run the pipeline. Execution fails while resolving the input artifact of a component inside a conditional branch.

Expected result

The print_pokemon_message component should receive the choose_txt_artifact output from start_comp and print the appropriate message.

Instead, the pipeline fails when the driver tries to resolve the input artifact pipelinechannel--start-comp-choose_txt_artifact inside the conditional branch:

failed to resolve inputs: failed to resolve input artifact pipelinechannel--start-comp-choose_txt_artifact with spec component_input_artifact:"pipelinechannel--start-comp-choose_txt_artifact": component input artifact not implemented yet

The message "component input artifact not implemented yet" suggests that passing an artifact from outside a conditional branch into a component inside it is not supported by the driver yet.

Materials and Reference

 parent DAG input parameters map[pipelinechannel--choose-pokemon-starter-Output:string_value:"Grass"]
 KFP driver: driver.DAG(pipelineName=pokemon-pipeline, runID=[REDACTED], task="condition-4", component="comp-condition-4", dagExecutionID=[REDACTED], componentSpec) failed: failed to resolve inputs: failed to resolve input artifact pipelinechannel--start-comp-choose_txt_artifact with spec component_input_artifact:"pipelinechannel--start-comp-choose_txt_artifact": component input artifact not implemented yet
time="2024-10-08T15:37:18.142Z" level=info msg="sub-process exited" argo=true error="<nil>"
time="2024-10-08T15:37:18.142Z" level=error msg="cannot save parameter /tmp/outputs/execution-id" argo=true error="open /tmp/outputs/execution-id: no such file or directory"
time="2024-10-08T15:37:18.142Z" level=error msg="cannot save parameter /tmp/outputs/iteration-count" argo=true error="open /tmp/outputs/iteration-count: no such file or directory"
time="2024-10-08T15:37:18.142Z" level=error msg="cannot save parameter /tmp/outputs/condition" argo=true error="open /tmp/outputs/condition: no such file or directory"
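
For anyone triaging similar runs, here is a minimal sketch of a helper that pulls the failing task and the unresolved artifact channel out of a driver log line like the one above. It uses only the Python standard library and is not part of KFP. The function name parse_driver_error and the returned keys are hypothetical, and the split of the channel name into producer task and output key is an assumption (task names appear hyphenated in the log, while output keys are Python identifiers without hyphens).

```python
import re

# Sample copied from the driver log in this issue (IDs were already redacted).
SAMPLE_LOG = (
    'KFP driver: driver.DAG(pipelineName=pokemon-pipeline, runID=[REDACTED], '
    'task="condition-4", component="comp-condition-4", '
    'dagExecutionID=[REDACTED], componentSpec) failed: failed to resolve inputs: '
    'failed to resolve input artifact '
    'pipelinechannel--start-comp-choose_txt_artifact with spec '
    'component_input_artifact:"pipelinechannel--start-comp-choose_txt_artifact": '
    'component input artifact not implemented yet'
)

# Pattern derived from the log text above; it is not an official KFP log schema.
DRIVER_ERROR = re.compile(
    r'task="(?P<task>[^"]+)", component="(?P<component>[^"]+)".*?'
    r'failed to resolve input artifact (?P<channel>\S+) with spec'
)

CHANNEL_PREFIX = "pipelinechannel--"


def parse_driver_error(line):
    """Return the failing task and unresolved channel, or None if no match."""
    match = DRIVER_ERROR.search(line)
    if match is None:
        return None
    channel = match.group("channel")
    name = channel
    if channel.startswith(CHANNEL_PREFIX):
        name = channel[len(CHANNEL_PREFIX):]
    # Assumption: the output key contains no hyphen, so the last hyphen
    # separates the producer task name from the output key.
    producer, _, output_key = name.rpartition("-")
    return {
        "task": match.group("task"),
        "component": match.group("component"),
        "channel": channel,
        "producer": producer,
        "output_key": output_key,
    }


if __name__ == "__main__":
    print(parse_driver_error(SAMPLE_LOG))
```

For the sample line, this reports task condition-4 as the failing DAG, start-comp as the producer, and choose_txt_artifact as the output key, which matches the start_task.outputs['choose_txt_artifact'] reference in the repro.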

Impacted by this bug? Give it a 👍.