kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[feature] Parse Iterables for PipelineParameterChannels #10840

Open zazulam opened 3 months ago

zazulam commented 3 months ago

Feature Area

/area sdk

What feature would you like to see?

During the build stage of the pipeline spec, the `kfp.compiler.pipeline_spec_builder.build_task_spec_for_task` method loops through a task's inputs and dispatches on the data type of each input. I would like to see additional logic that checks for `PipelineParameterChannel`s/`PipelineArtifactChannel`s inside the common Python iterables used as KFP inputs, i.e. `list` and `dict`. Currently the builder only checks the type of the object it is looping over, rather than going one level deep, before sending the input to the `to_protobuf_value` method. A sketch of the kind of check I have in mind is below.
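
A minimal sketch of the detection step (the helper name is hypothetical; the real change would live inside `build_task_spec_for_task`):

```python
# Hypothetical helper, not actual KFP source: detect channels one level
# deep inside list/dict inputs before they are serialized as constants.
from typing import Any, List

from kfp.dsl import pipeline_channel


def channels_one_level_deep(value: Any) -> List[pipeline_channel.PipelineChannel]:
    """Return any PipelineChannels nested one level inside a list or dict."""
    if isinstance(value, list):
        items = value
    elif isinstance(value, dict):
        items = list(value.values())
    else:
        return []
    return [v for v in items
            if isinstance(v, pipeline_channel.PipelineChannel)]
```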

What is the use case or pain point?

My team currently uses a custom mapping of our components to allow our end users to dynamically build pipeline definitions in v1. We store each component's output dictionary within another dictionary and reference those outputs as we iterate over the required components to run. This feature would assist with dynamic pipeline definitions and also minimize the code changes/redesign needed to migrate to v2.

Is there a workaround currently?

I am not aware of a workaround at this time.


Love this idea? Give it a 👍.

github-actions[bot] commented 1 month ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] commented 1 month ago

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.

zazulam commented 2 weeks ago

/reopen

google-oss-prow[bot] commented 2 weeks ago

@zazulam: Reopened this issue.

In response to [this](https://github.com/kubeflow/pipelines/issues/10840#issuecomment-2315623563):

> /reopen

Instructions for interacting with me using PR comments are available [here](https://git.k8s.io/community/contributors/guide/pull-requests.md). If you have questions or suggestions related to my behavior, please file an issue against the [kubernetes/test-infra](https://github.com/kubernetes/test-infra/issues/new?title=Prow%20issue:) repository.

zazulam commented 2 weeks ago

I created some examples of what I've attempted in v2.

1. Attempt using kfp Artifacts / `List[Dataset]`:

```python
import kfp  # needed for the client call at the bottom
from typing import List
from kfp.dsl import Dataset, component, pipeline, Output, Input


@component(base_image="python:3.9")
def create_dataset_paths(name: str, out_dfs: Output[Dataset],
                         input_dfs: List[Dataset] = None):
    if input_dfs:
        input_df_paths = {input_df.name: input_df.metadata
                          for input_df in input_dfs}
        print(input_df_paths)

    dataset_paths = {
        'wine': 's3://my-bucket/datasets/wine_dataset.csv',
        'iris': 's3://my-bucket/datasets/iris_dataset.csv',
        'cancer': 's3://my-bucket/datasets/cancer_dataset.csv',
    }

    out_dfs.name = f'{name}_dfs'
    out_dfs.metadata = dataset_paths


@component(base_image="python:3.9")
def process_datasets(dataset_artifact: Input[Dataset]):
    dataset_paths = dataset_artifact.metadata
    for name, path in dataset_paths.items():
        print(f"Looking at {name} dataset at S3 path: {path}")


@pipeline(name="dynamic-pipeline-example")
def dynamic_pipeline():
    fruits = {
        'apple': ['banana', 'orange'],
        'banana': ['orange'],
        'orange': [],
    }
    # Order entries so that dependencies (fewest children) come first.
    sorted_fruits = dict(sorted(fruits.items(), key=lambda item: len(item[1])))
    output_pool = {}
    for fruit, children in sorted_fruits.items():
        if children:
            current_task = create_dataset_paths(
                name=fruit,
                input_dfs=[output_pool[child] for child in children])
        else:
            current_task = create_dataset_paths(name=fruit)
        # Stash each task's output so later iterations can reference it.
        output_pool[fruit] = current_task.outputs["out_dfs"]
        process_datasets(dataset_artifact=current_task.outputs["out_dfs"])


endpoint = 'http://localhost:80'
kfp_client = kfp.client.Client(host=endpoint)
run = kfp_client.create_run_from_pipeline_func(
    dynamic_pipeline,
    arguments={},
)
```

That results in the following error:

```
Traceback (most recent call last):
  File "/mnt/c/Users/zaz/Projects/Python/KFP/dataset_usage.py", line 30, in <module>
    @pipeline(name="dynamic-pipeline-example")
     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/pipeline_context.py", line 65, in pipeline
    return component_factory.create_graph_component_from_func(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/component_factory.py", line 673, in create_graph_component_from_func
    return graph_component.GraphComponent(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/graph_component.py", line 58, in __init__
    pipeline_outputs = pipeline_func(*args_list)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/Projects/Python/KFP/dataset_usage.py", line 41, in dynamic_pipeline
    current_task = create_dataset_paths(name=fruit, input_dfs=[output_pool[child] for child in children])
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/base_component.py", line 101, in __call__
    return pipeline_task.PipelineTask(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/pipeline_task.py", line 118, in __init__
    type_utils.verify_type_compatibility(
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/types/type_utils.py", line 330, in verify_type_compatibility
    raise InconsistentTypeException(error_text)
kfp.dsl.types.type_utils.InconsistentTypeException: Incompatible argument passed to the input 'input_dfs' of component 'create-dataset-paths': Argument type 'LIST' is incompatible with the input type 'List[system.Dataset@0.0.1]'
```


2. Using a `PipelineArtifactChannel` for those components results in this error:

```
Traceback (most recent call last):
  File "/mnt/c/Users/zaz/Projects/Python/KFP/artifact_collections.py", line 95, in <module>
    @pipeline(name="dynamic-pipeline-example")
     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/pipeline_context.py", line 65, in pipeline
    return component_factory.create_graph_component_from_func(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/component_factory.py", line 673, in create_graph_component_from_func
    return graph_component.GraphComponent(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/graph_component.py", line 68, in __init__
    pipeline_spec, platform_spec = builder.create_pipeline_spec(
                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/compiler/pipeline_spec_builder.py", line 1919, in create_pipeline_spec
    build_spec_by_group(
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/compiler/pipeline_spec_builder.py", line 1272, in build_spec_by_group
    subgroup_task_spec = build_task_spec_for_task(
                         ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/compiler/pipeline_spec_builder.py", line 309, in build_task_spec_for_task
    to_protobuf_value(input_value))
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/compiler/pipeline_spec_builder.py", line 78, in to_protobuf_value
    values=[to_protobuf_value(v) for v in value]))
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/compiler/pipeline_spec_builder.py", line 78, in <listcomp>
    values=[to_protobuf_value(v) for v in value]))
            ^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/compiler/pipeline_spec_builder.py", line 80, in to_protobuf_value
    raise ValueError('Value must be one of the following types: '
ValueError: Value must be one of the following types: str, int, float, bool, dict, and list. Got: "{{channel:task=create-dataset-paths;name=orange;type=Dataset;}}" of type "<class 'kfp.dsl.pipeline_channel.PipelineArtifactChannel'>".
```



This ValueError is the same error I receive when passing a dictionary input to a component, where that dict was built dynamically during client-side compilation by setting its values to the outputs of previous components. In that case, however, the argument type received was a `PipelineParameterChannel`. A minimal illustration of the failing pattern is below.
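
For concreteness, a sketch of that failing dict pattern (the component names here are hypothetical, not from our actual codebase):

```python
# Hypothetical minimal repro: a plain dict whose value is an upstream task's
# output (a PipelineParameterChannel) passed as a single component input.
from kfp.dsl import component, pipeline


@component(base_image="python:3.9")
def producer() -> str:
    return 's3://my-bucket/datasets/wine_dataset.csv'


@component(base_image="python:3.9")
def consumer(paths: dict):
    print(paths)


@pipeline(name="dict-of-outputs-example")
def repro_pipeline():
    t1 = producer()
    # t1.output is a PipelineParameterChannel; burying it inside a dict
    # sends it down the to_protobuf_value path and raises the ValueError.
    consumer(paths={'wine': t1.output})
```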

When I modified `kfp.compiler.pipeline_spec_builder.build_task_spec_for_task` to check lists/dicts for `PipelineParameterChannel`s and `PipelineArtifactChannel`s, I was able to compile successfully and have a properly connected DAG render in the UI, based on the component outputs fed in via that dictionary.
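
The local modification boils down to something like the following standalone sketch (simplified; not the exact diff I applied):

```python
# Simplified sketch of the idea behind my local patch: split channel-valued
# entries out of a dict input so the builder can wire them as task inputs
# instead of handing them to to_protobuf_value.
from typing import Any, Dict, Tuple

from kfp.dsl import pipeline_channel


def split_dict_input(
    value: Dict[str, Any],
) -> Tuple[Dict[str, pipeline_channel.PipelineChannel], Dict[str, Any]]:
    """Separate channel entries (to wire as dependencies) from constants."""
    channels = {
        k: v for k, v in value.items()
        if isinstance(v, pipeline_channel.PipelineChannel)
    }
    constants = {k: v for k, v in value.items() if k not in channels}
    return channels, constants
```

Entries in `channels` would then follow the same path the builder already uses for bare channel inputs, while `constants` can still go through `to_protobuf_value`.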

It seems to me that a `List[Dataset]` can only be obtained as a component output by having a component return an object of that type, rather than by declaring that list within the pipeline during compilation.

Is there an existing solution or method for collecting component outputs and feeding them into downstream components programmatically? My team relies on this capability in v1, as we generate dynamic DAGs using the same pipeline definition function.
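
For what it's worth, the closest v2 mechanism I've found is `dsl.Collected`, which fans `dsl.ParallelFor` outputs into a `List[Dataset]` input, but it only covers loop fan-in rather than the arbitrary dict-based wiring above. A sketch adapted from the documented fan-in pattern:

```python
# Sketch of the ParallelFor fan-in pattern via dsl.Collected: it yields a
# List[Dataset] input, but only for outputs produced inside a loop.
from typing import List

from kfp import dsl
from kfp.dsl import Dataset, Input, Output


@dsl.component(base_image="python:3.9")
def make_dataset(name: str, df: Output[Dataset]):
    with open(df.path, 'w') as f:
        f.write(name)
    df.metadata['path'] = f's3://my-bucket/datasets/{name}_dataset.csv'


@dsl.component(base_image="python:3.9")
def consume_datasets(dfs: Input[List[Dataset]]):
    for df in dfs:
        print(df.metadata)


@dsl.pipeline(name="fan-in-example")
def fan_in_pipeline():
    with dsl.ParallelFor(items=['wine', 'iris', 'cancer']) as name:
        t = make_dataset(name=name)
    # Collected gathers every loop iteration's artifact into one list input.
    consume_datasets(dfs=dsl.Collected(t.outputs['df']))
```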