kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[backend] conditions in component.yaml do not work properly #11009

Open daro1337 opened 1 month ago

daro1337 commented 1 month ago

Environment

Steps to reproduce

Conditions in the component.yaml file are not working correctly. It appears that the Argo Workflows backend passes the "IfPresent" condition argument literally instead of resolving it. This worked in Kubeflow v1.8.0 and KFP 1.8.x (Argo Workflows manifest).

Here I wrote a dummy pipeline that prints the /etc/hosts file. If line_number=true, it should add the -n argument to the cat executable so the file is printed with line numbers.

  1. Compile the following component.yaml using KFP v2.8.0 (inputs and implementation reconstructed to match the compiled spec below):

    ---
    name: kfp-busybox
    description: check-conditions

    inputs:
    - {name: file, type: String, default: /etc/hosts, optional: true}
    - {name: line_number, type: Boolean, optional: true}

    implementation:
      container:
        image: busybox:latest
        command: [/bin/cat]
        args:
        - {inputValue: file}
        - if:
            cond: {isPresent: line_number}
            then: [-n]

  2. Compiled component:
components:
  comp-kfp-busybox:
    executorLabel: exec-kfp-busybox
    inputDefinitions:
      parameters:
        file:
          defaultValue: /etc/hosts
          isOptional: true
          parameterType: STRING
        line_number:
          isOptional: true
          parameterType: BOOLEAN
deploymentSpec:
  executors:
    exec-kfp-busybox:
      container:
        args:
        - '{{$.inputs.parameters[''file'']}}'
        - '{"IfPresent": {"InputName": "line_number", "Then": ["-n"]}}'
        command:
        - /bin/cat
        image: busybox:latest
pipelineInfo:
  name: pipeline
root:
  dag:
    tasks:
      kfp-busybox:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-kfp-busybox
        inputs:
          parameters:
            file:
              componentInputParameter: file
            line_number:
              componentInputParameter: line_number
        taskInfo:
          name: cat-hosts
  inputDefinitions:
    parameters:
      file:
        parameterType: STRING
      line_number:
        parameterType: BOOLEAN
schemaVersion: 2.1.0
sdkVersion: kfp-2.8.0
---
platforms:
  kubernetes:
    deploymentSpec:
      executors:
        exec-kfp-busybox:
          imagePullSecret:
          - secretName: sdk-docker
  3. Upload the pipeline to the Kubeflow platform
  4. Run the pipeline

Output:

time="2024-07-16T11:16:04.035Z" level=info msg="capturing logs" argo=true
time="2024-07-16T11:16:04.153Z" level=info msg="capturing logs" argo=true
I0716 11:16:04.212214      25 launcher_v2.go:90] input ComponentSpec:{
  "inputDefinitions": {
    "parameters": {
      "file": {
        "parameterType": "STRING",
        "defaultValue": "/etc/hosts",
        "isOptional": true
      },
      "line_number": {
        "parameterType": "BOOLEAN",
        "isOptional": true
      }
    }
  },
  "executorLabel": "exec-kfp-busybox"
}
I0716 11:16:04.213720      25 cache.go:139] Cannot detect ml-pipeline in the same namespace, default to ml-pipeline.kubeflow:8887 as KFP endpoint.
I0716 11:16:04.213766      25 cache.go:116] Connecting to cache endpoint ml-pipeline.kubeflow:8887
# Kubernetes-managed hosts file.
127.0.0.1   localhost
::1 localhost ip6-localhost ip6-loopback
fe00::0 ip6-localnet
fe00::0 ip6-mcastprefix
fe00::1 ip6-allnodes
fe00::2 ip6-allrouters
cat: can't open '{"IfPresent": {"InputName": "line_number", "Then": ["-n"]}}': No such file or directory
I0716 11:16:04.443096      25 launcher_v2.go:151] publish success.
F0716 11:16:04.443142      25 main.go:49] failed to execute component: exit status 1
time="2024-07-16T11:16:05.156Z" level=info msg="sub-process exited" argo=true error="<nil>"
Error: exit status 1
time="2024-07-16T11:16:06.037Z" level=info msg="sub-process exited" argo=true error="<nil>"
Error: exit status 1

As you can see here:

cat: can't open '{"IfPresent": {"InputName": "line_number", "Then": ["-n"]}}': No such file or directory
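The launcher is expected to resolve this placeholder before invoking the container command, not pass the JSON through verbatim. The intended resolution semantics can be sketched as follows (a hypothetical helper for illustration, not the actual KFP launcher code):

```python
import json


def resolve_args(args, inputs):
    """Resolve IfPresent placeholder strings against supplied input values.

    Hypothetical sketch of what the launcher should do with the compiled
    args shown above; not the actual KFP backend implementation.
    """
    resolved = []
    for arg in args:
        try:
            spec = json.loads(arg)
        except ValueError:
            spec = None  # plain string argument, not a placeholder
        if isinstance(spec, dict) and "IfPresent" in spec:
            cond = spec["IfPresent"]
            # Emit the "Then" args when the input was provided, "Else" otherwise.
            branch = "Then" if cond["InputName"] in inputs else "Else"
            resolved.extend(cond.get(branch, []))
        else:
            resolved.append(arg)
    return resolved


args = ["/etc/hosts", '{"IfPresent": {"InputName": "line_number", "Then": ["-n"]}}']
print(resolve_args(args, {"line_number": True}))  # ['/etc/hosts', '-n']
print(resolve_args(args, {}))                     # ['/etc/hosts']
```

With line_number provided, cat would receive `-n /etc/hosts` instead of the literal JSON string that the log above shows.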

Expected result

Print the file with line numbers, for instance:

     1  127.0.0.1   localhost
     2  ::1 localhost ip6-localhost ip6-loopback
     3  fe00::0 ip6-localnet
     4  ff00::0 ip6-mcastprefix
     5  ff02::1 ip6-allnodes
     6  ff02::2 ip6-allrouters

Materials and Reference

Impacted by this bug? Give it a 👍.

papagala commented 1 month ago

This issue is critical for us since we have hundreds of pipelines that use conditions to conditionally preprocess datasets. @juliusvonkohout FYI.

juliusvonkohout commented 1 month ago

@rimolive maybe something for 1.9.1

daro1337 commented 1 month ago

@juliusvonkohout FYI, it looks like the central dashboard does not support optional fields (all are treated as mandatory), and default values also do not work in the central dashboard :(

juliusvonkohout commented 1 month ago

> @juliusvonkohout FYI, it looks like the central dashboard does not support optional fields (all are treated as mandatory), and default values also do not work in the central dashboard :(

@daro1337 you might want to ask @thesuperzapper about this.

papagala commented 3 weeks ago

I was able to successfully pass a default value with this snippet. @daro1337

from typing import Optional

from kfp import dsl


@dsl.pipeline(name="pipeline-with-loop-output-v2")
def my_pipeline(size: Optional[int] = 5):
    # print_op, args_generator_op, and print_op_sleep are components defined elsewhere
    print_before_parallel = print_op(s="something before parallel")
    args_generator = args_generator_op(size=size)
    # parallelism is being ignored. There's an open issue about this
    with dsl.ParallelFor(args_generator.output, parallelism=10) as item:
        print_op_sleep(s=item).after(print_before_parallel)