kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0
3.62k stars 1.63k forks

[sdk] Unable to pass env variables as pipeline parameters #11395

Open tumido opened 1 day ago

tumido commented 1 day ago

Something weird is going on with env variables and pipeline parameters in the SDK:

Environment

Steps to reproduce

Reference a pipeline param directly:

  1. Have a pipeline.py:

    from kfp import compiler, dsl
    
    @dsl.component()
    def op():
        import os
        import pprint as pp
        pp.pprint(dict(os.environ))
    
    @dsl.pipeline()
    def pipeline(param: str): # <-- Pipeline param declared here
        task = op()
        task.set_env_variable(name="FOO", value=param) # <-- Value passed here
    
    if __name__ == "__main__":
        compiler.Compiler().compile(pipeline, "pipeline.yaml")
    
  2. Try compiling it:
    $ python pipeline.py
    ...
      File ".../lib/python3.12/site-packages/kfp/compiler/pipeline_spec_builder.py", line 642, in build_container_spec_for_task
        .EnvVar(name=name, value=value)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    TypeError: bad argument type for built-in operation
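My reading of the traceback (not verified against the kfp source): inside the pipeline body, `param` is still a pipeline channel object rather than a `str`, and the generated `EnvVar` protobuf message only accepts `str` for its `value` field, which is what produces this `TypeError`. A pure-Python analogy of the failure mode, using stand-in classes (`StrictStrField`, `PipelineParameterChannel` here are illustrative, not kfp's real types):

```python
class StrictStrField:
    """Descriptor mimicking a protobuf string field's type check."""

    def __set_name__(self, owner, name):
        self._name = name

    def __set__(self, obj, val):
        if not isinstance(val, str):
            # same message as in the traceback above
            raise TypeError("bad argument type for built-in operation")
        obj.__dict__[self._name] = val

    def __get__(self, obj, objtype=None):
        return obj.__dict__.get(self._name, "")


class EnvVar:
    """Stand-in for the compiled pipeline spec's EnvVar message."""

    value = StrictStrField()

    def __init__(self, name, value):
        self.name = name
        self.value = value  # raises if value is not a plain str


class PipelineParameterChannel:
    """Stand-in for what `param` is inside the pipeline body."""


EnvVar(name="FOO", value="bar")  # fine: plain string
try:
    EnvVar(name="FOO", value=PipelineParameterChannel())
except TypeError as exc:
    print(exc)  # bad argument type for built-in operation
```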

Reference a pipeline param directly with default value "":

  1. Have a pipeline.py:

    from kfp import compiler, dsl
    
    @dsl.component()
    def op():
        import os
        import pprint as pp
        pp.pprint(dict(os.environ))
    
    @dsl.pipeline()
    def pipeline(param: str = ""): # <-- Pipeline param declared here with default value
        task = op()
        task.set_env_variable(name="FOO", value=param) # <-- Value passed here
    
    if __name__ == "__main__":
        compiler.Compiler().compile(pipeline, "pipeline.yaml")
    
  2. Try compiling it:
    $ python pipeline.py
    ...
      File ".../lib/python3.12/site-packages/kfp/compiler/pipeline_spec_builder.py", line 642, in build_container_spec_for_task
        .EnvVar(name=name, value=value)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    TypeError: bad argument type for built-in operation

Templated/indirect reference (probably misuse of the DSL):

  1. Have a pipeline.py:

    from kfp import compiler, dsl
    
    @dsl.component()
    def op():
        import os
        import pprint as pp
        pp.pprint(dict(os.environ))
    
    @dsl.pipeline()
    def pipeline(param: str): # <-- Pipeline param declared here
        task = op()
        task.set_env_variable(name="FOO", value=f"{param}") # <-- Value passed here as templated string reference
    
    if __name__ == "__main__":
        compiler.Compiler().compile(pipeline, "pipeline.yaml")
    
  2. Compiles successfully

  3. See the IR YAML:

    ...
    deploymentSpec:
      executors:
        exec-op:
          container:
            args:
            - --executor_input
            - '{{$}}'
            - --function_to_execute
            - op
            command:
            - sh
            - -c
            - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
              \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
              \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.10.1'\
              \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
              $0\" \"$@\"\n"
            - sh
            - -ec
            - 'program_path=$(mktemp -d)
    
              printf "%s" "$0" > "$program_path/ephemeral_component.py"
    
              _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"
    
              '
            - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
              \ *\n\ndef op():\n    import os\n    import pprint as pp\n    pp.pprint(dict(os.environ))\n\
              \n"
            env:
            - name: FOO
              value: '{{channel:task=;name=param;type=String;}}'
            image: python:3.9
    ...
  4. Submit the pipeline and check the logs. The environment variable value is the literal string {{channel:task=;name=param;type=String;}} and the placeholder never gets resolved.

    ...
    
    'FOO': '{{channel:task=;name=param;type=String;}}',
    ...
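My guess as to why this third variant compiles but never resolves (assumption: kfp's pipeline channel stringifies to the placeholder form seen in the IR above): the f-string eagerly calls str() on the channel, so `set_env_variable` receives an ordinary string containing the `{{channel:...}}` text, the type check passes, and the backend apparently does not substitute placeholders inside env var values. A minimal sketch of that stringification, using a hypothetical `FakeChannel` class:

```python
class FakeChannel:
    """Hypothetical stand-in for kfp's pipeline channel; its str() form
    matches the placeholder that ends up in the IR YAML."""

    def __init__(self, name, type_="String", task=""):
        self.name, self.type, self.task = name, type_, task

    def __str__(self):
        return f"{{{{channel:task={self.task};name={self.name};type={self.type};}}}}"


param = FakeChannel("param")
# The f-string stringifies the channel at pipeline-definition time, so the
# compiler only ever sees opaque text, never a channel to wire up.
print(f"{param}")  # {{channel:task=;name=param;type=String;}}
```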

Expected result

Be able to pass a pipeline parameter to an environment variable of a component.
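As a possible workaround sketch until this is fixed (untested against a cluster; assumes component inputs, unlike `set_env_variable`, accept pipeline parameters): pass the parameter to the component as a regular input and export it inside the component body, where it arrives as an already-resolved string.

```python
import os

# With kfp the wiring would be roughly:
#
#   @dsl.component()
#   def op(foo: str):
#       import os
#       os.environ["FOO"] = foo   # param arrives resolved at runtime
#
#   @dsl.pipeline()
#   def pipeline(param: str):
#       op(foo=param)
#
# The runtime behavior inside the component reduces to:
def export_env(name: str, value: str) -> str:
    """Set an environment variable from an already-resolved input string."""
    os.environ[name] = value
    return os.environ[name]


print(export_env("FOO", "bar"))  # prints "bar"
```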

Materials and Reference


Impacted by this bug? Give it a 👍.

tumido commented 1 day ago

The same thing actually happens for kfp.kubernetes.use_secret_as_volume and similar helpers:

  1. Have a pipeline.py:

    from kfp import compiler, dsl
    from kfp.kubernetes import use_secret_as_volume
    
    @dsl.component()
    def op():
        pass
    
    @dsl.pipeline()
    def pipeline(param: str): # <-- Pipeline param declared here
        task = op()
        use_secret_as_volume(task=task, secret_name=param, mount_path="/tmp") # <-- Value passed here
    
    if __name__ == "__main__":
        compiler.Compiler().compile(pipeline, "pipeline.yaml")
    
  2. Try compiling it:
    $ python pipeline.py
    ...
      File ".../lib/python3.12/site-packages/kfp/kubernetes/secret.py", line 81, in use_secret_as_volume
        secret_as_vol = pb.SecretAsVolume(
                        ^^^^^^^^^^^^^^^^^^
    TypeError: bad argument type for built-in operation
tumido commented 23 hours ago

Is this the same as https://github.com/kubeflow/pipelines/issues/10914, just manifesting in a different place?