kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[feature] Make `set_env_variable` compatible with the channels [KFP V2] #10111

Closed alexdashkov closed 1 month ago

alexdashkov commented 1 year ago

Feature Area

/area sdk

What feature would you like to see?

I want to be able to use values from pipeline parameters and/or task outputs to set environment variables in KFP v2.

What is the use case or pain point?

I'm trying to pass a value through the pipeline parameters of the job execution and then set it as an environment variable. Example:


from kfp import dsl

@dsl.pipeline(name="useful-pipeline")
def training_pipeline(
    env: str = "dev",
):
    great_operator_op = great_operator().set_env_variable("env", env)

However, the current code raises an exception:

TypeError: Cannot set ml_pipelines.PipelineDeploymentConfig.PipelineContainerSpec.EnvVar.value to {{channel:task=;name=env;type=String;}}: {{channel:task=;name=env;type=String;}} has type <class 'kfp.dsl.pipeline_channel.PipelineParameterChannel'>, but expected one of: (<class 'bytes'>, <class 'str'>) for field EnvVar.value

I've tried passing the value through an intermediate task to cast the type (this helped me in a similar situation when casting parameter types), but it doesn't work either.

@dsl.component()
def convert_env_to_string(environment: str) -> str:
    return environment

@dsl.pipeline(name="useful-pipeline")
def training_pipeline(
    env: str = "dev",
):
    convert_env_to_string_op = convert_env_to_string(environment=env)
    great_operator_op = great_operator().set_env_variable("env", convert_env_to_string_op.output)

TypeError: Cannot set ml_pipelines.PipelineDeploymentConfig.PipelineContainerSpec.EnvVar.value to {{channel:task=convert-env-to-string;name=Output;type=String;}}: {{channel:task=convert-env-to-string;name=Output;type=String;}} has type <class 'kfp.dsl.pipeline_channel.PipelineParameterChannel'>, but expected one of: (<class 'bytes'>, <class 'str'>) for field EnvVar.value

Is there a workaround currently?

It's possible to avoid environment variables altogether, but that's not really a workaround.


Love this idea? Give it a 👍.

Davidnet commented 1 year ago

As an alternative, would you consider passing it as a parameter to great_operator instead?

alexdashkov commented 1 year ago

Hi @Davidnet, yes. I should have been clearer: that's exactly what I meant by not using environment variables at all. This solution works and I'll use it, but it still forces me to move away from environment variables. I can imagine an even deeper workaround, passing the value as a parameter and then injecting it via os.environ inside the component, but that looks really hacky.

from kfp import dsl

@dsl.component()
def great_operator(env: str):
    import os
    # Inject the parameter as an environment variable inside the component.
    os.environ["env"] = env
    # rest of the code

@dsl.pipeline(name="useful-pipeline")
def training_pipeline(
    env: str = "dev",
):
    great_operator_op = great_operator(env=env)

ViktorWelbers commented 11 months ago

Hi, I am also struggling with this.

I am using environment variables to set the component base image to a dynamic version, like so:


import os

from kfp import dsl

@dsl.component(
    base_image=os.environ.get("BASE_IMAGE"),
)
def some_op(foo: str):
    ...

@dsl.pipeline(name="useful-pipeline")
def training_pipeline(
    base_image: str = "IMAGE_ID"
):
    op = (
        some_op(
            foo="bar",
        )
        .set_env_variable(name="BASE_IMAGE", value=base_image)
    )

Is there any alternative way of doing this? Otherwise I would like to help out with implementing this feature.

alexdashkov commented 11 months ago

@ViktorWelbers, are you sure you're looking for the same feature? It looks like it won't help with your problem. If you only want to fill base_image for your component, you should provide that variable at pipeline compilation time, not at runtime.

Something like this:

import os

from kfp import dsl

BASE_IMAGE = os.environ.get("BASE_IMAGE")

@dsl.component(
    base_image=BASE_IMAGE
)
def some_op(foo: str):
    ...

@dsl.pipeline(name="useful-pipeline")
def training_pipeline(
    base_image: str = "IMAGE_ID"
):
    op = (
        some_op(
            foo="bar",
        )
    )
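
For completeness, a minimal compile-time sketch (not from the thread; the output path and image URI are hypothetical): since BASE_IMAGE is read from the environment of the process that compiles the pipeline, it has to be exported before the compiler runs.

from kfp import compiler

# Assumes BASE_IMAGE was exported in the shell beforehand,
# e.g. a hypothetical "gcr.io/my-project/trainer:latest".
compiler.Compiler().compile(
    pipeline_func=training_pipeline,
    package_path="useful_pipeline.yaml",  # hypothetical output path
)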

From my understanding, the base_image of a dsl.component won't be accessible at runtime even if it were possible to set an env variable from the parameters. So if you really want to change the image from parameters, I guess you might want to go through a container component. Something like this:

from kfp import dsl

@dsl.container_component
def some_op(foo:str, base_image: str):
    return dsl.ContainerSpec(image=base_image, command=['echo'], args=[foo])

@dsl.pipeline(name="useful-pipeline")
def training_pipeline(
    base_image: str = "IMAGE_ID"
):
    op = (
        some_op(
            foo="bar",
            base_image=base_image
        )
    )
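
A possible usage sketch for that approach (not from the thread; the endpoint, package path, and image URI are hypothetical): with the container component, the image becomes an ordinary runtime argument.

from kfp import Client, compiler

compiler.Compiler().compile(training_pipeline, "useful_pipeline.yaml")

# Hypothetical KFP endpoint; adjust to your deployment.
kfp_client = Client(host="http://localhost:8080")
kfp_client.create_run_from_pipeline_package(
    "useful_pipeline.yaml",
    arguments={"base_image": "gcr.io/my-project/trainer:latest"},  # hypothetical image
)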

ViktorWelbers commented 11 months ago

@alexdashkov

I want to reuse the same component logic with different base images.

You are totally correct. I think the second option you posted is what I was looking for. Thank you.

fw8 commented 8 months ago

Same problem with

kubernetes.use_secret_as_env(task, secret_name=secret_name_from_pipeline_args,...)
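
For context, a minimal sketch of that pattern (an assumption based on the call above, not code from the thread; some_op and the secret/key names are hypothetical): a literal secret name works, while a pipeline parameter runs into the same kind of error as set_env_variable.

from kfp import dsl, kubernetes  # `kubernetes` comes from the kfp-kubernetes package

@dsl.component()
def some_op():
    ...

@dsl.pipeline(name="secret-pipeline")
def secret_pipeline(secret_name: str = "my-secret"):
    task = some_op()
    kubernetes.use_secret_as_env(
        task,
        secret_name=secret_name,  # pipeline parameter, fails like set_env_variable
        secret_key_to_env={"password": "DB_PASSWORD"},
    )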

github-actions[bot] commented 6 months ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

alexdashkov commented 6 months ago

A comment to remove the stale label, as I still want this feature.

alexdashkov commented 4 months ago

@beoygnas Where do you run your pipeline? Do you have an on-premise Kubeflow instance? I'm using GCP's Vertex AI Pipelines, and it does not work there. I can compile the pipeline, but then the executor fails.

com.google.cloud.ai.platform.common.errors.AiPlatformException: code=INVALID_ARGUMENT, message=List of found errors: 1.Field: job_spec.worker_pool_specs[0].container_spec.env[0].value; Message: Required field is not set. , cause=null; Failed to create custom job for the task. 

github-actions[bot] commented 1 month ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] commented 1 month ago

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.