kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0
3.6k stars 1.62k forks source link

Problem with google.cloud.logging and set_*_limit #11302

Open mtambos-jet opened 6 months ago

mtambos-jet commented 6 months ago

/kind bug

What steps did you take and what happened:

  1. Set up logging in `_logging.py` as follows:
    
    import logging
    import os
    import sys

    from loguru import logger
    import google.cloud.logging as gcp_logging

# Public API of this module: only the configured loguru logger is exported.
__all__ = ["logger"]

# Level for application logs emitted through loguru (defaults to DEBUG).
LOG_LEVEL_APP = os.getenv("LOGURU_LEVEL", logging.getLevelName(logging.DEBUG))
# Level for stdlib `logging` records (defaults to WARNING).
LOG_LEVEL_SYSTEM = os.getenv("LOG_LEVEL_SYSTEM", logging.getLevelName(logging.WARNING))
# Write the resolved app level back to the environment.
# NOTE(review): presumably so child processes / later loguru use see the same
# level — confirm the intent.
os.environ["LOGURU_LEVEL"] = LOG_LEVEL_APP

# Parse the flag explicitly: a plain bool() of the env string would treat any
# non-empty value, including "false" or "0", as True.
CLOUD_LOGGING_DISABLED = (
    os.environ.get("CLOUD_LOGGING_DISABLED", "").strip().lower()
    in {"1", "true", "yes", "on"}
)

def setup_loggers():
    """Configure loguru and stdlib ``logging`` for this process.

    If ``CLOUD_LOGGING_DISABLED`` is true, both loguru and stdlib logging write
    to stdout. Otherwise a Google Cloud Logging client is created,
    ``client.setup_logging`` wires stdlib logging to Cloud Logging at
    ``LOG_LEVEL_SYSTEM``, and the client's default handler is added as a
    loguru sink at ``LOG_LEVEL_APP``. A summary line is logged at the end.
    """
    # Remove every existing loguru sink so only the sinks added below are used.
    logger.remove()
    if CLOUD_LOGGING_DISABLED:
        # Local logging
        logger.add(sys.stdout, level=LOG_LEVEL_APP, colorize=True)
        logging.basicConfig(stream=sys.stdout, level=LOG_LEVEL_SYSTEM)
    else:
        client = gcp_logging.Client()
        client.setup_logging(log_level=LOG_LEVEL_SYSTEM)
        # NOTE(review): entries sent through this handler are written to the
        # Cloud Logging API with the container's credentials (the reported 403
        # comes from the background-thread transport). The failure suggests the
        # identity running resource-limited tasks lacks
        # `logging.logEntries.create` — confirm which service account is used.
        handler = client.get_default_handler()
        logger.add(
            handler,
            level=LOG_LEVEL_APP,
            format="{name}:{function}:{line} - <level>{message}</level>",
        )  # loguru
    logger.info(
        "Logging setup complete. Level system: {}. Level app: {}. Local Logging: {}",
        LOG_LEVEL_SYSTEM,
        LOG_LEVEL_APP,
        CLOUD_LOGGING_DISABLED,
    )


setup_loggers()

2. Create two components, `a` and `b`, and run them in a pipeline as follows:

pipeline.py

# Import the `dsl` namespace rather than the bare `pipeline` decorator: the
# pipeline function below is itself named `pipeline` and would otherwise
# shadow the decorator.
from kfp import dsl
from kfp.dsl import ContainerSpec, container_component

# NOTE(review): DEFAULT_IMAGE_PLACEHOLDER is not defined in this snippet;
# presumably it stands in for the real image URI — confirm.


@container_component  # type: ignore[misc]
def a() -> ContainerSpec:
    """Run the ``a`` command of ``test_main.py`` in the pipeline image."""
    return ContainerSpec(
        image=DEFAULT_IMAGE_PLACEHOLDER,
        command=["python3", "fc_gps_rankings/test_main.py"],
        args=["a"],
    )


@container_component  # type: ignore[misc]
def b() -> ContainerSpec:
    """Run the ``b`` command of ``test_main.py`` in the pipeline image."""
    return ContainerSpec(
        image=DEFAULT_IMAGE_PLACEHOLDER,
        command=["python3", "fc_gps_rankings/test_main.py"],
        args=["b"],
    )


@dsl.pipeline(name="fc-test")  # type: ignore[misc]
def pipeline() -> None:
    """Run ``a`` and then ``b``; only ``b`` has a memory limit set."""
    a_op = a()
    # Per the report, any set_*_limit call on a task is what makes that task's
    # Cloud Logging writes fail with 403; `.after()` alone has no effect.
    b().set_memory_limit("64G").after(a_op)

test_main.py

import typer

from _logging import logger

# Typer CLI; each component selects a command via its `args` (["a"] or ["b"]).
app = typer.Typer()


@app.command()
def a() -> None:
    """Emit a single log line; invoked by component ``a``."""
    # NOTE(review): DEFAULT_IMAGE_PLACEHOLDER is not defined in this module as
    # shown (it would raise NameError); presumably a redaction artifact of the
    # report — confirm what was actually logged.
    logger.info(DEFAULT_IMAGE_PLACEHOLDER)


@app.command()
def b() -> None:
    """Emit a single log line; invoked by component ``b``."""
    logger.info("b")


if __name__ == "__main__":
    app()

3. Component `a` writes logs without issues, but component `b` fails with the following error when it tries to write logs:

Failed to submit 1 logs.
Traceback (most recent call last):
  File "/opt/app/.venv/lib/python3.10/site-packages/google/cloud/logging_v2/handlers/transports/background_thread.py", line 115, in _safely_commit_batch
    batch.commit()
  File "/opt/app/.venv/lib/python3.10/site-packages/google/cloud/logging_v2/logger.py", line 467, in commit
    client.logging_api.write_entries(
  File "/opt/app/.venv/lib/python3.10/site-packages/google/cloud/logging_v2/_gapic.py", line 163, in write_entries
    self._gapic_api.write_log_entries(request=request)
  File "/opt/app/.venv/lib/python3.10/site-packages/google/cloud/logging_v2/services/logging_service_v2/client.py", line 955, in write_log_entries
    response = rpc(
  File "/opt/app/.venv/lib/python3.10/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
    return wrapped_func(*args, **kwargs)
  File "/opt/app/.venv/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
    return retry_target(
  File "/opt/app/.venv/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
    _retry_error_helper(
  File "/opt/app/.venv/lib/python3.10/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
    raise final_exc from source_exc
  File "/opt/app/.venv/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
    result = target()
  File "/opt/app/.venv/lib/python3.10/site-packages/google/api_core/timeout.py", line 120, in func_with_timeout
    return func(*args, **kwargs)
  File "/opt/app/.venv/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 78, in error_remapped_callable
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.PermissionDenied: 403 Permission 'logging.logEntries.create' denied on resource (or it may not exist). [reason: "IAM_PERMISSION_DENIED"
domain: "iam.googleapis.com"
metadata {
  key: "permission"
  value: "logging.logEntries.create"
}
, type_url: "type.googleapis.com/google.logging.v2.WriteLogEntriesPartialErrors"
value: "\nX\010\000\022T\010\007\022PPermission \'logging.logEntries.create\' denied on resource (or it may not exist)."
]



**What did you expect to happen:**
Both components should be able to write logs successfully.

**Anything else you would like to add:**
This happens only when one of the `set_*_limit` methods (e.g. `set_memory_limit`) is used on a component; only that component fails to write logs. The `after` method doesn't seem to have an impact.

**Environment:**

Vertex AI
andreyvelich commented 2 days ago

/transfer pipelines