googleapis / python-aiplatform

A Python SDK for Vertex AI, a fully managed, end-to-end platform for data science and machine learning.
Apache License 2.0

Specifying Artifact Registry images with tags fails #2181

Open khaerensml6 opened 1 year ago

khaerensml6 commented 1 year ago

Creating a PipelineJob from an Artifact Registry URI that uses a tag instead of a version raises an internal server error. This is pretty annoying.

to be clear:

All of this combined makes it seem like there's a bug in executing tagged artifacts.

Environment details

Steps to reproduce

  1. Create a PipelineJob with an Artifact Registry URI that uses a tag instead of the sha256 hash
  2. Run the pipeline job

Code example

This fails with a 500 internal server error on the `run` call.

    import datetime

    from google.cloud import aiplatform as aip

    service_account = ... 
    pipeline_name = ...
    job_id = f"{pipeline_name}-{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"

    compiled_job = f"https://{region}-kfp.pkg.dev/{PROJECTID}/{REPOSITORY_NAME}/{PIPELINE_NAME}/{TAG}"
    pipeline_job = aip.PipelineJob(
        display_name="test-name",
        job_id=job_id,
        template_path=compiled_job,
    )

    pipeline_job.run(network=None,
                     service_account=service_account,
                     sync=True)

This runs without problem:

    import datetime

    from google.cloud import aiplatform as aip

    service_account = ... 
    pipeline_name = ...
    job_id = f"{pipeline_name}-{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"

    compiled_job = f"https://{region}-kfp.pkg.dev/{PROJECTID}/{REPOSITORY_NAME}/{PIPELINE_NAME}/sha256:...."
    pipeline_job = aip.PipelineJob(
        display_name="test-name",
        job_id=job_id,
        template_path=compiled_job,
    )

    pipeline_job.run(network=None,
                     service_account=service_account,
                     sync=True)

Stack trace

Traceback (most recent call last):
  File "/home/.../Desktop/test/venv/lib/python3.9/site-packages/google/api_core/grpc_helpers.py", line 72, in error_remapped_callable
    return callable_(*args, **kwargs)
  File "/home/.../Desktop/test/venv/lib/python3.9/site-packages/grpc/_channel.py", line 1030, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/home/.../Desktop/test/venv/lib/python3.9/site-packages/grpc/_channel.py", line 910, in _end_unary_response_blocking
    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.INTERNAL
        details = "Internal error encountered."
        debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B2a00:1450:400e:800::200a%5D:443 {created_time:"2023-05-09T00:09:30.434205595+02:00", grpc_status:13, grpc_message:"Internal error encountered."}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/.../Desktop/test/mlpipelines/monthly_pipeline.py", line 98, in <module>
    pipeline_job.run(network=None,
  File "/home/.../Desktop/test/venv/lib/python3.9/site-packages/google/cloud/aiplatform/pipeline_jobs.py", line 314, in run
    self._run(
  File "/home/.../Desktop/test/venv/lib/python3.9/site-packages/google/cloud/aiplatform/base.py", line 814, in wrapper
    return method(*args, **kwargs)
  File "/home/.../Desktop/test/venv/lib/python3.9/site-packages/google/cloud/aiplatform/pipeline_jobs.py", line 345, in _run
    self.submit(
  File "/home/.../Desktop/test/venv/lib/python3.9/site-packages/google/cloud/aiplatform/pipeline_jobs.py", line 419, in submit
    self._gca_resource = self.api_client.create_pipeline_job(
  File "/home/.../Desktop/test/venv/lib/python3.9/site-packages/google/cloud/aiplatform_v1/services/pipeline_service/client.py", line 1347, in create_pipeline_job
    response = rpc(
  File "/home/.../Desktop/test/venv/lib/python3.9/site-packages/google/api_core/gapic_v1/method.py", line 113, in __call__
    return wrapped_func(*args, **kwargs)
  File "/home/.../Desktop/test/venv/lib/python3.9/site-packages/google/api_core/grpc_helpers.py", line 74, in error_remapped_callable
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.InternalServerError: 500 Internal error encountered.
matthew29tang commented 1 year ago

Thanks for the detailed report! I've filed this as an internal bug and I'll get back to you when I have further updates about this.

xRagnorokx commented 1 year ago

For anyone finding this via Google: I encountered a very similar error when trying to run a template as a pipeline job.

It turned out I was not specifying the service account in the job submit call. Adding it (i.e. `job.submit(service_account=pipeline_service_account)`) fixed the issue for me.

kab840 commented 1 year ago

I had a similar problem. In my case, the service account passed to `job.submit` needed the `roles/artifactregistry.reader` role on the target Artifact Registry repository in order to access the pipeline template.

So, in my understanding, the service account passed to `job.submit` is used not only for executing the Vertex AI pipeline but also for fetching the pipeline template.
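To grant that role on the repository, something like the following gcloud command should work (project, repository, location, and service-account names here are hypothetical placeholders):

```shell
# Grant the pipeline's runtime service account read access to the
# Artifact Registry repository that holds the compiled pipeline template.
gcloud artifacts repositories add-iam-policy-binding my-kfp-repo \
  --project=my-project \
  --location=us-central1 \
  --member="serviceAccount:pipeline-runner@my-project.iam.gserviceaccount.com" \
  --role="roles/artifactregistry.reader"
```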

ghost commented 1 year ago

Any update on this? I seem to have the same issue

ghost commented 1 year ago

BTW, downloading the pipeline YAML from Artifact Registry via tag works fine using the KFP SDK registry functions.

ghost commented 1 year ago

Similarly, it works fine using a curl request as per the docs.

ghost commented 1 year ago

A workaround for anyone with the same issue - first use the KFP SDK to resolve the tag to an exact version, then pass the exact version as template_path:

import re
from kfp.registry import RegistryClient
from google.cloud import aiplatform

_VALID_AR_URL = re.compile(
    r"https://([\w\-]+)-kfp\.pkg\.dev/([\w\-]+)/([\w\-]+)/([\w\-]+)/([\w\-.]+)",
    re.IGNORECASE,
)

template_path = f"https://{region}-kfp.pkg.dev/{PROJECTID}/{REPOSITORY_NAME}/{PIPELINE_NAME}/{TAG}"

# Only resolve when the path matches the AR URL shape and is not
# already pinned to a sha256 digest.
match = _VALID_AR_URL.match(template_path)
if match and "sha256:" not in template_path:
    region = match.group(1)
    project = match.group(2)
    repo = match.group(3)
    package_name = match.group(4)
    tag = match.group(5)
    host = f"https://{region}-kfp.pkg.dev/{project}/{repo}"
    client = RegistryClient(host=host)
    # get_tag returns package-version metadata; keep only the sha256 digest.
    metadata = client.get_tag(package_name, tag)
    version = metadata["version"][metadata["version"].find("sha256:") :]
    template_path = f"{host}/{package_name}/{version}"

# Instantiate PipelineJob object
pl = aiplatform.pipeline_jobs.PipelineJob(
    template_path=template_path,
    ...
)