kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[sdk] Unable to Use BigqueryQueryJobOp Component with DockerRunner #10915

Closed · jambonne closed this issue 3 days ago

jambonne commented 2 months ago

Environment

Steps to reproduce

import kfp
from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp

# Run components locally inside containers via the Docker runner.
kfp.local.init(runner=kfp.local.DockerRunner())

# Fully qualified table reference (placeholders).
dataset = "<gcp-project>.<dataset-id>.<table-id>"
bq_out = BigqueryQueryJobOp(
    query=f"SELECT * FROM `{dataset}` LIMIT 100",
    project="<gcp-project>",
    location="US",
)

Throws the following error:

13:10:38.473 - INFO - Executing task 'bigquery-query-job'
13:10:38.474 - INFO - Streamed logs:

    Found image 'gcr.io/ml-pipeline/google-cloud-pipeline-components:2.14.1'

    /usr/lib/python3.8/runpy.py:111: FutureWarning:  Google Cloud Pipeline Components will drop support for Python 3.8 on Oct 1, 2024. To use new versions of the GCPC SDK after that date, you will need to upgrade to Python >= 3.9. See https://devguide.python.org/versions/ for more details.
      __import__(pkg_name)
    Traceback (most recent call last):
      File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main
        return _run_code(code, main_globals, None,
      File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
        exec(code, run_globals)
      File "/usr/local/lib/python3.8/dist-packages/google_cloud_pipeline_components/container/v1/aiplatform/remote_runner.py", line 322, in <module>
        main()
      File "/usr/local/lib/python3.8/dist-packages/google_cloud_pipeline_components/container/v1/aiplatform/remote_runner.py", line 318, in main
        print(runner(args.cls_name, args.method_name, executor_input, kwargs))
      File "/usr/local/lib/python3.8/dist-packages/google_cloud_pipeline_components/container/v1/aiplatform/remote_runner.py", line 261, in runner
        cls = getattr(aiplatform, cls_name)
    TypeError: getattr(): attribute name must be string
    python3
    -u
    -m
    google_cloud_pipeline_components.container.v1.bigquery.query_job.launcher
    --type
    BigqueryQueryJob
    --project
<project>
    --location
    US
    --payload
    {"configuration": {"query": {}, "labels": {}}}
    --job_configuration_query_override
    {"query": "SELECT * FROM `<dataset>` LIMIT 100", "query_parameters": [], "destination_encryption_configuration": {"kmsKeyName": ""}}
    --gcp_resources
    /home/jupyter/local_outputs/bigquery-query-job-2024-06-17-13-10-38-471885/bigquery-query-job/gcp_resources
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Cell In[2], line 11
      8 kfp.local.init(runner=kfp.local.DockerRunner())
     10 dataset = "<dataset>"
---> 11 bq_out = BigqueryQueryJobOp(
     12             query=f"SELECT * FROM `{dataset}` LIMIT 100",
     13             project="<project>",
     14             location="US",
     15         )

File /opt/conda/lib/python3.10/site-packages/kfp/dsl/base_component.py:101, in BaseComponent.__call__(self, *args, **kwargs)
     94     arguments = ', '.join(
     95         arg_name.replace('-', '_') for arg_name in missing_arguments)
     97     raise TypeError(
     98         f'{self.name}() missing {len(missing_arguments)} required '
     99         f'{argument_or_arguments}: {arguments}.')
--> 101 return pipeline_task.PipelineTask(
    102     component_spec=self.component_spec,
    103     args=task_inputs,
    104     execute_locally=pipeline_context.Pipeline.get_default_pipeline() is
    105     None,
    106 )

File /opt/conda/lib/python3.10/site-packages/kfp/dsl/pipeline_task.py:184, in PipelineTask.__init__(self, component_spec, args, execute_locally)
    175 self._channel_inputs = [
    176     value for _, value in args.items()
    177     if isinstance(value, pipeline_channel.PipelineChannel)
   (...)
    180     if not isinstance(value, pipeline_channel.PipelineChannel)
    181 ])
    183 if execute_locally:
--> 184     self._execute_locally(args=args)

File /opt/conda/lib/python3.10/site-packages/kfp/dsl/pipeline_task.py:199, in PipelineTask._execute_locally(self, args)
    194     self._outputs = pipeline_orchestrator.run_local_pipeline(
    195         pipeline_spec=self.pipeline_spec,
    196         arguments=args,
    197     )
    198 elif self.component_spec is not None:
--> 199     self._outputs = task_dispatcher.run_single_task(
    200         pipeline_spec=self.component_spec.to_pipeline_spec(),
    201         arguments=args,
    202     )
    203 else:
    204     # user should never hit this
    205     raise ValueError(
    206         'One of pipeline_spec or component_spec must not be None for local execution.'
    207     )

File /opt/conda/lib/python3.10/site-packages/kfp/local/task_dispatcher.py:57, in run_single_task(pipeline_spec, arguments)
     52 pipeline_resource_name = executor_input_utils.get_local_pipeline_resource_name(
     53     pipeline_spec.pipeline_info.name)
     55 # all global state should be accessed here
     56 # do not access local config state downstream
---> 57 outputs, _ = run_single_task_implementation(
     58     pipeline_resource_name=pipeline_resource_name,
     59     component_name=component_name,
     60     component_spec=component_spec,
     61     executor_spec=executor_spec,
     62     arguments=arguments,
     63     pipeline_root=config.LocalExecutionConfig.instance.pipeline_root,
     64     runner=config.LocalExecutionConfig.instance.runner,
     65     raise_on_error=config.LocalExecutionConfig.instance.raise_on_error,
     66     block_input_artifact=True,
     67     unique_pipeline_id=placeholder_utils.make_random_id())
     68 return outputs

File /opt/conda/lib/python3.10/site-packages/kfp/local/task_dispatcher.py:179, in run_single_task_implementation(pipeline_resource_name, component_name, component_spec, executor_spec, arguments, pipeline_root, runner, raise_on_error, block_input_artifact, unique_pipeline_id)
    177 msg = f'Task {task_name_for_logs} finished with status {logging_utils.format_status(task_status)}'
    178 if raise_on_error:
--> 179     raise RuntimeError(msg)
    180 else:
    181     logging.error(msg)

RuntimeError: Task 'bigquery-query-job' finished with status FAILURE

Expected result

13:13:51.003 - INFO - Executing task 'bigquery-query-job'
13:13:51.006 - INFO - Streamed logs:

    Found image '<image>'

13:14:14.865 - INFO - Task 'bigquery-query-job' finished with status SUCCESS
13:14:14.868 - INFO - Task 'bigquery-query-job' outputs:
    gcp_resources: '{
  "resources": [
    {
      "resourceType": "BigQueryJob",
      "resourceUri": "https://www.googleapis.com/bigquery/v2/projects/<project>/jobs/job_r5P7CzjYF9GoFtxgfT4Ufp7upppo?location=US"
    }
  ]
}'
    destination_table: Artifact( name='destination_table',
                                 uri='https://www.googleapis.com/bigquery/v2/projects/<project>/datasets/_a8149b15efd25a6a646be213c21bdba59b44c213/tables/anoneadae69bc490c369b12cbe9f946408dbc689eb12f729bb7946ffe7a57ef11813',
                                 metadata={'datasetId': '_a8149b15efd25a6a646be213c21bdba59b44c213', 'tableId': 'anoneadae69bc490c369b12cbe9f946408dbc689eb12f729bb7946ffe7a57ef11813', 'projectId': '<project>'} )

Materials and Reference

The expected result above was achieved by swapping out the Docker image that the google-cloud-pipeline-components components use by default. The error suggests the image's entrypoint is not being overridden as intended: google_cloud_pipeline_components/container/v1/aiplatform/remote_runner.py is executed instead of google_cloud_pipeline_components.container.v1.bigquery.query_job.launcher.
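The image swap can be done along these lines (a sketch only, not necessarily how it was done here: the attribute path follows KFP's component structures and may differ across SDK versions, and the image tag is a placeholder for a rebuild of the GCPC image without the hard-coded ENTRYPOINT):

    from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp

    # Point the component at a custom image whose ENTRYPOINT has been removed,
    # so the command supplied by the DockerRunner is executed directly.
    BigqueryQueryJobOp.component_spec.implementation.container.image = (
        "<registry>/<gcpc-image-without-entrypoint>:<tag>"
    )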

Impacted by this bug? Give it a 👍.

jambonne commented 2 months ago

Adding to this: it seems that almost none of the google-cloud-pipeline-components are usable with the DockerRunner. From what I can tell, this is because the Dockerfile that builds the image used by all GCPC components here runs aiplatform.remote_runner via an ENTRYPOINT instruction, which is not overridden by the command the Docker runner submits. As a result, a command like the one below, which the runner submits to the container:

['python3', '-u', '-m', 'google_cloud_pipeline_components.container.v1.bigquery.query_job.launcher', '--type', 'BigqueryQueryJob', '--project', "{{$.inputs.parameters['project']}}", '--location', "{{$.inputs.parameters['location']}}", '--payload', '{"Concat": ["{", "\\"configuration\\": {", "\\"query\\": ", "{{$.inputs.parameters[\'job_configuration_query\']}}", ", \\"labels\\": ", "{{$.inputs.parameters[\'labels\']}}", "}", "}"]}', '--job_configuration_query_override', '{"Concat": ["{", "\\"query\\": \\"", "{{$.inputs.parameters[\'query\']}}", "\\"", ", \\"query_parameters\\": ", "{{$.inputs.parameters[\'query_parameters\']}}", ", \\"destination_encryption_configuration\\": {", "\\"kmsKeyName\\": \\"", "{{$.inputs.parameters[\'encryption_spec_key_name\']}}", "\\"}", "}"]}', '--gcp_resources', "{{$.outputs.parameters['gcp_resources'].output_file}}", '--executor_input', '{{$}}']

is passed as arguments to the ENTRYPOINT script instead of replacing it as the container command. This breaks remote_runner.py's argument parsing and makes it impossible to run anything from GCPC with the Docker runner.
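One quick way to confirm the diagnosis (a sketch using the docker Python SDK, with the image tag taken from the logs above) is to inspect the image configuration and look at its ENTRYPOINT:

    import docker

    client = docker.from_env()
    image = client.images.pull("gcr.io/ml-pipeline/google-cloud-pipeline-components:2.14.1")
    # Expect an entrypoint that invokes
    # google_cloud_pipeline_components.container.v1.aiplatform.remote_runner,
    # which is what swallows the launcher command supplied by the runner.
    print(image.attrs["Config"].get("Entrypoint"))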

I was able to fix this on my machine with a pretty minor modification: change the docker run call here from the original:

    container = client.containers.run(
        image=image,
        command=command,
        detach=True,
        stdout=True,
        stderr=True,
        volumes=volumes,
    )

to:

    container = client.containers.run(
        image=image,
        entrypoint=[],
        command=command,
        detach=True,
        stdout=True,
        stderr=True,
        volumes=volumes,
    )

While the diff is small, I can imagine this is a bigger change than is desirable. Would it be possible to add an additional runner class for running GCPC components locally?
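Until something like that exists, one possible stop-gap on the user's side (a sketch only, not part of the KFP or GCPC APIs, and it affects every docker-py containers.run call in the process) would be to patch docker-py so containers started by the local runner get entrypoint=[], mirroring the one-line change above:

    import docker
    from docker.models.containers import ContainerCollection

    _original_run = ContainerCollection.run

    def _run_without_image_entrypoint(self, image, command=None, **kwargs):
        # Clear any baked-in ENTRYPOINT so the runner-supplied command runs as-is.
        kwargs.setdefault("entrypoint", [])
        return _original_run(self, image, command=command, **kwargs)

    ContainerCollection.run = _run_without_image_entrypoint

The patch needs to be applied before the component is triggered so the DockerRunner's container call picks it up.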

github-actions[bot] commented 3 weeks ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] commented 3 days ago

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.