kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[backend] `set_retry` for pipelines does not work #11288

Open ianbenlolo opened 1 month ago

ianbenlolo commented 1 month ago

Environment

Steps to reproduce

In the docs here it says "Pipelines can themselves be used as components in other pipelines, just as you would use any other single-step component in a pipeline".

I was testing this to see whether a pipeline used inside another pipeline can be retried, but I can't get it to work. Here is what I've tried (based on this):

from kfp import compiler
from kfp import dsl

@dsl.component
def print_op1(msg: str) -> str:
    print(msg)
    return msg

@dsl.container_component
def print_op2(msg: str):
    return dsl.ContainerSpec(
        image='alpine',
        command=['echo', msg],
    )

@dsl.component
def fail_job():
    raise ValueError('This job failed')

@dsl.pipeline
def inner_pipeline(msg: str):
    task = print_op1(msg=msg)

    fail_job().after(task).set_retry(num_retries=2)

    print_op2(msg=task.output)

@dsl.pipeline(name='pipeline-in-pipeline')
def my_pipeline():
    op1_out = print_op1(msg='Hello')
    inner_out = inner_pipeline(msg='world').set_retry(num_retries=10).after(op1_out)
    print_op1(msg='bye').after(inner_out)

if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=my_pipeline,
        package_path=__file__.replace('.py', '.yaml'))

The `fail_job` task is retried, but the nested pipeline (`inner_pipeline`) is not. Am I wrong in my thinking?

Expected result

The pipeline-in-pipeline should retry as well.
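The expected behavior amounts to treating the nested pipeline as one retryable unit: if any step inside it fails, the whole inner DAG runs again, up to the `num_retries=10` set on `inner_pipeline`. The pure-Python sketch below only models that idea. `run_group_with_retry` is a hypothetical helper, not part of the KFP SDK, and it says nothing about how the KFP backend actually schedules retries.

```python
from typing import Callable, Dict, List


def run_group_with_retry(steps: List[Callable[[], None]], num_retries: int) -> int:
    """Hypothetical model: retry a group of steps (a stand-in for a nested
    pipeline) as one unit. Any exception restarts the group from its first
    step; after 1 + num_retries failed attempts the last error is re-raised.
    Returns the number of attempts that were needed."""
    if num_retries < 0:
        raise ValueError("num_retries must be >= 0")
    attempt = 0
    while True:
        attempt += 1
        try:
            for step in steps:
                step()
            return attempt
        except Exception:
            if attempt > num_retries:
                raise  # out of retries: surface the last failure


# Usage: a stand-in for fail_job that fails on its first two runs.
calls: Dict[str, int] = {"fail_job": 0}


def print_msg() -> None:
    pass  # stands in for print_op1


def flaky_fail_job() -> None:
    calls["fail_job"] += 1
    if calls["fail_job"] < 3:
        raise ValueError("This job failed")


attempts = run_group_with_retry([print_msg, flaky_fail_job], num_retries=10)
print(attempts)  # 3: the whole group ran three times
```

This model deliberately leaves out the task-level `set_retry` on `fail_job`; how the two retry levels would combine in KFP is not established in this thread.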

This is related to my discussion here, but I'm opening an issue for visibility.



Faakhir30 commented 1 month ago

@ianbenlolo `set_retry` does not work even without nested pipelines (see #9950). I tried it without nesting, and the task does not retry even once with this spec:

      tasks:
        fail-job:
          cachingOptions:
            enableCache: true
          componentRef:
            name: comp-fail-job
          dependentTasks:
          - print-op1
          retryPolicy:
            backoffDuration: 0s
            backoffFactor: 2.0
            backoffMaxDuration: 3600s
            maxRetryCount: 2
          taskInfo:
            name: fail-job

I didn't try it on Vertex AI, but I suspect `set_retry` is not supported there yet either; see https://issuetracker.google.com/issues/226569351

ianbenlolo commented 1 month ago

@Faakhir30 Please see my comment in the original thread. Retries work for me on Vertex AI. What does not work is retrying a pipeline used inside another pipeline.

ishisakok-nttd commented 1 week ago

I am also encountering the same issue where retries are not functioning as expected.

The environment details are as follows:

How did you deploy Kubeflow Pipelines (KFP)?
Using a custom deployment on AWS EKS.

KFP version:
Kubeflow version: 1.9.0

KFP SDK version:

Here is the code:

from kfp import compiler, dsl

# test1 deliberately exits with status 1 so the task fails and should be retried.
@dsl.component(base_image="python:3.8")
def test1() -> None:
    import sys
    print("complete")
    sys.exit(1)

@dsl.pipeline(
    name="TestRetry",
    description="test retry"
)
def test_pipeline():
    test_task = test1()
    test_task.set_retry(
        num_retries=3,
        backoff_duration="60s",
        backoff_factor=2,
        backoff_max_duration="3600s"
    )
    test_task.set_caching_options(enable_caching=False)

print("Compiling pipeline for cloud execution...")
compiler.Compiler().compile(test_pipeline, "./test_retry.yaml")
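The `set_retry` arguments above end up as the `retryPolicy` fields in the compiled YAML (`backoffDuration`, `backoffFactor`, `backoffMaxDuration`, `maxRetryCount`). As a rough aid for what timing to expect if the retries did fire, the sketch below computes a geometric backoff schedule. It assumes the delay before retry n (0-based) is `backoff_duration * backoff_factor**n` and treats `backoff_max_duration` as a per-delay cap. The backend may interpret the maximum differently (for example as a total time budget); for these particular values (60 + 120 + 240 = 420s, well under 3600s) both readings give the same delays. `parse_seconds` and `backoff_schedule` are hypothetical helpers.

```python
import re
from typing import List


def parse_seconds(duration: str) -> float:
    """Parse a duration string of the form '<number>s', e.g. '60s' or '0s'."""
    match = re.fullmatch(r"(\d+(?:\.\d+)?)s", duration)
    if match is None:
        raise ValueError(f"unsupported duration: {duration!r}")
    return float(match.group(1))


def backoff_schedule(backoff_duration: str, backoff_factor: float,
                     backoff_max_duration: str, max_retry_count: int) -> List[float]:
    """Assumed model: delay before retry n is base * factor**n, capped at the max."""
    base = parse_seconds(backoff_duration)
    cap = parse_seconds(backoff_max_duration)
    return [min(base * backoff_factor ** n, cap) for n in range(max_retry_count)]


print(backoff_schedule("60s", 2, "3600s", 3))  # [60.0, 120.0, 240.0]
print(backoff_schedule("0s", 2.0, "3600s", 2))  # [0.0, 0.0]
```

The second call uses the `backoffDuration: 0s` policy from the earlier comment, where every retry would start immediately.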

The compiled result is as follows:

# PIPELINE DEFINITION
# Name: testretry
# Description: test retry
components:
  comp-test1:
    executorLabel: exec-test1
deploymentSpec:
  executors:
    exec-test1:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - test1
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.9.0'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef test1() -> None:\n    import sys\n    print(\"complete\")\n \
          \   sys.exit(1)\n\n"
        image: python:3.8
pipelineInfo:
  description: test retry
  name: testretry
root:
  dag:
    tasks:
      test1:
        cachingOptions: {}
        componentRef:
          name: comp-test1
        retryPolicy:
          backoffDuration: 60s
          backoffFactor: 2.0
          backoffMaxDuration: 3600s
          maxRetryCount: 3
        taskInfo:
          name: test1
schemaVersion: 2.1.0
sdkVersion: kfp-2.9.0
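The compiled spec above does carry a `retryPolicy` on `test1`, which is consistent with the issue's `[backend]` label: the SDK emits the policy, and the question is whether the backend acts on it. To check the same thing for the nested case in the original report (for example, whether the task that calls `inner_pipeline` gets a `retryPolicy` at all), one could walk every DAG in the compiled spec. The sketch below works on an already-loaded dict; loading the YAML (e.g. with PyYAML) is assumed and not shown. It also assumes, as in the KFP v2 pipeline spec, that a component compiled from a nested pipeline carries its own `dag` section. `find_retry_policies` is a hypothetical helper.

```python
from typing import Any, Dict


def find_retry_policies(spec: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Map '<dag owner>/<task name>' to that task's retryPolicy, for every DAG."""
    dags = {"root": spec.get("root", {}).get("dag", {})}
    for comp_name, comp in spec.get("components", {}).items():
        if "dag" in comp:  # assumed: nested-pipeline components have their own DAG
            dags[comp_name] = comp["dag"]
    found: Dict[str, Dict[str, Any]] = {}
    for owner, dag in dags.items():
        for task_name, task in dag.get("tasks", {}).items():
            if "retryPolicy" in task:
                found[f"{owner}/{task_name}"] = task["retryPolicy"]
    return found


# Usage: a dict mirroring the compiled test_retry.yaml above.
spec = {
    "components": {"comp-test1": {"executorLabel": "exec-test1"}},
    "root": {"dag": {"tasks": {"test1": {
        "componentRef": {"name": "comp-test1"},
        "retryPolicy": {"backoffDuration": "60s", "backoffFactor": 2.0,
                        "backoffMaxDuration": "3600s", "maxRetryCount": 3},
    }}}},
}
print(find_retry_policies(spec))
```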