kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0
3.61k stars 1.63k forks

[backend] Nested pipeline does not pass artifacts outputs #10041

Open omriel1 opened 1 year ago

omriel1 commented 1 year ago

Hi! I was trying to run a nested pipeline, where the first pipeline outputs two artifacts (Dataset) and the second one consumes them. I generally followed this example from the docs as the motivation: https://www.kubeflow.org/docs/components/pipelines/v2/data-types/artifacts/#using-output-artifacts

Even though the pipeline compiles successfully and looks right in the UI, the run fails: there is a failure sign next to the run, but in the graph the first pipeline still appears to be running even though its outputs are ready (image attached below), and the second pipeline gives no indication of the failure reason.

If I look at the logs of the failing pod I see:

31 main.go:76] KFP driver: driver.DAG(pipelineName=parent-pipeline, runID=b11af0ad-3793-4d8a-a13a-43cce00898e4, task="second-inner-pipeline", component="comp-second-inner-pipeline", dagExecutionID=1, componentSpec) failed: failed to resolve inputs: failed to resolve input artifact table_1 with spec task_output_artifact:{producer_task:"first-inner-pipeline" output_artifact_key:"table_1"}: cannot find output artifact key "table_1" in producer task "first-inner-pipeline"
time="2023-09-28T17:18:05.473Z" level=info msg="sub-process exited" argo=true error="<nil>"
time="2023-09-28T17:18:05.534Z" level=error msg="cannot save parameter /tmp/outputs/execution-id" argo=true error="open /tmp/outputs/execution-id: no such file or directory"
time="2023-09-28T17:18:05.536Z" level=error msg="cannot save parameter /tmp/outputs/iteration-count" argo=true error="open /tmp/outputs/iteration-count: no such file or directory"
time="2023-09-28T17:18:05.536Z" level=error msg="cannot save parameter /tmp/outputs/condition" argo=true error="open /tmp/outputs/condition: no such file or directory"
Error: exit status 1

The UI:

Screenshot 2023-09-28 at 20 23 54

Environment

I used a local deployment on kind:

kind create cluster --name kfp

export PIPELINE_VERSION=2.0.0
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic?ref=$PIPELINE_VERSION"

and I'm using kfp version 2.0.1

Steps to reproduce

Here's the code I've used:

from typing import NamedTuple

from kfp import dsl, compiler, Client
from kfp.components.types.artifact_types import Dataset
from kfp.dsl import Output, Input

# ----------------
# Components

@dsl.component(packages_to_install=['pandas==1.3.5'])
def generate_table_1(table_1_output: Output[Dataset]):
    import pandas as pd

    data = pd.DataFrame({"a": [1,2,3], "b": [4,5,6]})
    data.to_csv(table_1_output.path, index=False)

@dsl.component(packages_to_install=['pandas==1.3.5'])
def generate_table_2(table_2_output: Output[Dataset]):
    import pandas as pd

    data = pd.DataFrame({"a": [1,2,3], "c": [7,8,9]})
    data.to_csv(table_2_output.path, index=False)

@dsl.component(packages_to_install=['pandas==1.3.5'])
def join_tables(table_1: Input[Dataset], table_2: Input[Dataset], joined_table: Output[Dataset]):
    import pandas as pd

    left = pd.read_csv(table_1.path)
    right = pd.read_csv(table_2.path)
    result = pd.merge(left=left, right=right, on="a", how="inner")
    result.to_csv(joined_table.path, index=False)

@dsl.component
def get_table_size(table: Input[Dataset]) -> int:
    import os
    return os.path.getsize(table.path)

# ----------------
# Pipelines

@dsl.pipeline
def first_inner_pipeline() -> NamedTuple('Outputs', table_1=Dataset, table_2=Dataset):
    Outputs = NamedTuple('Outputs', table_1=Dataset, table_2=Dataset)
    generate_table_1_task = generate_table_1()
    generate_table_2_task = generate_table_2()
    return Outputs(
        table_1=generate_table_1_task.outputs['table_1_output'],
        table_2=generate_table_2_task.outputs['table_2_output'])

@dsl.pipeline
def second_inner_pipeline(table_1: Input[Dataset], table_2: Input[Dataset]) -> int:
    join_tables_task = join_tables(table_1=table_1, table_2=table_2)
    get_table_size_task = get_table_size(table=join_tables_task.outputs['joined_table'])
    return get_table_size_task.output

@dsl.pipeline
def parent_pipeline() -> int:
    first_pipeline_task = first_inner_pipeline()
    second_pipeline_task = second_inner_pipeline(
        table_1=first_pipeline_task.outputs['table_1'],
        table_2 = first_pipeline_task.outputs['table_2']
    )
    return second_pipeline_task.output

if __name__ == "__main__":
    compiler.Compiler().compile(
        pipeline_func=parent_pipeline,
        package_path="./kfp_usecase_2/issue.yaml")

    client = Client()
    run = client.create_run_from_pipeline_func(
        pipeline_func=parent_pipeline,
        experiment_name="nested-pipeline-exp",
        enable_caching=False,
        run_name="nested-pipeline"
    )
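As a local sanity check (outside KFP), the component logic itself can be reproduced with plain pandas, which suggests the failure is in how the DAG driver resolves the sub-pipeline's output artifacts rather than in the component code. This is a minimal sketch of what `generate_table_1`, `generate_table_2`, `join_tables`, and `get_table_size` compute, using a temporary directory in place of the artifact store:

```python
import os
import tempfile

import pandas as pd

# Same data as generate_table_1 / generate_table_2
table_1 = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
table_2 = pd.DataFrame({"a": [1, 2, 3], "c": [7, 8, 9]})

# Same inner join as join_tables
joined = pd.merge(left=table_1, right=table_2, on="a", how="inner")

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "joined.csv")
    joined.to_csv(path, index=False)
    # get_table_size would return this value for the joined artifact
    size = os.path.getsize(path)

print(joined.shape, size > 0)  # (3, 3) True
```

Running the same steps as standalone components (not nested in a sub-pipeline) also works, so the problem appears specific to passing artifacts across the pipeline-in-pipeline boundary.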

Impacted by this bug? Give it a 👍.

omriel1 commented 1 year ago

similar to https://github.com/kubeflow/pipelines/issues/10039 but with artifacts

zijianjoy commented 1 year ago

/assign @chensun

github-actions[bot] commented 10 months ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

HumairAK commented 9 months ago

/remove-lifecycle stale

github-actions[bot] commented 7 months ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] commented 6 months ago

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.

zetinaricky commented 5 months ago

I'm facing the same issue still. Was there a solution or workaround for this?

thesuperzapper commented 5 months ago

@zetinaricky what version of Kubeflow Pipeline (backend) are you using?

zetinaricky commented 4 months ago

backend version is v1.8.0

thesuperzapper commented 4 months ago

@zetinaricky that seems very unlikely, given this issue is about KFP v2.

I assume you are using Kubeflow 1.8, which includes Kubeflow Pipelines 2.0.3?

zetinaricky commented 4 months ago

Kubeflow backend version: v1.8.0, Kubeflow Pipelines version: 2.2.0, pipeline built and compiled with kfp SDK version 2.7.0.

Do you think the issue is unrelated to this thread then? Thanks!

boarder7395 commented 3 months ago

I'm running Kubeflow 1.9 with Kubeflow Pipelines 2.2.0 and this is still an issue. It's concerning that code provided in the documentation is broken.

Kubeflow Backend Version: v1.9.0, Kubeflow Pipelines Version: 2.2.0, Kubeflow Pipelines SDK: 2.8.0

Relevant logs of the issue:

I0808 00:48:51.435502      15 driver.go:252] parent DAG: id:96  name:"run/abb87731-e26c-4d9b-9d58-3be8d07fd5fe"  type_id:13  type:"system.DAGExecution"  last_known_state:RUNNING  custom_properties:{key:"display_name"  value:{string_value:""}}  custom_properties:{key:"inputs"  value:{struct_value:{fields:{key:"a"  value:{number_value:3.2}}  fields:{key:"b"  value:{number_value:9.2}}}}}  custom_properties:{key:"task_name"  value:{string_value:""}}  create_time_since_epoch:1723078026064  last_update_time_since_epoch:1723078026064
I0808 00:48:51.877802      15 driver.go:926] parent DAG input parameters: map[a:number_value:3.2 b:number_value:9.2], artifacts: map[]
F0808 00:48:52.134460      15 main.go:79] KFP driver: driver.Container(pipelineName=pythagorean, runID=abb87731-e26c-4d9b-9d58-3be8d07fd5fe, task="square-root", component="comp-square-root", dagExecutionID=96, componentSpec) failed: failed to resolve inputs: resolving input parameter x with spec task_output_parameter:{producer_task:"square-and-sum"  output_parameter_key:"Output"}: cannot find output parameter key "Output" in producer task "square-and-sum"
time="2024-08-08T00:48:52.832Z" level=info msg="sub-process exited" argo=true error="<nil>"
time="2024-08-08T00:48:52.832Z" level=error msg="cannot save parameter /tmp/outputs/pod-spec-patch" argo=true error="open /tmp/outputs/pod-spec-patch: no such file or directory"
time="2024-08-08T00:48:52.832Z" level=error msg="cannot save parameter /tmp/outputs/cached-decision" argo=true error="open /tmp/outputs/cached-decision: no such file or directory"
time="2024-08-08T00:48:52.832Z" level=error msg="cannot save parameter /tmp/outputs/condition" argo=true error="open /tmp/outputs/condition: no such file or directory"
Error: exit status 1

PipelineSpec: pipeline_in_pipeline_spec.txt

@chensun Can this issue be reopened and frozen so it doesn't get auto-closed?

droctothorpe commented 1 month ago

This issue appears to be redundant with https://github.com/kubeflow/pipelines/issues/10039.

boarder7395 commented 1 month ago

@droctothorpe Thoughts on closing this in favor of #10039?

droctothorpe commented 1 month ago

I think that's a good idea 👍 .