kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0
3.59k stars 1.62k forks source link

[backend] Pipeline channel placeholders in formatted string cannot be substituted correctly #11035

Open JasonNS1425 opened 2 months ago

JasonNS1425 commented 2 months ago

Environment

Steps to reproduce

When using the pipeline channel placeholders in formatted string and then passing it as another task input, the downstream PipelineTask often get a non-substituted version of input value. See example for details. (you may have to run the pipeline run a few times to trigger this issue)

Source codes

from kfp import dsl, compiler

@dsl.component(base_image='python:3.10-slim-bookworm')
def print_op(message: str) -> str:
    print(message)
    return message

@dsl.pipeline(name='example-pipeline')
def pipeline(name: str):
    for i in range(1, 6):
        message = f'Hello {name} {i}'
        print_op(message=message)

if __name__ == "__main__":
    pipeline_yaml_path = __file__.replace(".py", ".yaml")
    compiler.Compiler().compile(pipeline, package_path=pipeline_yaml_path)

Pipeline run results

pipeline print-op-2

Hello Kubeflow Pipeline 2 [KFP Executor 2024-07-22 07:46:45,542 INFO]: Wrote executor output file to /tmp/kfp_outputs/output_metadata.json. I0722 07:46:45.553621 22 launcher_v2.go:705] ExecutorOutput: { "parameterValues": { "Output": "Hello Kubeflow Pipeline 2" } }

pipeline print-op-3

Hello {{$.inputs.parameters['pipelinechannel--name']}} 3 [KFP Executor 2024-07-22 07:46:42,415 INFO]: Wrote executor output file to /tmp/kfp_outputs/output_metadata.json. I0722 07:46:42.426454 24 launcher_v2.go:705] ExecutorOutput: { "parameterValues": { "Output": "Hello {{$.inputs.parameters['pipelinechannel--name']}} 3" } }

pipeline print-op-4

Hello Kubeflow Pipeline 4 [KFP Executor 2024-07-22 07:46:45,782 INFO]: Wrote executor output file to /tmp/kfp_outputs/output_metadata.json. I0722 07:46:45.794531 24 launcher_v2.go:705] ExecutorOutput: { "parameterValues": { "Output": "Hello Kubeflow Pipeline 4" } }

pipeline print-op-5

Hello Kubeflow Pipeline 5 [KFP Executor 2024-07-22 07:46:42,380 INFO]: Wrote executor output file to /tmp/kfp_outputs/output_metadata.json. I0722 07:46:42.396266 23 launcher_v2.go:705] ExecutorOutput: { "parameterValues": { "Output": "Hello Kubeflow Pipeline 5" } }


### Expected result

The downstream task should always receive input values with all pipeline placeholders substituted.

### Materials and Reference

<details>
  <summary>Generated YAML</summary>

 ```yaml
  # PIPELINE DEFINITION
  # Name: example-pipeline
  # Inputs:
  #    name: str
  components:
    comp-print-op:
      executorLabel: exec-print-op
      inputDefinitions:
        parameters:
          message:
            parameterType: STRING
      outputDefinitions:
        parameters:
          Output:
            parameterType: STRING
    comp-print-op-2:
      executorLabel: exec-print-op-2
      inputDefinitions:
        parameters:
          message:
            parameterType: STRING
      outputDefinitions:
        parameters:
          Output:
            parameterType: STRING
    comp-print-op-3:
      executorLabel: exec-print-op-3
      inputDefinitions:
        parameters:
          message:
            parameterType: STRING
      outputDefinitions:
        parameters:
          Output:
            parameterType: STRING
    comp-print-op-4:
      executorLabel: exec-print-op-4
      inputDefinitions:
        parameters:
          message:
            parameterType: STRING
      outputDefinitions:
        parameters:
          Output:
            parameterType: STRING
    comp-print-op-5:
      executorLabel: exec-print-op-5
      inputDefinitions:
        parameters:
          message:
            parameterType: STRING
      outputDefinitions:
        parameters:
          Output:
            parameterType: STRING
  deploymentSpec:
    executors:
      exec-print-op:
        container:
          args:
          - --executor_input
          - '{{$}}'
          - --function_to_execute
          - print_op
          command:
          - sh
          - -c
          - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
            \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
            \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
            \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
            $0\" \"$@\"\n"
          - sh
          - -ec
          - 'program_path=$(mktemp -d)

            printf "%s" "$0" > "$program_path/ephemeral_component.py"

            _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

            '
          - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
            \ *\n\ndef print_op(message: str) -> str:\n    print(message)\n    return\
            \ message\n\n"
          image: python:3.10-slim-bookworm
      exec-print-op-2:
        container:
          args:
          - --executor_input
          - '{{$}}'
          - --function_to_execute
          - print_op
          command:
          - sh
          - -c
          - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
            \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
            \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
            \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
            $0\" \"$@\"\n"
          - sh
          - -ec
          - 'program_path=$(mktemp -d)

            printf "%s" "$0" > "$program_path/ephemeral_component.py"

            _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

            '
          - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
            \ *\n\ndef print_op(message: str) -> str:\n    print(message)\n    return\
            \ message\n\n"
          image: python:3.10-slim-bookworm
      exec-print-op-3:
        container:
          args:
          - --executor_input
          - '{{$}}'
          - --function_to_execute
          - print_op
          command:
          - sh
          - -c
          - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
            \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
            \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
            \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
            $0\" \"$@\"\n"
          - sh
          - -ec
          - 'program_path=$(mktemp -d)

            printf "%s" "$0" > "$program_path/ephemeral_component.py"

            _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

            '
          - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
            \ *\n\ndef print_op(message: str) -> str:\n    print(message)\n    return\
            \ message\n\n"
          image: python:3.10-slim-bookworm
      exec-print-op-4:
        container:
          args:
          - --executor_input
          - '{{$}}'
          - --function_to_execute
          - print_op
          command:
          - sh
          - -c
          - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
            \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
            \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
            \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
            $0\" \"$@\"\n"
          - sh
          - -ec
          - 'program_path=$(mktemp -d)

            printf "%s" "$0" > "$program_path/ephemeral_component.py"

            _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

            '
          - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
            \ *\n\ndef print_op(message: str) -> str:\n    print(message)\n    return\
            \ message\n\n"
          image: python:3.10-slim-bookworm
      exec-print-op-5:
        container:
          args:
          - --executor_input
          - '{{$}}'
          - --function_to_execute
          - print_op
          command:
          - sh
          - -c
          - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
            \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
            \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
            \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
            $0\" \"$@\"\n"
          - sh
          - -ec
          - 'program_path=$(mktemp -d)

            printf "%s" "$0" > "$program_path/ephemeral_component.py"

            _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

            '
          - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
            \ *\n\ndef print_op(message: str) -> str:\n    print(message)\n    return\
            \ message\n\n"
          image: python:3.10-slim-bookworm
  pipelineInfo:
    name: example-pipeline
  root:
    dag:
      tasks:
        print-op:
          cachingOptions:
            enableCache: true
          componentRef:
            name: comp-print-op
          inputs:
            parameters:
              message:
                runtimeValue:
                  constant: Hello {{$.inputs.parameters['pipelinechannel--name']}} 1
              pipelinechannel--name:
                componentInputParameter: name
          taskInfo:
            name: print-op
        print-op-2:
          cachingOptions:
            enableCache: true
          componentRef:
            name: comp-print-op-2
          inputs:
            parameters:
              message:
                runtimeValue:
                  constant: Hello {{$.inputs.parameters['pipelinechannel--name']}} 2
              pipelinechannel--name:
                componentInputParameter: name
          taskInfo:
            name: print-op-2
        print-op-3:
          cachingOptions:
            enableCache: true
          componentRef:
            name: comp-print-op-3
          inputs:
            parameters:
              message:
                runtimeValue:
                  constant: Hello {{$.inputs.parameters['pipelinechannel--name']}} 3
              pipelinechannel--name:
                componentInputParameter: name
          taskInfo:
            name: print-op-3
        print-op-4:
          cachingOptions:
            enableCache: true
          componentRef:
            name: comp-print-op-4
          inputs:
            parameters:
              message:
                runtimeValue:
                  constant: Hello {{$.inputs.parameters['pipelinechannel--name']}} 4
              pipelinechannel--name:
                componentInputParameter: name
          taskInfo:
            name: print-op-4
        print-op-5:
          cachingOptions:
            enableCache: true
          componentRef:
            name: comp-print-op-5
          inputs:
            parameters:
              message:
                runtimeValue:
                  constant: Hello {{$.inputs.parameters['pipelinechannel--name']}} 5
              pipelinechannel--name:
                componentInputParameter: name
          taskInfo:
            name: print-op-5
    inputDefinitions:
      parameters:
        name:
          parameterType: STRING
  schemaVersion: 2.1.0
  sdkVersion: kfp-2.8.0


Impacted by this bug? Give it a 👍.

github-actions[bot] commented 1 week ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

jaredwillard12 commented 2 days ago

Also hitting this issue -- curious if anyone finds a workaround