kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[feature] Compile processing of PipelineParam #7706

Closed kawofong closed 2 weeks ago

kawofong commented 2 years ago

Feature Area

/area sdk

What feature would you like to see?

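Currently, the KFP compiler converts pipeline arguments into PipelineParam objects at compile time. As a result, string operations applied to a pipeline argument inside the pipeline function behave differently at compile time than they would on the actual value at runtime.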

What is the use case or pain point?

I am building a Vertex Pipeline for batch prediction using Kubeflow Pipeline SDK. I want to process the pipeline argument bq_table as a string. The pipeline compiles successfully but it fails at runtime (see screenshot below).

@dsl.pipeline(name='batch-prediction-pipeline')
def batch_prediction_pipeline(
    project: str,
    region: str,
    gcs_source: str,
    bq_location: str = 'us',
    bq_table: str = 'ml.testing_data',
):
    bq_dataset = str(bq_table).split('.')[0]  # parse BigQuery dataset name
    bq_table_uri = f'{project}.{bq_table}'
    bq_dataset_uri = f'{project}.{bq_dataset}'
    bq_query = f'''CREATE OR REPLACE EXTERNAL TABLE
                    {bq_table_uri}
                OPTIONS (
                    format = 'CSV',
                    uris = ['{gcs_source}']
                );'''

    bq_query_op = BigqueryQueryJobOp(
        project=project,
        location=bq_location,
        query=bq_query
    )
.
.
.
    batch_op = gcc_aip.ModelBatchPredictOp(
        project=project,
        location=region,
        bigquery_source_input_uri=f'bq://{bq_table_uri}',
        bigquery_destination_output_uri=f'bq://{bq_dataset_uri}',
    ).after(bq_query_op)

[screenshot of the failed run; image not available]

As seen in the screenshot, the value of bigquery_destination_output_uri doesn't include the split logic which is implemented. In this case, I am using the pre-built Google Cloud Pipeline Components so I can't change the component implementation.
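To make the behavior concrete, here is a minimal sketch in plain Python that does not use KFP at all. `FakePipelineParam` is a hypothetical stand-in written for this illustration, and its placeholder format is only assumed from the strings quoted elsewhere in this thread. It shows why `split('.')` has no visible effect: the split runs at compile time on the placeholder text, which contains no dot.

```python
class FakePipelineParam:
    """Hypothetical stand-in for a compile-time pipeline parameter (not the KFP class)."""

    def __init__(self, name: str):
        self.name = name

    def __str__(self) -> str:
        # Placeholder format assumed from the strings reported in this thread.
        return "{{pipelineparam:op=;name=%s}}" % self.name


project = FakePipelineParam("project")
bq_table = FakePipelineParam("bq_table")

# The split operates on the placeholder text, not on 'ml.testing_data'.
bq_dataset = str(bq_table).split(".")[0]
bq_dataset_uri = f"{project}.{bq_dataset}"

print(bq_dataset)
print(bq_dataset_uri)
```

Because the placeholder is only substituted at runtime, the dataset URI ends up containing the full table name rather than just the dataset part.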

Other related issues: #2725, #2937

Is there a workaround currently?

Yes. I can define more granular pipeline arguments (e.g. bq_dataset) to support my use case.



TheTravellingSalesman commented 2 years ago

I've also been wrestling with this for a while now.

Would like to highlight another problem that comes from the same behavior -- if you try to add string PipelineParams to a list within a component, the compiler fails because PipelineParams are not JSON serializable.

For example:

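from typing import List

# Imports assumed for KFP SDK 1.8.x with the v2 namespace; adjust for your SDK version.
from kfp.v2 import compiler
from kfp.v2.dsl import component, pipeline

@component
def submit_dataproc_job(job_args: List):
    # submit job using dataproc client -- job args for dataproc jobs have to be strings
    ...

@pipeline(name="example-pipeline")  # placeholder name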
def example_pipeline(
    dataproc_job_arg1: str,
    dataproc_job_arg2: str,
    dataproc_job_arg3: str,
):
    submit_dp_job_task = submit_dataproc_job(
        job_args=[
            dataproc_job_arg1,
            dataproc_job_arg2,
            dataproc_job_arg3,
        ]
    )

if __name__ == "__main__":
    compiler.Compiler().compile(
        pipeline_func=example_pipeline,
        package_path="example_pipeline.json",
    )

Running this code results in TypeError: Object of type PipelineParam is not JSON serializable

If you try to cast the parameter values to strings (which you'd expect them to be handled as, given the type hint) like so:

job_args=[
        str(dataproc_job_arg1),
        str(dataproc_job_arg2),
        str(dataproc_job_arg3),
]

you get an error like the original one shown in this issue. In the Vertex AI UI, job_args has type string and is set to:

["{{pipelineparam:op=;name=dataproc_job_arg1}}", "{{pipelineparam:op=;name=dataproc_job_arg2}}", "{{pipelineparam:op=;name=dataproc_job_arg3}}"]
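Both symptoms can be reproduced without KFP. The sketch below uses a hypothetical `FakePipelineParam` class (not the real KFP class, with the placeholder format copied from the output above) to show that `json.dumps` rejects a list of such objects, while casting with `str()` serializes the placeholders themselves rather than the runtime values.

```python
import json


class FakePipelineParam:
    """Hypothetical stand-in for a compile-time pipeline parameter (not the KFP class)."""

    def __init__(self, name: str):
        self.name = name

    def __str__(self) -> str:
        return "{{pipelineparam:op=;name=%s}}" % self.name


args = [FakePipelineParam(f"dataproc_job_arg{i}") for i in (1, 2, 3)]

# Serializing the objects directly fails, as in the compiler error above.
try:
    json.dumps(args)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

# Casting to str first succeeds, but only the placeholder text is serialized.
serialized = json.dumps([str(a) for a in args])

print(error_message)
print(serialized)
```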

It's worth noting that I have been able to work around this by passing job_args in as a single pipeline-level parameter:

@pipeline(name="example-pipeline")
def example_pipeline(job_args: List):
    submit_dp_job_task = submit_dataproc_job(
        job_args=job_args
    )

The problem here is that I'd like to be able to use the original pipeline parameters across several components without having to unpack them from a list. Discrete parameters are also more convenient for comparing runs.


If we were able to access the value of a PipelineParam, working around this behavior wouldn't be too difficult. Perhaps that's already possible? My IDE only provides completion hints for str because of the type hint on these params.

EDIT: It is absolutely possible to access the value of a PipelineParam by calling .value, but the values all come out to null.

The docs for PipelineParam say that value is "The actual value of the PipelineParam. If provided, the PipelineParam is 'resolved' immediately. For now, we support string only." I'm not sure where PipelineParams are created, but I'd assume it's in the pipeline decorator logic. I'd have expected that the PipelineParam value would be set based on the default value in the pipeline function definition, if one was set, and then updated later when the RuntimeConfigBuilder is created, using its _parameter_values property which is defined when an actual pipeline run is started.

yonaroz commented 1 year ago

Has anyone succeeded in retrieving the value of a parameter from within the pipeline function and constructing a new string from it? I couldn't find any reference to doing so in the docs: https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.dsl.html#kfp.dsl.PipelineParam

TheTravellingSalesman commented 1 year ago

@yonaroz

In short, no, it isn’t possible to manipulate pipeline parameters outside of components.

What we do instead is create components which preprocess pipeline parameters into the form that’s consumable by your components.
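As a rough sketch of what the body of such a preprocessing step could look like for the original BigQuery example, here is the string logic as a plain Python function. The names `derive_bq_uris` and `BqUris` and the project ID used below are hypothetical. Wrapping the function as a component is not shown, and it would need adapting to whatever the SDK version in use requires for component outputs.

```python
from typing import NamedTuple


class BqUris(NamedTuple):
    """Hypothetical container for the two derived URIs."""
    table_uri: str
    dataset_uri: str


def derive_bq_uris(project: str, bq_table: str) -> BqUris:
    # Runs at runtime inside a component, so bq_table is a real string here
    # and split() sees the actual value, e.g. 'ml.testing_data'.
    dataset = bq_table.split(".")[0]
    return BqUris(
        table_uri=f"{project}.{bq_table}",
        dataset_uri=f"{project}.{dataset}",
    )


uris = derive_bq_uris("my-project", "ml.testing_data")
print(uris.table_uri)
print(uris.dataset_uri)
```

Downstream tasks would then consume the outputs of this step instead of strings built directly in the pipeline function.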

DennisRutjes commented 11 months ago

I think this is not a feature request; it should be designated as a BUG! I also ran into this problem: pipeline params are changed at compile time, so you cannot use the functionality of the original parameter type, such as split.

e.g. ValueError: Value must be one of the following types: str, int, float, bool, dict, and list. Got: "{{channel:task=;name=tableRef;type=String;}}" of type "<class 'kfp.dsl.pipeline_channel.PipelineParameterChannel'>".

@dsl.pipeline(name="BigQuery_to_Parquet", description="Generic pipeline to extract a table to parquet file")
def bigquery_to_parquet(project: str, location: str, service_account: str,
                        temp_location: str, staging_location: str, tableRef: str, bucket: str
                        ) -> None:
    from google_cloud_pipeline_components.v1.wait_gcp_resources import WaitGcpResourcesOp
    from google_cloud_pipeline_components.preview.dataflow import DataflowFlexTemplateJobOp

    train_dataset_op = DataflowFlexTemplateJobOp(project=project, location=location,
                                                 container_spec_gcs_path="gs://dataflow-templates/2021-12-06-00_RC00/flex/BigQuery_to_Parquet",
                                                 service_account_email=service_account,
                                                 temp_location=temp_location,
                                                 staging_location=staging_location,
                                                 parameters={
                                                     "tableRef": tableRef,
                                                     "bucket": bucket}
                                                 )

    # wait serverless, no costs
    _ = WaitGcpResourcesOp(
        gcp_resources=train_dataset_op.outputs["gcp_resources"]
    )
    return None

When I instead change parameters to a dict pipeline argument, the third-party component fails because it cannot deserialize the JSON it receives:

@dsl.pipeline(name="BigQuery_to_Parquet", description="Generic pipeline to extract a table to parquet file")
def bigquery_to_parquet(project: str, location: str, service_account: str,
                        temp_location: str, staging_location: str, parameters: dict
                        ) -> None:

But we can just wait until this "feature" goes stale and disappears into electronic oblivion.

SamAgarwal0 commented 7 months ago

Facing a similar issue. We should be able to access and use pipeline parameters outside components...

antongollbo123 commented 4 months ago

Any update on this feature? I have encountered several cases where this would be very useful, for example when I want to create tasks in a for-loop that have to run sequentially, in a certain order. It would be really helpful to have some way to do this!

AnnKatrinBecker commented 3 months ago

Any updates? I am currently facing a use case where, depending on the inputs, I would like to increase the parallelization of my pipeline.

github-actions[bot] commented 1 month ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] commented 2 weeks ago

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.