kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[sdk] Can't use InputPath together with a container_component #9986

Closed fbpo23 closed 11 months ago

fbpo23 commented 11 months ago

Environment

Steps to reproduce

I'm trying to set up a pipeline using container components, but InputPath does not seem to work correctly with them. In this code snippet:

import kfp
from kfp import dsl

@dsl.component
def create_dataset(dataset_path: dsl.OutputPath('Dataset'),):
    import json
    dataset = {'my_dataset': [[1, 2, 3], [4, 5, 6]]}
    with open(dataset_path, 'w') as f:
        json.dump(dataset, f)

@dsl.component
def consume_dataset(dataset: dsl.InputPath('Dataset')):
    print(dataset)

@dsl.pipeline(name='my-pipeline')
def my_pipeline():
    create_dataset_op = create_dataset()
    consume_dataset(dataset=create_dataset_op.outputs['dataset_path'])

def gen_pipeline(pipeline_fn):
    kfp.compiler.Compiler().compile(my_pipeline, f"{pipeline_fn}.yaml")

gen_pipeline("new_pipeline_v2")

Everything compiles correctly with no problems. However, after switching to container_component, as in this snippet:

import kfp
from kfp import dsl

@dsl.container_component
def create_dataset(dataset_path: dsl.OutputPath('Dataset'),):
    return dsl.ContainerSpec(
        image="image/path",
        command=["python", "/microservice/pipeline_step.py"],
        args=[
            "--in-uri",
            dataset_path,
        ],
    )

@dsl.container_component
def consume_dataset(dataset: dsl.InputPath('Dataset')):
    return dsl.ContainerSpec(
        image="image/path",
        command=["python", "/microservice/pipeline_step.py"],
        args=[
            "--in-uri",
            dataset,
        ],
    )

@dsl.pipeline(name='my-pipeline')
def pipeline():
    create_dataset_op = create_dataset()
    consume_dataset(dataset=create_dataset_op.outputs['dataset_path'])

def gen_pipeline(pipeline_fn):
    kfp.compiler.Compiler().compile(pipeline, f"{pipeline_fn}.yaml")

gen_pipeline("new_pipeline_v2")

the pipeline no longer compiles, and I get the following error:

Traceback (most recent call last):
  File "/home/francisco/scotty/bitbucket/kubeflow-pipelines/intent_recognizer_pipeline/pipeline/new_pipeline_v2.py", line 16, in <module>
    @dsl.container_component
     ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/francisco/.conda/envs/kubeflow/lib/python3.11/site-packages/kfp/dsl/container_component_decorator.py", line 53, in container_component
    return component_factory.create_container_component_from_func(func)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/francisco/.conda/envs/kubeflow/lib/python3.11/site-packages/kfp/dsl/component_factory.py", line 607, in create_container_component_from_func
    make_input_for_parameterized_container_component_function(
  File "/home/francisco/.conda/envs/kubeflow/lib/python3.11/site-packages/kfp/dsl/component_factory.py", line 586, in make_input_for_parameterized_container_component_function
    placeholder._ir_type = type_utils.get_parameter_type_name(
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/francisco/.conda/envs/kubeflow/lib/python3.11/site-packages/kfp/dsl/types/type_utils.py", line 198, in get_parameter_type_name
    return pipeline_spec_pb2.ParameterType.ParameterTypeEnum.Name(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/francisco/.conda/envs/kubeflow/lib/python3.11/site-packages/google/protobuf/internal/enum_type_wrapper.py", line 68, in Name
    raise TypeError(
TypeError: Enum value for ParameterTypeEnum must be an int, but got <class 'NoneType'> None.

This is strange because I kept the exact same function signatures in both snippets; the only thing that changed is that I used container_component instead of component. Can someone help me out with this?

Expected result

Compilation with no issues.

Impacted by this bug? Give it a 👍.

fbpo23 commented 11 months ago

It seems that while InputPath does not work with container components, we can simply use the Input and Output artifact annotations instead, as in the snippet below:

from kfp import dsl
from kfp.dsl import Input, Output, Dataset

@dsl.container_component
def create_dataset(dataset: Output[Dataset]):
    return dsl.ContainerSpec(
        image='alpine:3.18.2',
        command=['sh', '-c', '''echo "I am data" > $0'''],
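        # 'sh -c' binds the first extra argument to $0, so $0 above expands to dataset.path below.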
        args=[dataset.path])

@dsl.container_component
def debug_info(dataset: Input[Dataset]):
    return dsl.ContainerSpec(
        image='alpine:3.18.2',
        command=['echo'],
        args=[dataset.path])

@dsl.pipeline(
    name='test-pipeline', description='Some pipeline')
def demo_pipeline():
    dataset_task = create_dataset()
    debug_info(dataset=dataset_task.output)
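
For completeness, this workaround pipeline should compile with the usual KFP v2 compiler call (a minimal sketch; the output filename here is just an example):

from kfp import compiler

# Compile the workaround pipeline into a pipeline spec YAML package.
compiler.Compiler().compile(demo_pipeline, package_path='demo_pipeline.yaml')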

I'll consider this issue solved!