kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

dsl.pipeline does not allow sending parameter_values to Vertex CustomTrainingJobOp at runtime #10902

Closed pthieu closed 1 week ago

pthieu commented 3 months ago

### Environment

google-cloud-aiplatform==1.38.1
google-cloud-core==2.4.1
google-cloud-pipeline-components==2.8.0


### Steps to reproduce
Pipeline definition code:

from kfp import compiler, dsl
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp

...

@dsl.pipeline(name="Training Pipeline", description="Training pipeline on Vertex AI")
def training_pipeline(datetime_suffix: str):
    train_ranking_job_name = "train-ranking-model"
    train_ranking_job = CustomTrainingJobOp(
        project=PROJECT_ID,
        display_name=train_ranking_job_name,
        location="us-west1",
        worker_pool_specs=[
            {
                "machine_spec": {
                    "machine_type": "n1-standard-16",
                    "accelerator_type": "NVIDIA_TESLA_T4",  # See: https://cloud.google.com/vertex-ai/docs/training/configure-compute#gpu-compatibility-table
                    "accelerator_count": 1,
                },
                "replica_count": "1",
                "container_spec": {
                    "image_uri": RANKING_TRAIN_IMAGE,
                    "env": [
                        {
                            "name": "ENTRYPOINT_SCRIPT",
                            "value": "trainer.ranking.task",
                        },
                        {
                            "name": "ENVIRONMENT",
                            "value": "prod",
                        },
                        {
                            "name": "MODEL_SUFFIX",
                            "value": datetime_suffix,
                        },
                    ],
                },
            }
        ],
    ).set_display_name(train_ranking_job_name)

Pipeline submission code:

from google.cloud import aiplatform
from datetime import datetime

def main(request=None):
    if request:
        request = request.get_data()

    now = datetime.now()
    datetime_suffix = now.strftime("%Y%m%d-%H%M%S")
    # timestamp = int(now.timestamp())

    job = aiplatform.PipelineJob(
        display_name="commit-recsys-train-pipeline",
        template_path="gs://commit-recsys/artifacts/pipeline.yaml",
        pipeline_root="gs://commit-recsys/logs",
        project="commitlab",
        location="us-west1",
        failure_policy="fast",
        enable_caching=False,
        parameter_values={
            "datetime_suffix": datetime_suffix,
            # "timestamp": timestamp,
        },
    )

    job.submit()

    return "Pipeline triggered"

I get the error:

    raise ValueError('Value must be one of the following types: '
ValueError: Value must be one of the following types: str, int, float, bool, dict, and list. Got: "{{channel:task=;name=datetime_suffix;type=String;}}" of type "<class 'kfp.dsl.pipeline_channel.PipelineParameterChannel'>".



### Expected result

I expect the pipeline parameter `datetime_suffix` to be resolved to a string at runtime and used as the value of the `MODEL_SUFFIX` env var in the `CustomTrainingJobOp`.
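
A possible workaround, not confirmed anywhere in this thread: build the spec list inside a function that receives the suffix as a plain string, so that no `PipelineParameterChannel` ends up nested inside a dict. The sketch below is plain Python only. The helper name `build_worker_pool_specs` and the example image URI are hypothetical. The untested assumption is that such a function could be wrapped as a lightweight component (for example with `@dsl.component` and a `list` return type) and its output passed to `worker_pool_specs`.

```python
from typing import Any, Dict, List


def build_worker_pool_specs(image_uri: str, model_suffix: str) -> List[Dict[str, Any]]:
    """Return worker pool specs with MODEL_SUFFIX set from a plain string.

    Hypothetical helper: it mirrors the spec from the pipeline definition
    in this issue, but takes the suffix as an ordinary str argument.
    """
    return [
        {
            "machine_spec": {
                "machine_type": "n1-standard-16",
                "accelerator_type": "NVIDIA_TESLA_T4",
                "accelerator_count": 1,
            },
            "replica_count": "1",
            "container_spec": {
                "image_uri": image_uri,
                "env": [
                    {"name": "ENTRYPOINT_SCRIPT", "value": "trainer.ranking.task"},
                    {"name": "ENVIRONMENT", "value": "prod"},
                    {"name": "MODEL_SUFFIX", "value": model_suffix},
                ],
            },
        }
    ]


# Example call with made-up values (the image URI is a placeholder).
specs = build_worker_pool_specs(
    "gcr.io/example/ranking-train:latest", "20240102-030405"
)
print(specs[0]["container_spec"]["env"][-1])
```

If the component-wrapping assumption holds, the nested structure would be assembled at runtime from resolved values instead of at compile time from placeholders.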

github-actions[bot] commented 1 month ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] commented 1 week ago

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.