kubeflow / kfp-tekton

Kubeflow Pipelines on Tekton
https://developer.ibm.com/blogs/kubeflow-pipelines-with-tekton-and-watson/
Apache License 2.0

load pipeline from yaml #1341

Open omriel1 opened 1 year ago

omriel1 commented 1 year ago

/kind question

Question: Hi! I'm using kfp-tekton 1.5.4 (I currently have to) and I'm looking for a way to load a pipeline into memory from a YAML file. For example, in KFP V2 I can use kfp.components.load_component_from_file to load the pipeline. With V1 this doesn't seem to work, and I can't find any other way.

Is this possible? Ideally I'd like to do the following:

import kfp
from kfp import dsl
from kfp_tekton.compiler import TektonCompiler

def say_hello(name: str) -> str:
    hello_text = f'Hello, {name}!'
    print(hello_text)
    return hello_text

say_hello = kfp.components.create_component_from_func(say_hello)

@dsl.pipeline(name="name", description="description")
def hello_pipeline(recipient: str) -> str:
    hello_task = say_hello(recipient)
    return hello_task.output

if __name__=="__main__":
    TektonCompiler().compile(pipeline_func=hello_pipeline, package_path="./test.yaml")
    pipe = kfp.components.load_component_from_file("./test.yaml")

But it fails with:

TypeError: Error: MetadataSpec.from_dict(struct=OrderedDict([('name', 'name'), ('annotations', OrderedDict([('tekton.dev/output_artifacts', '{"say-hello": [{"key": "artifacts/$PIPELINERUN/say-hello/Output.tgz", "name": "say-hello-Output", "path": "/tmp/outputs/Output/data"}]}'), ('tekton.dev/input_artifacts', '{}'), ('tekton.dev/artifact_bucket', 'mlpipeline'), ('tekton.dev/artifact_endpoint', 'minio-service.kubeflow:9000'), ('tekton.dev/artifact_endpoint_scheme', 'http://'), ('tekton.dev/artifact_items', '{"say-hello": [["Output", "$(results.Output.path)"]]}'), ('sidecar.istio.io/inject', 'false'), ('tekton.dev/template', ''), ('pipelines.kubeflow.org/big_data_passing_format', '$(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME'), ('pipelines.kubeflow.org/pipeline_spec', '{"description": "description", "inputs": [{"name": "recipient", "type": "String"}], "name": "name", "outputs": [{"name": "Output", "type": "String"}]}')])), ('labels', OrderedDict([('pipelines.kubeflow.org/pipelinename', ''), ('pipelines.kubeflow.org/generation', '')]))])) failed with exception:
MetadataSpec.__init__() got an unexpected keyword argument 'name'

Tomcli commented 1 year ago

In KFP V1, a component maps only to a single ContainerOp, so a compiled pipeline YAML (a whole workflow) cannot be loaded as a component. KFP V2 introduced graph components, which let you load a compiled V2 IR as a KFP component.
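To make the distinction concrete, here is a minimal sketch that guesses whether an already-parsed YAML document is a V1 component spec or a compiled pipeline. The function name `classify_kfp_v1_yaml` and the three sample documents are hypothetical. The `apiVersion` and `kind` values are assumptions about what the Tekton and Argo compilers emit, and the check on the top-level `implementation` key assumes the usual layout of a component.yaml. It is a heuristic, not part of the KFP SDK.

```python
from typing import Any, Mapping


def classify_kfp_v1_yaml(doc: Mapping[str, Any]) -> str:
    """Guess what kind of KFP V1 YAML document `doc` (already parsed) is."""
    # Compiled pipelines are Kubernetes resources, so they carry
    # top-level `apiVersion` and `kind` keys.
    if "apiVersion" in doc and "kind" in doc:
        return f"compiled {doc['kind']}"
    # A V1 component spec describes one container under `implementation`.
    if "implementation" in doc:
        return "component"
    return "unknown"


# Hypothetical, heavily trimmed documents for illustration only.
component_doc = {
    "name": "Say hello",
    "inputs": [{"name": "name", "type": "String"}],
    "implementation": {
        "container": {
            "image": "python:3.9",
            "command": ["echo", {"inputValue": "name"}],
        }
    },
}
tekton_doc = {
    "apiVersion": "tekton.dev/v1beta1",  # assumed Tekton API version
    "kind": "PipelineRun",
    "metadata": {"name": "name"},
    "spec": {},
}
argo_doc = {
    "apiVersion": "argoproj.io/v1alpha1",  # assumed Argo API version
    "kind": "Workflow",
    "metadata": {"generateName": "name-"},
    "spec": {},
}

for doc in (component_doc, tekton_doc, argo_doc):
    print(classify_kfp_v1_yaml(doc))
```

Only the first document has the shape that load_component_from_file expects. The other two carry Kubernetes-style metadata (`name`, `generateName`), which, judging by the tracebacks in this thread, is what the component loader rejects.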

KFP V1 on Argo behaves the same way. The original KFP V1 code below fails with a similar error:

import kfp
from kfp import dsl

def say_hello(name: str) -> str:
    hello_text = f'Hello, {name}!'
    print(hello_text)
    return hello_text

say_hello = kfp.components.create_component_from_func(say_hello)

@dsl.pipeline(name="name", description="description")
def hello_pipeline(recipient: str) -> str:
    hello_task = say_hello(recipient)
    return hello_task.output

if __name__=="__main__":
    kfp.compiler.Compiler().compile(pipeline_func=hello_pipeline, package_path="./test.yaml")
    pipe = kfp.components.load_component_from_file("./test.yaml")

Errors with the Argo YAML:

TypeError: Error: MetadataSpec.from_dict(struct=OrderedDict([('generateName', 'name-'), ('annotations', OrderedDict([('pipelines.kubeflow.org/kfp_sdk_version', '1.8.22'), ('pipelines.kubeflow.org/pipeline_compilation_time', '2023-09-12T09:51:53.515450'), ('pipelines.kubeflow.org/pipeline_spec', '{"description": "description", "inputs": [{"name": "recipient", "type": "String"}], "name": "name", "outputs": [{"name": "Output", "type": "String"}]}')])), ('labels', OrderedDict([('pipelines.kubeflow.org/kfp_sdk_version', '1.8.22')]))])) failed with exception:
__init__() got an unexpected keyword argument 'generateName'
Error: Structure "OrderedDict([('generateName', 'name-'), ('annotations', OrderedDict([('pipelines.kubeflow.org/kfp_sdk_version', '1.8.22'), ('pipelines.kubeflow.org/pipeline_compilation_time', '2023-09-12T09:51:53.515450'), ('pipelines.kubeflow.org/pipeline_spec', '{"description": "description", "inputs": [{"name": "recipient", "type": "String"}], "name": "name", "outputs": [{"name": "Output", "type": "String"}]}')])), ('labels', OrderedDict([('pipelines.kubeflow.org/kfp_sdk_version', '1.8.22')]))])" is not None.
Error: Structure "OrderedDict([('generateName', 'name-'), ('annotations', OrderedDict([('pipelines.kubeflow.org/kfp_sdk_version', '1.8.22'), ('pipelines.kubeflow.org/pipeline_compilation_time', '2023-09-12T09:51:53.515450'), ('pipelines.kubeflow.org/pipeline_spec', '{"description": "description", "inputs": [{"name": "recipient", "type": "String"}], "name": "name", "outputs": [{"name": "Output", "type": "String"}]}')])), ('labels', OrderedDict([('pipelines.kubeflow.org/kfp_sdk_version', '1.8.22')]))])" is incompatible with type "typing.Optional[kfp.components._structures.MetadataSpec]" - none of the types in Union are compatible.
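Both tracebacks show that the compiled YAML still records the pipeline's interface as a JSON string in the `pipelines.kubeflow.org/pipeline_spec` annotation. If only the name, inputs, and outputs are needed, a sketch like the following could read them back with the standard library. The helper `pipeline_interface` is hypothetical, the `compiled` dict stands in for a YAML file that has already been parsed, and the annotation value is copied from the error output above. This recovers metadata only; it does not rebuild a runnable pipeline object.

```python
import json
from typing import Any, Dict, Mapping

PIPELINE_SPEC_KEY = "pipelines.kubeflow.org/pipeline_spec"


def pipeline_interface(doc: Mapping[str, Any]) -> Dict[str, Any]:
    """Return the pipeline name/inputs/outputs stored in the annotation."""
    annotations = doc.get("metadata", {}).get("annotations", {})
    raw = annotations.get(PIPELINE_SPEC_KEY)
    if raw is None:
        raise KeyError(f"annotation {PIPELINE_SPEC_KEY!r} not found")
    # The annotation value is itself a JSON document.
    return json.loads(raw)


# Stand-in for the parsed compiled YAML, trimmed to the relevant fields.
compiled = {
    "metadata": {
        "name": "name",
        "annotations": {
            PIPELINE_SPEC_KEY: (
                '{"description": "description", '
                '"inputs": [{"name": "recipient", "type": "String"}], '
                '"name": "name", '
                '"outputs": [{"name": "Output", "type": "String"}]}'
            )
        },
    }
}

spec = pipeline_interface(compiled)
input_names = [param["name"] for param in spec.get("inputs", [])]
output_names = [param["name"] for param in spec.get("outputs", [])]
print(spec["name"], input_names, output_names)
```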

Tomcli commented 1 year ago

Components like KServe are just a single ContainerOp, so you can load them in KFP V1 with kfp.components.load_component_from_file: https://github.com/kubeflow/pipelines/blob/master/components/kserve/component.yaml