kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

convert string to GCSPath object or create one #4710

Closed yasersakkaf closed 8 months ago

yasersakkaf commented 4 years ago

I want to convert my string "gs://some_bucket/some_dir" into a kfp.dsl.types.GCSPath. How do I do it? Or do I need to create a GCSPath object from the above GCS path? Any ideas?

Bobgy commented 4 years ago

/cc @chensun @numerology @Ark-kun

Ark-kun commented 4 years ago

I assume that you have a component with the "GCSPath" input type and want to pass a constant argument ("gs://some_bucket/some_dir") to it. The KFP SDK should allow you to do just that; you do not need any special conversion. You may pass a constant string as an argument for any input (the string must have the correct format, of course). When you pass a string (and only a string), the system assumes that you've manually and correctly serialized the data that you want to pass. So, you can pass "42" to an "Integer" input or "[1, 2, 3]" to a "JsonArray" input.
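
For example, a minimal sketch of passing a constant string to a GCSPath-typed input (assuming the ml_engine/deploy component discussed below; its other inputs are omitted for brevity):

from kfp import components, dsl

# Any component with a GCSPath-typed input works the same way.
mlengine_deploy_op = components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/0.2.5/'
    'components/gcp/ml_engine/deploy/component.yaml')

@dsl.pipeline(name='constant-gcs-path-example')
def my_pipeline():
    # A constant string is accepted directly; the SDK treats it as
    # already-serialized data of the declared GCSPath type.
    mlengine_deploy_op(model_uri='gs://some_bucket/some_dir')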

yasersakkaf commented 4 years ago

But when I load the "ml_engine/deploy" KFP component and try to pass it a GCS path as a string, I get an error while compiling my Python code to DSL. Here is the error:

type name String is different from expected: GCSPath
Traceback (most recent call last):
  File "/opt/conda/bin/dsl-compile", line 8, in <module>
    sys.exit(main())
  File "/opt/conda/lib/python3.7/site-packages/kfp/compiler/main.py", line 123, in main
    compile_pyfile(args.py, args.function, args.output, not args.disable_type_check)
  File "/opt/conda/lib/python3.7/site-packages/kfp/compiler/main.py", line 112, in compile_pyfile
    _compile_pipeline_function(pipeline_funcs, function_name, output_path, type_check)
  File "/opt/conda/lib/python3.7/site-packages/kfp/compiler/main.py", line 71, in _compile_pipeline_function
    kfp.compiler.Compiler().compile(pipeline_func, output_path, type_check)
  File "/opt/conda/lib/python3.7/site-packages/kfp/compiler/compiler.py", line 885, in compile
    package_path=package_path)
  File "/opt/conda/lib/python3.7/site-packages/kfp/compiler/compiler.py", line 941, in _create_and_write_workflow
    pipeline_conf)
  File "/opt/conda/lib/python3.7/site-packages/kfp/compiler/compiler.py", line 791, in _create_workflow
    pipeline_func(*args_list)
  File "pipeline/tf_nlp_training_pipeline.py", line 287, in train
    replace_existing_version=replace_existing_version)
  File "https://raw.githubusercontent.com/kubeflow/pipelines/0.2.5/components/gcp/ml_engine/deploy/component.yaml", line 2, in Deploying a trained model to Cloud Machine Learning Engine
  File "/opt/conda/lib/python3.7/site-packages/kfp/components/_components.py", line 295, in create_task_from_component_and_arguments
    component_ref=component_ref,
  File "/opt/conda/lib/python3.7/site-packages/kfp/components/_dsl_bridge.py", line 33, in _create_container_op_from_component_and_arguments
    dsl.types.verify_type_compatibility(reference_type, input_type, 'Incompatible argument passed to the input "{}" of component "{}": '.format(input_name, component_spec.name))
  File "/opt/conda/lib/python3.7/site-packages/kfp/dsl/types.py", line 128, in verify_type_compatibility
    raise InconsistentTypeException(error_text)
kfp.dsl.types.InconsistentTypeException: Incompatible argument passed to the input "model_uri" of component "Deploying a trained model to Cloud Machine Learning Engine": Argument type "String" is incompatible with the input type "GCSPath"
Ark-kun commented 4 years ago

> But when I load the "ml_engine/deploy" KFP component and try to pass it a GCS path as a string, I get an error while compiling my Python code to DSL.

Are you passing it a constant string? Can you please show a PoC?

P.S. If you're passing not a constant string but a pipeline parameter, then you can change the type of that pipeline parameter or use the .ignore_type() method.
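
A sketch of both options, assuming the same mlengine_deploy_op as in the example above:

from kfp import dsl

# Option 1: declare the pipeline parameter with the expected type name,
# so the type check passes without any conversion.
@dsl.pipeline(name='typed-parameter-example')
def typed_pipeline(model_uri: 'GCSPath' = 'gs://some_bucket/some_dir'):
    mlengine_deploy_op(model_uri=model_uri)

# Option 2: keep the parameter a plain String and drop its declared type
# at the call site; this disables type checking for that one argument.
@dsl.pipeline(name='ignore-type-example')
def untyped_pipeline(model_uri: str = 'gs://some_bucket/some_dir'):
    mlengine_deploy_op(model_uri=model_uri.ignore_type())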

stale[bot] commented 3 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

rfan-debug commented 3 years ago

@Ark-kun I ran into the same issue. The ignore_type() method does solve the problem for me, but it removes type checking from the KFP DSL, which isn't great.

It would be handy if we had static helpers like get_gcs_path_from_str(input: str) -> GCSPath to create GCSPath and the other objects that extend the same BaseType. That would help static type checking in the DSL.
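
To illustrate the request: the types in kfp.dsl.types are type markers used for type checking, not value wrappers, so such a helper is purely hypothetical today and could at most validate the string (sketch only, not an existing SDK API):

from kfp.dsl import types

def get_gcs_path_from_str(path: str) -> types.GCSPath:
    # Hypothetical helper: the current SDK has no value-carrying GCSPath
    # object, so all this can do is validate the URI and return the marker.
    if not path.startswith('gs://'):
        raise ValueError(f'not a GCS path: {path}')
    return types.GCSPath()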

SputnikTea commented 3 years ago

I have a similar problem with the GCPProjectID type and kfp.v2.compiler.

I am trying to use this GCP Component to perform a BigQuery query. Sadly, setting type_check=False in the compiler doesn't help, and I don't know how exactly I should apply the ignore_type() function.

The error I get is: TypeError: Passing PipelineParam "project_id" with type "String" (as "Parameter") to component input "project_id" with type "GCPProjectID" (as "Artifact") is incompatible. Please fix the type of the component input.

I understand that this is not only a type problem but rather a problem of kfp.v2 expecting non-primitive types to be Artifacts. But how can I provide the project_id as an Artifact in my pipeline definition?

This is my pipeline definition:

@kfp.dsl.pipeline(name=GC_DISPLAY_NAME)
def gc_pipeline(
    query: str = QUERY,
    project_id: str = PROJECT_ID,
    dataset_id: str = DATASET_ID,
    table_id: str = GC_TRAINING_TABLE,
    dataset_location: str = REGION,
    job_config: str = ''
):
    training_data_op = bigquery_query_op(
        query=query,
        project_id=project_id,
        dataset_id=dataset_id,
        table_id=table_id,
        dataset_location=dataset_location,
        job_config=job_config
    )

Compiler:

from kfp.v2 import compiler

compiler.Compiler().compile(
    pipeline_func=gc_pipeline, package_path=GC_DISPLAY_NAME + ".json", type_check=False
)
BorFour commented 3 years ago

> I have a similar problem with the GCPProjectID type and kfp.v2.compiler. […] But how can I provide the project_id as an Artifact in my pipeline definition?

I am having the same problem. It seems v2 of the kfp compiler has trouble with these type conversions.

TrsNium commented 3 years ago

I have the same issue (https://github.com/kubeflow/pipelines/issues/4710#issuecomment-849465918). I also tried the following method, but it failed with the error below.


from kfp import dsl
from kfp.dsl.types import GCPProjectID

# bigquery_query_op: the GCP BigQuery component from the linked comment.
@dsl.pipeline(...)
def feature_extract_pipeline(
    query: str,
    project_id: GCPProjectID,
    dataset_id: str,
    table_id: str,
    dataset_location: str,
    job_config: str
):
    training_data_op = bigquery_query_op(
        query=query,
        project_id=project_id,
        dataset_id=dataset_id,
        table_id=table_id,
        dataset_location=dataset_location,
        job_config=job_config
    )
  File "/Users/takuya.hirata/.pyenv/versions/3.7.1rc1/lib/python3.7/site-packages/kfp/v2/compiler/compiler.py", line 1122, in compile
    pipeline_parameters_override=pipeline_parameters)
  File "/Users/takuya.hirata/.pyenv/versions/3.7.1rc1/lib/python3.7/site-packages/kfp/v2/compiler/compiler.py", line 1030, in _create_pipeline_v2
    pipeline_meta = _python_op._extract_component_interface(pipeline_func)
  File "/Users/takuya.hirata/.pyenv/versions/3.7.1rc1/lib/python3.7/site-packages/kfp/components/_python_op.py", line 373, in _extract_component_interface
    type_struct = annotation_to_type_struct(parameter_type)
  File "/Users/takuya.hirata/.pyenv/versions/3.7.1rc1/lib/python3.7/site-packages/kfp/components/_python_op.py", line 310, in annotation_to_type_struct
    annotation = annotation.to_dict()
TypeError: to_dict() missing 1 required positional argument: 'self'

I'll wait for this document (https://github.com/kubeflow/pipelines/issues/5801) to be created.
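
For what it's worth, the traceback suggests the compiler calls to_dict() on the annotation object itself, so annotating with the bare GCPProjectID class hits an unbound method. A quick sketch of the difference (the exact return value may vary by SDK version):

from kfp.dsl.types import GCPProjectID

GCPProjectID.to_dict()    # TypeError: to_dict() missing 1 required positional argument: 'self'
GCPProjectID().to_dict()  # called on an instance, returns the type's dict representation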

stale[bot] commented 3 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

TheTravellingSalesman commented 3 years ago

> I have a similar problem with the GCPProjectID type and kfp.v2.compiler. […] I am having the same problem. It seems v2 of the kfp compiler has trouble with these type conversions.

I am also having this same problem on kfp 1.8.3. I'm using the dataproc components from the component store.

create_dp_cluster = kfp.components.ComponentStore.default_store.load_component('gcp/dataproc/create_cluster')

def train_and_evaluate_pipeline(
    dataproc_img: str,
    service_acc: str = SERVICE_ACC,
    project_id: str = PROJECT_ID,
    region: str = REGION
):
    create_dp_cluster_task = create_dp_cluster(
            project_id=project_id.ignore_type(),
            region=region.ignore_type(),
            cluster={
                'config': {
                    'gceClusterConfig': {
                        'serviceAccount': service_acc
                    },
                    'masterConfig': {
                        'numInstances': 1,
                        'imageUri': dataproc_img,
                        'machineTypeUri': 'n1-highmem-8'
                    },
                    'workerConfig': {
                        'numInstances': 8,
                        'imageUri': dataproc_img,
                        'machineTypeUri': 'n1-highmem-8'
                    }
                }
            }
        )

Compiler:

from kfp.v2 import compiler

compiler.Compiler().compile(
    pipeline_func=train_and_evaluate_pipeline,
    package_path='train_and_evaluate_pipeline.tar.gz')

Error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_29850/3251206080.py in <module>
      3 compiler.Compiler().compile(
      4     pipeline_func=train_and_evaluate_pipeline,
----> 5     package_path='train_and_evaluate_pipeline.tar.gz')

/opt/conda/lib/python3.7/site-packages/kfp/v2/compiler/compiler.py in compile(self, pipeline_func, package_path, pipeline_name, pipeline_parameters, type_check)
   1181                 pipeline_func=pipeline_func,
   1182                 pipeline_name=pipeline_name,
-> 1183                 pipeline_parameters_override=pipeline_parameters)
   1184             self._write_pipeline(pipeline_job, package_path)
   1185         finally:

/opt/conda/lib/python3.7/site-packages/kfp/v2/compiler/compiler.py in _create_pipeline_v2(self, pipeline_func, pipeline_name, pipeline_parameters_override)
   1106 
   1107         with dsl.Pipeline(pipeline_name) as dsl_pipeline:
-> 1108             pipeline_func(*args_list)
   1109 
   1110         if not dsl_pipeline.ops:

/tmp/ipykernel_29850/3958915055.py in train_and_evaluate_pipeline(features_path, training_labels_path, model_output_path, model_config_name, test_labels_path, test_predictions_out_path, coral_version, coral_build_str, dataproc_img, service_acc, job_name, project_id, region)
     24         project_id=project_id.ignore_type(),
     25         region=region.ignore_type(),
---> 26         name=job_name,
     27     )
     28 

/opt/conda/lib/python3.7/site-packages/kfp/components/_dynamic.py in dataproc_delete_cluster(project_id, region, name, wait_interval)
     51 
     52     def pass_locals():
---> 53         return dict_func(locals())  # noqa: F821 TODO
     54 
     55     code = pass_locals.__code__

/opt/conda/lib/python3.7/site-packages/kfp/components/_components.py in create_task_object_from_component_and_pythonic_arguments(pythonic_arguments)
    368             component_spec=component_spec,
    369             arguments=arguments,
--> 370             component_ref=component_ref,
    371         )
    372 

/opt/conda/lib/python3.7/site-packages/kfp/components/_components.py in _create_task_object_from_component_and_arguments(component_spec, arguments, component_ref, **kwargs)
    306         arguments=arguments,
    307         component_ref=component_ref,
--> 308         **kwargs,
    309     )
    310 

/opt/conda/lib/python3.7/site-packages/kfp/dsl/_component_bridge.py in _create_container_op_from_component_and_arguments(component_spec, arguments, component_ref)
    317             task.execution_options.caching_strategy.max_cache_staleness = 'P0D'
    318 
--> 319     _attach_v2_specs(task, component_spec, original_arguments)
    320 
    321     return task

/opt/conda/lib/python3.7/site-packages/kfp/dsl/_component_bridge.py in _attach_v2_specs(task, component_spec, arguments)
    634 
    635     resolved_cmd = _resolve_commands_and_args_v2(
--> 636         component_spec=component_spec, arguments=arguments)
    637 
    638     task.container_spec = (

/opt/conda/lib/python3.7/site-packages/kfp/dsl/_component_bridge.py in _resolve_commands_and_args_v2(component_spec, arguments)
    473             input_path_generator=_input_artifact_path_placeholder,
    474             output_path_generator=_resolve_output_path_placeholder,
--> 475             placeholder_resolver=_resolve_ir_placeholders_v2,
    476         )
    477         return resolved_cmd

/opt/conda/lib/python3.7/site-packages/kfp/components/_components.py in _resolve_command_line_and_paths(component_spec, arguments, input_path_generator, output_path_generator, argument_serializer, placeholder_resolver)
    562 
    563     expanded_command = expand_argument_list(container_spec.command)
--> 564     expanded_args = expand_argument_list(container_spec.args)
    565 
    566     return _ResolvedCommandLineAndPaths(

/opt/conda/lib/python3.7/site-packages/kfp/components/_components.py in expand_argument_list(argument_list)
    553         if argument_list is not None:
    554             for part in argument_list:
--> 555                 expanded_part = expand_command_part(part)
    556                 if expanded_part is not None:
    557                     if isinstance(expanded_part, list):

/opt/conda/lib/python3.7/site-packages/kfp/components/_components.py in expand_command_part(arg)
    470                 arg=arg,
    471                 component_spec=component_spec,
--> 472                 arguments=arguments,
    473             )
    474             if resolved_arg is not None:

/opt/conda/lib/python3.7/site-packages/kfp/dsl/_component_bridge.py in _resolve_ir_placeholders_v2(arg, component_spec, arguments)
    435                 input_value = arguments.get(input_name, None)
    436                 if input_value is not None:
--> 437                     return _input_parameter_placeholder(input_name)
    438                 else:
    439                     input_spec = inputs_dict[input_name]

/opt/conda/lib/python3.7/site-packages/kfp/dsl/_component_bridge.py in _input_parameter_placeholder(input_key)
    393                     'Input "{}" with type "{}" cannot be paired with '
    394                     'InputValuePlaceholder.'.format(
--> 395                         input_key, inputs_dict[input_key].type))
    396             else:
    397                 return "{{{{$.inputs.parameters['{}']}}}}".format(input_key)

TypeError: Input "project_id" with type "GCPProjectID" cannot be paired with InputValuePlaceholder.

Someone else mentioned that .ignore_type() works, but it appears that's no longer the case.

This is the fundamental question that needs answering, imo, in order to get folks passing parameters the way that v2 pipelines expect:

> I understand that this is not only a type problem but rather a problem of kfp.v2 expecting non-primitive types to be Artifacts. But how can I provide the project_id as an Artifact in my pipeline definition?

Otherwise, it seems an inconsistency was introduced in the conversion of strings to DSL types with the migration to v2. This pipeline code compiles with the v1 compiler.
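
For reference, a sketch of that v1 compilation path (same pipeline function, with .ignore_type() on the typed inputs as above):

import kfp.compiler

# The v1 compiler produces an Argo YAML package rather than v2 IR JSON,
# and accepts this pipeline as written.
kfp.compiler.Compiler().compile(
    pipeline_func=train_and_evaluate_pipeline,
    package_path='train_and_evaluate_pipeline.tar.gz')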

stale[bot] commented 2 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

rimolive commented 8 months ago

Closing this issue. No activity for more than a year.

/close

google-oss-prow[bot] commented 8 months ago

@rimolive: Closing this issue.

In response to [this](https://github.com/kubeflow/pipelines/issues/4710#issuecomment-2016829739):

> Closing this issue. No activity for more than a year.
>
> /close

Instructions for interacting with me using PR comments are available [here](https://git.k8s.io/community/contributors/guide/pull-requests.md). If you have questions or suggestions related to my behavior, please file an issue against the [kubernetes/test-infra](https://github.com/kubernetes/test-infra/issues/new?title=Prow%20issue:) repository.

praveen-cule commented 4 months ago

I am facing the same issue:

import json

import kfp.dsl as dsl

# dataproc_submit_hive_job_op and the PROJECT_ID/REGION/CLUSTER_NAME/QUERY
# constants are defined earlier (not shown).

@dsl.pipeline(
    name='Dataproc submit Hive job pipeline',
    description='Dataproc submit Hive job pipeline'
)
def dataproc_submit_hive_job_pipeline(
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    queries=json.dumps([QUERY]),
    query_file_uri='',
    script_variables='',
    hive_job='',
    job='',
    wait_interval='30'
):
    dataproc_submit_hive_job_op(
        project_id=project_id,
        region=region,
        cluster_name=cluster_name,
        queries=queries,
        query_file_uri=query_file_uri,
        script_variables=script_variables,
        hive_job=hive_job,
        job=job,
        wait_interval=wait_interval)

InconsistentTypeException: Incompatible argument passed to the input 'project_id' of component 'dataproc-submit-pyspark-job': Argument type 'STRING' is incompatible with the input type 'system.Artifact@0.0.1'

Please advise on how to fix this.