kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[feature] Support parameter inputs for V2 Kubernetes_platform Spec #10534

Open Tomcli opened 6 months ago

Tomcli commented 6 months ago

Feature Area

In V1, parameter inputs could be used within some Kubernetes container spec fields via the native Argo/Tekton variables. However, when we switched to the V2 driver approach, we no longer update the input parameters in the Kubernetes_platform spec. This is because we haven't defined a syntax for KFP variables in the Kubernetes_platform spec context.

To support this, we need to:

  1. Define a string representation for KFP variables in the kubernetes_platform spec context. It could be something like {{tasks.taskname.param_name}} for referring to task parameters and {{loops.inputs.param_name}} for loop parameters (a usage sketch follows this list). Here are the syntax references from the Argo and Tekton communities: https://argo-workflows.readthedocs.io/en/latest/variables/#steps-templates and https://tekton.dev/docs/pipelines/variables/#variables-available-in-a-task

  2. Update the initPodSpecPatch function in the driver to resolve these variables using values from MLMD before applying the pod_spec_patch.
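For illustration, here is a minimal sketch of what the proposed syntax from step 1 could look like from a user's perspective, assuming a hypothetical pipeline where one task's output parameter names the secret a downstream task should mount. The placeholder string is the syntax proposed above and is not resolved by any current release; the component and pipeline names are made up:

from kfp import dsl, kubernetes

@dsl.component(base_image="python:3.10")
def pick_secret() -> str:
    # Decides at run time which Kubernetes secret a later task should mount.
    return "my-runtime-secret"

@dsl.component(base_image="python:3.10")
def consume():
    import os
    print(os.environ.get("TOKEN"))

@dsl.pipeline(name="runtime-secret-demo")
def demo():
    chooser = pick_secret()
    task = consume().after(chooser)
    # Proposed behavior: the driver would resolve this placeholder from MLMD
    # in initPodSpecPatch before applying the pod_spec_patch (step 2 above).
    kubernetes.use_secret_as_env(
        task,
        secret_name="{{tasks.pick-secret.Output}}",  # proposed syntax, illustrative only
        secret_key_to_env={"token": "TOKEN"},
    )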


github-actions[bot] commented 4 months ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

HumairAK commented 3 months ago

/remove lifecycle/stale

revit13 commented 3 months ago

Here is an example of an error we get when trying to pass input parameters as an argument to a KFP component:

import kfp.dsl as dsl
from kfp import compiler

@dsl.component(base_image="python:3.10")
def execute_job(
    params: dict,
):
    print("in execute_job")

# Pipeline to invoke execution
@dsl.pipeline(
    name="dummy-pipeline",
    description="",
)
def dummy_pipeline(
    iterations: int = 7,  # number of iterations
):
    # Embedding a pipeline parameter in a dict literal is what triggers
    # the compile-time error below on kfp 2.7.0.
    dummy_dict: dict = {"iterations": iterations}

    job = execute_job(
        params=dummy_dict,
    )

if __name__ == "__main__":
    # Compiling the pipeline
    compiler.Compiler().compile(dummy_pipeline, __file__.replace(".py", ".yaml"))

the error:

Traceback (most recent call last):
  File "/home/eres/data-prep-lab/kfp/transform_workflows/universal/noop/example1.py", line 16, in <module>
    def dummy_pipeline(
  File "/home/eres/data-prep-lab/kfp/transform_workflows/venv/lib/python3.10/site-packages/kfp/dsl/pipeline_context.py", line 65, in pipeline
    return component_factory.create_graph_component_from_func(
  File "/home/eres/data-prep-lab/kfp/transform_workflows/venv/lib/python3.10/site-packages/kfp/dsl/component_factory.py", line 673, in create_graph_component_from_func
    return graph_component.GraphComponent(
  File "/home/eres/data-prep-lab/kfp/transform_workflows/venv/lib/python3.10/site-packages/kfp/dsl/graph_component.py", line 68, in __init__
    pipeline_spec, platform_spec = builder.create_pipeline_spec(
  File "/home/eres/data-prep-lab/kfp/transform_workflows/venv/lib/python3.10/site-packages/kfp/compiler/pipeline_spec_builder.py", line 1919, in create_pipeline_spec
    build_spec_by_group(
  File "/home/eres/data-prep-lab/kfp/transform_workflows/venv/lib/python3.10/site-packages/kfp/compiler/pipeline_spec_builder.py", line 1272, in build_spec_by_group
    subgroup_task_spec = build_task_spec_for_task(
  File "/home/eres/data-prep-lab/kfp/transform_workflows/venv/lib/python3.10/site-packages/kfp/compiler/pipeline_spec_builder.py", line 309, in build_task_spec_for_task
    to_protobuf_value(input_value))
  File "/home/eres/data-prep-lab/kfp/transform_workflows/venv/lib/python3.10/site-packages/kfp/compiler/pipeline_spec_builder.py", line 74, in to_protobuf_value
    fields={k: to_protobuf_value(v) for k, v in value.items()}))
  File "/home/eres/data-prep-lab/kfp/transform_workflows/venv/lib/python3.10/site-packages/kfp/compiler/pipeline_spec_builder.py", line 74, in <dictcomp>
    fields={k: to_protobuf_value(v) for k, v in value.items()}))
  File "/home/eres/data-prep-lab/kfp/transform_workflows/venv/lib/python3.10/site-packages/kfp/compiler/pipeline_spec_builder.py", line 80, in to_protobuf_value
    raise ValueError('Value must be one of the following types: '
ValueError: Value must be one of the following types: str, int, float, bool, dict, and list. Got: "{{channel:task=;name=iterations;type=Integer;}}" of type "<class 'kfp.dsl.pipeline_channel.PipelineParameterChannel'>".

$ pip list | grep kfp
kfp             2.7.0
kfp-kubernetes  1.2.0
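One workaround that appears to avoid the error on kfp 2.7.0 is to pass the pipeline parameter directly as a component input instead of embedding it in a dict literal, and to build the dict inside the component. A sketch under that assumption (note it changes the component signature):

import kfp.dsl as dsl

@dsl.component(base_image="python:3.10")
def execute_job(iterations: int):
    # Inside the component, `iterations` is a concrete value rather than
    # a PipelineParameterChannel, so the dict can be built safely here.
    params = {"iterations": iterations}
    print(f"in execute_job, params={params}")

@dsl.pipeline(name="dummy-pipeline", description="")
def dummy_pipeline(iterations: int = 7):
    execute_job(iterations=iterations)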

HumairAK commented 2 months ago

/assign @gmfrasca

gmfrasca commented 2 months ago

I think this may have actually been resolved by https://github.com/kubeflow/pipelines/pull/10883 , which was accepted and included in the recently cut v2.8.0 SDK. Using that version, I am able to compile the example pipeline provided by @revit13 with no errors (and I can reproduce the error with the previous SDK, v2.7.0). I also added a simple print statement to the execute_job component and confirmed the value is successfully passed.
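For reference, a sketch of that kind of check (the exact print statement is an assumption; on kfp 2.8.0 the compile succeeds and the run log shows the resolved dict):

@dsl.component(base_image="python:3.10")
def execute_job(
    params: dict,
):
    # With kfp==2.8.0 this prints the resolved value, e.g. {'iterations': 7},
    # confirming the parameter survived compilation and reached the pod.
    print(f"in execute_job, params={params}")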

With that said, I'm interested to hear from the community and/or @Tomcli whether this is truly resolved. The PR mentioned above doesn't seem to follow the solution path prescribed in the issue description: it only updates the SDK code, not the v2 driver, so I want to make sure we're not missing anything here.

gregsheremeta commented 2 weeks ago

> With that said, I'm interested to hear from the community and/or Tomcli whether this is truly resolved. The PR mentioned above doesn't seem to follow the solution path prescribed in the issue description: it only updates the SDK code, not the v2 driver, so I want to make sure we're not missing anything here.

Exactly what you said -- it doesn't update the driver / pod spec patch, so it hasn't solved the problem originally described.

@DharmitD and I have taken a stab at solving it in https://github.com/opendatahub-io/data-science-pipelines/pull/71

Our solution doesn't take loops into account, so we would need to rework it a bit. But the POC works for setting a secret name to mount at pipeline run time.
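For context, here is a minimal Python sketch of the substitution idea behind such a POC, assuming placeholders of the form proposed in the issue description and a map of parameter values already resolved from MLMD (the real driver is written in Go; all names here are hypothetical):

import re

# Matches placeholders like {{tasks.pick-secret.Output}} (proposed syntax).
_PLACEHOLDER = re.compile(r"\{\{tasks\.([\w-]+)\.([\w-]+)\}\}")

def resolve_placeholders(pod_spec_patch: str, resolved: dict) -> str:
    """Replace proposed KFP placeholders in a serialized pod_spec_patch.

    `resolved` maps (task_name, param_name) -> value, e.g. as looked up
    from MLMD by the driver before the patch is applied.
    """
    def _sub(match):
        key = (match.group(1), match.group(2))
        if key not in resolved:
            raise ValueError(f"unresolved placeholder: {match.group(0)}")
        return str(resolved[key])

    return _PLACEHOLDER.sub(_sub, pod_spec_patch)

# Example: a secret name chosen at run time replaces the placeholder.
patch = '{"secretName": "{{tasks.pick-secret.Output}}"}'
print(resolve_placeholders(patch, {("pick-secret", "Output"): "my-runtime-secret"}))
# -> {"secretName": "my-runtime-secret"}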

gregsheremeta commented 1 week ago

We talked about this in the KFP community call today, and @chensun suggested an alternative approach to using templating. He's going to post an example here (thanks Chen!)