kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[bug] Nested pipelines fail to run #10039

Closed: JosepSampe closed this issue 2 weeks ago.

JosepSampe commented 1 year ago

I'm trying to run the "Pipelines as Components" example from the documentation, but I can't get it to run successfully. It always compiles, but it always fails after the sub-pipeline has executed successfully. In the logs of the pod that fails, I see:

driver.DAG(pipelineName=superpipeline, runID=525fd190-491a-42c1-9558-65772a931637, task="square-and-multiply", component="comp-square-and-multiply", dagExecutionID=237, componentSpec) failed: failed to resolve inputs: resolving input parameter a with spec task_output_parameter:{producer_task:"square-and-sum" output_parameter_key:"Output"}: cannot find output parameter key "Output" in producer task "square-and-sum"

Is there a different way to access the output of a "pipeline as component"?

Environment

Using a kind cluster created with: kind create cluster --name kfp

Steps to reproduce

from kfp import dsl
from kfp.client import Client

@dsl.component
def square(x: float) -> float:
    return x ** 2

@dsl.component
def add(x: float, y: float) -> float:
    return x + y

@dsl.component
def square_root(x: float) -> float:
    return x ** .5

@dsl.pipeline
def square_and_sum(a: float, b: float) -> float:
    a_sq_task = square(x=a)
    b_sq_task = square(x=b)
    return add(x=a_sq_task.output, y=b_sq_task.output).output

@dsl.pipeline
def pythagorean(a: float = 1.2, b: float = 1.2) -> float:
    sq_and_sum_task = square_and_sum(a=a, b=b)
    return square_root(x=sq_and_sum_task.output).output

if __name__ == "__main__":
    client = Client()

    run = client.create_run_from_pipeline_func(
        pipeline_func=pythagorean,
        experiment_name="pythagorean-2",
        enable_caching=False,
        arguments={
            'a': 3.5,
            'b': 4.5,
        },
        run_name="pythagorean-run-3"
    )
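As a local cross-check of what the run should produce, the same arithmetic can be written as plain Python functions. This sketch bypasses KFP entirely: the function names only mirror the components and pipelines above, and this is not how KFP executes them.

```python
import math


def square(x: float) -> float:
    return x ** 2


def add(x: float, y: float) -> float:
    return x + y


def square_root(x: float) -> float:
    return x ** 0.5


def square_and_sum(a: float, b: float) -> float:
    # Mirrors the inner pipeline: a^2 + b^2.
    return add(square(a), square(b))


def pythagorean(a: float = 1.2, b: float = 1.2) -> float:
    # Mirrors the outer pipeline: sqrt(a^2 + b^2).
    return square_root(square_and_sum(a, b))


if __name__ == "__main__":
    result = pythagorean(3.5, 4.5)
    print(result)  # sqrt(32.5), roughly 5.70
    assert math.isclose(result, math.hypot(3.5, 4.5))
```

With the run arguments a=3.5 and b=4.5, a successful run should therefore end with square_root receiving 32.5 from the sub-pipeline.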

I also tried to explicitly access the output with

return square_root(x=sq_and_sum_task.outputs['Output']).output

but I get the same error

Expected result

Successful execution.


zijianjoy commented 1 year ago

/assign @chensun

chensun commented 1 year ago

I can reproduce this, looks like a backend bug.

github-actions[bot] commented 10 months ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] commented 7 months ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] commented 7 months ago

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.

KyleKaminky commented 6 months ago

I'm still seeing this issue on KFP 2.7; is anyone else? @JosepSampe @chensun

KyleKaminky commented 6 months ago

/reopen

google-oss-prow[bot] commented 6 months ago

@KyleKaminky: You can't reopen an issue/PR unless you authored it or you are a collaborator.

In response to [this](https://github.com/kubeflow/pipelines/issues/10039#issuecomment-2115879924):

> /reopen

Instructions for interacting with me using PR comments are available [here](https://git.k8s.io/community/contributors/guide/pull-requests.md). If you have questions or suggestions related to my behavior, please file an issue against the [kubernetes-sigs/prow](https://github.com/kubernetes-sigs/prow/issues/new?title=Prow%20issue:) repository.

JosepSampe commented 6 months ago

/reopen

google-oss-prow[bot] commented 6 months ago

@JosepSampe: Reopened this issue.

In response to [this](https://github.com/kubeflow/pipelines/issues/10039#issuecomment-2115943273):

> /reopen

Instructions for interacting with me using PR comments are available [here](https://git.k8s.io/community/contributors/guide/pull-requests.md). If you have questions or suggestions related to my behavior, please file an issue against the [kubernetes-sigs/prow](https://github.com/kubernetes-sigs/prow/issues/new?title=Prow%20issue:) repository.

github-actions[bot] commented 4 months ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] commented 3 months ago

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.

droctothorpe commented 2 months ago

/reopen

google-oss-prow[bot] commented 2 months ago

@droctothorpe: Reopened this issue.

In response to [this](https://github.com/kubeflow/pipelines/issues/10039#issuecomment-2327035859):

> /reopen

Instructions for interacting with me using PR comments are available [here](https://git.k8s.io/community/contributors/guide/pull-requests.md). If you have questions or suggestions related to my behavior, please file an issue against the [kubernetes/test-infra](https://github.com/kubernetes/test-infra/issues/new?title=Prow%20issue:) repository.

droctothorpe commented 2 months ago

I plan to investigate / debug this issue this sprint.

droctothorpe commented 2 months ago

I'm keeping a log in case anyone else wants to follow along or has thoughts or suggestions. I'll keep it updated as I make progress.

2024-09-03

I'll start by confirming that I can reproduce the failure, using a simplified example.

from kfp import dsl
from kfp.client import Client

@dsl.component
def inner_comp() -> str:
    return "foobar"

@dsl.component
def outer_comp(input: str):
    print("input: ", input)

@dsl.pipeline
def inner_pipeline() -> str:
    inner_comp_task = inner_comp()
    inner_comp_task.set_caching_options(False)
    return inner_comp_task.output

@dsl.pipeline
def outer_pipeline():
    inner_pipeline_task = inner_pipeline()
    outer_comp_task = outer_comp(input=inner_pipeline_task.output)
    outer_comp_task.set_caching_options(False)

if __name__ == "__main__":
    client = Client()

    run = client.create_run_from_pipeline_func(
        pipeline_func=outer_pipeline,
        enable_caching=False,
    )

Confirmed that this fails.

Let's simplify and remove variables. I'm going to run only the inner pipeline:

from kfp import dsl
from kfp.client import Client

@dsl.component
def inner_comp() -> str:
    return "inner"

@dsl.pipeline
def inner_pipeline() -> str:
    inner_comp_task = inner_comp()
    inner_comp_task.set_caching_options(False)
    return inner_comp_task.output

if __name__ == "__main__":
    client = Client()

    run = client.create_run_from_pipeline_func(
        pipeline_func=inner_pipeline,
        enable_caching=False,
    )

That ran without any issues.

What if we modify it slightly so that we have a sub-DAG but never reference its output? I'm just going to change outer_pipeline to look like this:

@dsl.pipeline
def outer_pipeline():
    inner_pipeline()
    outer_comp_task = outer_comp(input="foo") # Note, we never reference the output of inner_pipeline.
    outer_comp_task.set_caching_options(False)

That worked!

Let's copy the successful and failed Argo Workflows (AWF) manifests to files and diff / interrogate them.

Lines marked - come from the failed manifest; lines marked + come from the successful one.

metadata:
    annotations: 
        - pipelines.kubeflow.org/components-root: '{"dag":{"tasks":{"inner-pipeline":{"cachingOptions":{},"componentRef":{"name":"comp-inner-pipeline"},"taskInfo":{"name":"inner-pipeline"}},"outer-comp":{"cachingOptions":{},"componentRef":{"name":"comp-outer-comp"},"dependentTasks":["inner-pipeline"],"inputs":{"parameters":{"input":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"inner-pipeline"}}}},"taskInfo":{"name":"outer-comp"}}}}}'
        + pipelines.kubeflow.org/components-root: '{"dag":{"tasks":{"inner-pipeline":{"cachingOptions":{},"componentRef":{"name":"comp-inner-pipeline"},"taskInfo":{"name":"inner-pipeline"}},"outer-comp":{"cachingOptions":{},"componentRef":{"name":"comp-outer-comp"},"inputs":{"parameters":{"input":{"runtimeValue":{"constant":"foo"}}}},"taskInfo":{"name":"outer-comp"}}}}}'
spec:
    templates:
        dag:
            tasks:
                arguments:
                    name: task
                    value:
                        - '{"cachingOptions":{},"componentRef":{"name":"comp-outer-comp"},"dependentTasks":["inner-pipeline"],"inputs":{"parameters":{"input":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"inner-pipeline"}}}},"taskInfo":{"name":"outer-comp"}}'
                        + '{"cachingOptions":{},"componentRef":{"name":"comp-outer-comp"},"inputs":{"parameters":{"input":{"runtimeValue":{"constant":"foo"}}}},"taskInfo":{"name":"outer-comp"}}'
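Eyeballing escaped JSON is error-prone, so the `pipelines.kubeflow.org/components-root` annotation can instead be parsed and reduced to the list of task-output dependencies it declares. This is a minimal stdlib sketch that assumes the annotation value has already been copied out as a string (the two samples below are the values from the diff above); the helper name `output_edges` is hypothetical.

```python
import json
from typing import List, Tuple


def output_edges(components_root: str) -> List[Tuple[str, str, str, str]]:
    """Return (consumer_task, input_name, producer_task, output_key) tuples.

    Walks dag.tasks in a components-root JSON string and collects every
    input parameter that is wired to another task's output parameter.
    """
    root = json.loads(components_root)
    edges = []
    for task_name, task in root.get("dag", {}).get("tasks", {}).items():
        params = task.get("inputs", {}).get("parameters", {})
        for input_name, spec in params.items():
            ref = spec.get("taskOutputParameter")
            if ref is not None:
                edges.append(
                    (task_name, input_name, ref["producerTask"], ref["outputParameterKey"])
                )
    return edges


failed = '{"dag":{"tasks":{"inner-pipeline":{"cachingOptions":{},"componentRef":{"name":"comp-inner-pipeline"},"taskInfo":{"name":"inner-pipeline"}},"outer-comp":{"cachingOptions":{},"componentRef":{"name":"comp-outer-comp"},"dependentTasks":["inner-pipeline"],"inputs":{"parameters":{"input":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"inner-pipeline"}}}},"taskInfo":{"name":"outer-comp"}}}}}'
succeeded = '{"dag":{"tasks":{"inner-pipeline":{"cachingOptions":{},"componentRef":{"name":"comp-inner-pipeline"},"taskInfo":{"name":"inner-pipeline"}},"outer-comp":{"cachingOptions":{},"componentRef":{"name":"comp-outer-comp"},"inputs":{"parameters":{"input":{"runtimeValue":{"constant":"foo"}}}},"taskInfo":{"name":"outer-comp"}}}}}'

print(output_edges(failed))     # [('outer-comp', 'input', 'inner-pipeline', 'Output')]
print(output_edges(succeeded))  # []
```

The only structural difference is that single edge: in the failing manifest, outer-comp's input is a taskOutputParameter whose producer is the sub-DAG task inner-pipeline.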

I wonder whether the compiler is misconfiguring the workflow manifest somehow. Potentially relevant information on Stack Overflow: https://stackoverflow.com/a/64996549.

Let's take a minute to parse and grok the failed manifest.

Under spec/templates, there are 3 containers and 4 DAGs. Here's a simplified view of the templates:

- container
  - name: system-container-driver
  - command: driver
- dag
  - name: system-container-executor
  - tasks:
    - template: system-container-impl
- container
  - name: system-container-impl
- dag
  - name: comp-inner-pipeline
  - tasks:
    1.
      - template: system-container-driver
      - name: inner-comp-driver
    2.
      - template: system-container-executor
      - name: inner-comp
      - depends: inner-comp-driver.Succeeded
- container
  - name: system-dag-driver
- dag
  - name: root
- dag
  - name: entrypoint
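The container/DAG tally above can be reproduced programmatically. A sketch, assuming the Workflow object has been exported as JSON (for example with `kubectl get workflow <name> -n kubeflow -o json`) and loaded into a dict; it relies on each Argo template normally defining one template-type key such as `container` or `dag`. The helper `template_kinds` is hypothetical, and the `sample` dict is a hand-written stand-in mirroring the simplified view, not the real manifest.

```python
from typing import Dict, List

# Common Argo template types; a template normally defines one of these keys.
TEMPLATE_TYPES = ("container", "script", "resource", "dag", "steps", "suspend")


def template_kinds(workflow: dict) -> Dict[str, List[str]]:
    """Group spec.templates names by their template type."""
    kinds: Dict[str, List[str]] = {}
    for tmpl in workflow.get("spec", {}).get("templates", []):
        kind = next((t for t in TEMPLATE_TYPES if t in tmpl), "unknown")
        kinds.setdefault(kind, []).append(tmpl["name"])
    return kinds


# Hand-written stand-in mirroring the simplified view above (not the real manifest).
sample = {
    "spec": {
        "templates": [
            {"name": "system-container-driver", "container": {"command": ["driver"]}},
            {"name": "system-container-executor", "dag": {"tasks": []}},
            {"name": "system-container-impl", "container": {}},
            {"name": "comp-inner-pipeline", "dag": {"tasks": []}},
            {"name": "system-dag-driver", "container": {}},
            {"name": "root", "dag": {"tasks": []}},
            {"name": "entrypoint", "dag": {"tasks": []}},
        ]
    }
}

kinds = template_kinds(sample)
print({k: len(v) for k, v in kinds.items()})  # {'container': 3, 'dag': 4}
```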

The root.outer-comp-driver task is what fails.

Here are the corresponding logs:

Stream closed EOF for kubeflow/outer-pipeline-sq9vj-system-container-driver-3067012875 (main)
init time="2024-09-03T19:35:07.623Z" level=info msg="Starting Workflow Executor" version=v3.4.16
init time="2024-09-03T19:35:07.685Z" level=info msg="Using executor retry strategy" Duration=1s Factor=1.6 Jitter=0.5 Steps=5
init time="2024-09-03T19:35:07.691Z" level=info msg="Executor initialized" deadline="0001-01-01 00:00:00 +0000 UTC" includeScriptOutput=false namespace=kubeflow podName=outer-pipeline-sq9vj-system-container-driver-3067012875 template="{\"name\":\"system-container-driver\",\"inputs\":{\"parameters\":[{\"name\":\"component\",\"value\":\"{\\\"executorLabel\\\":\\\"exec-outer-comp\\\",\\\"inputDefinitions\\\":{\\\"parameters\\\":{\\\"input\\\":{\\\"parameterType\\\":\\\"STRING\\\"}}}}\"},{\"name\":\"task\",\"value\":\"{\\\"cachingOptions\\\":{},\\\"componentRef\\\":{\\\"name\\\":\\\"comp-outer-comp\\\"},\\\"dependentTasks\\\":[\\\"inner-pipeline\\\"],\\\"inputs\\\":{\\\"parameters\\\":{\\\"input\\\":{\\\"taskOutputParameter\\\":{\\\"outputParameterKey\\\":\\\"Output\\\",\\\"producerTask\\\":\\\"inner-pipeline\\\"}}}},\\\"taskInfo\\\":{\\\"name\\\":\\\"outer-comp\\\"}}\"},{\"name\":\"container\",\"value\":\"{\\\"args\\\":[\\\"--executor_input\\\",\\\"{{$}}\\\",\\\"--function_to_execute\\\",\\\"outer_comp\\\"],\\\"command\\\":[\\\"sh\\\",\\\"-c\\\",\\\"\\\\nif ! 
[ -x \\\\\\\"$(command -v pip)\\\\\\\" ]; then\\\\n    python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\\\\nfi\\\\n\\\\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0' '--no-deps' 'typing-extensions\\\\u003e=3.7.4,\\\\u003c5; python_version\\\\u003c\\\\\\\"3.9\\\\\\\"' \\\\u0026\\\\u0026 \\\\\\\"$0\\\\\\\" \\\\\\\"$@\\\\\\\"\\\\n\\\",\\\"sh\\\",\\\"-ec\\\",\\\"program_path=$(mktemp -d)\\\\n\\\\nprintf \\\\\\\"%s\\\\\\\" \\\\\\\"$0\\\\\\\" \\\\u003e \\\\\\\"$program_path/ephemeral_component.py\\\\\\\"\\\\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         \\\\\\\"$program_path/ephemeral_component.py\\\\\\\"                         \\\\\\\"$@\\\\\\\"\\\\n\\\",\\\"\\\\nimport kfp\\\\nfrom kfp import dsl\\\\nfrom kfp.dsl import *\\\\nfrom typing import *\\\\n\\\\ndef outer_comp(input: str):\\\\n    print(\\\\\\\"input: \\\\\\\", input)\\\\n\\\\n\\\"],\\\"image\\\":\\\"python:3.8\\\"}\"},{\"name\":\"parent-dag-id\",\"value\":\"234\"},{\"name\":\"iteration-index\",\"default\":\"-1\",\"value\":\"-1\"},{\"name\":\"kubernetes-config\",\"default\":\"\",\"value\":\"\"}]},\"outputs\":{\"parameters\":[{\"name\":\"pod-spec-patch\",\"valueFrom\":{\"path\":\"/tmp/outputs/pod-spec-patch\",\"default\":\"\"}},{\"name\":\"cached-decision\",\"default\":\"false\",\"valueFrom\":{\"path\":\"/tmp/outputs/cached-decision\",\"default\":\"false\"}},{\"name\":\"condition\",\"valueFrom\":{\"path\":\"/tmp/outputs/condition\",\"default\":\"true\"}}]},\"metadata\":{\"annotations\":{\"sidecar.istio.io/inject\":\"false\"}},\"container\":{\"name\":\"\",\"image\":\"gcr.io/ml-pipeline/kfp-driver@sha256:3c0665cd36aa87e4359a4c8b6271dcba5bdd817815cd0496ed12eb5dde5fd2ec\",\"command\":[\"driver\"],\"args\":[\"--type\",\"CONTAINER\",\"--pipeline_name\",\"outer-pipeline\",\"--run_id\",\"2fda42a6-a492-4f81-8a7f-f08118a34bcb\",\"--dag_execution_id\",\"234
\",\"--component\",\"{\\\"executorLabel\\\":\\\"exec-outer-comp\\\",\\\"inputDefinitions\\\":{\\\"parameters\\\":{\\\"input\\\":{\\\"parameterType\\\":\\\"STRING\\\"}}}}\",\"--task\",\"{\\\"cachingOptions\\\":{},\\\"componentRef\\\":{\\\"name\\\":\\\"comp-outer-comp\\\"},\\\"dependentTasks\\\":[\\\"inner-pipeline\\\"],\\\"inputs\\\":{\\\"parameters\\\":{\\\"input\\\":{\\\"taskOutputParameter\\\":{\\\"outputParameterKey\\\":\\\"Output\\\",\\\"producerTask\\\":\\\"inner-pipeline\\\"}}}},\\\"taskInfo\\\":{\\\"name\\\":\\\"outer-comp\\\"}}\",\"--container\",\"{\\\"args\\\":[\\\"--executor_input\\\",\\\"{{$}}\\\",\\\"--function_to_execute\\\",\\\"outer_comp\\\"],\\\"command\\\":[\\\"sh\\\",\\\"-c\\\",\\\"\\\\nif ! [ -x \\\\\\\"$(command -v pip)\\\\\\\" ]; then\\\\n    python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\\\\nfi\\\\n\\\\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0' '--no-deps' 'typing-extensions\\\\u003e=3.7.4,\\\\u003c5; python_version\\\\u003c\\\\\\\"3.9\\\\\\\"' \\\\u0026\\\\u0026 \\\\\\\"$0\\\\\\\" \\\\\\\"$@\\\\\\\"\\\\n\\\",\\\"sh\\\",\\\"-ec\\\",\\\"program_path=$(mktemp -d)\\\\n\\\\nprintf \\\\\\\"%s\\\\\\\" \\\\\\\"$0\\\\\\\" \\\\u003e \\\\\\\"$program_path/ephemeral_component.py\\\\\\\"\\\\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         \\\\\\\"$program_path/ephemeral_component.py\\\\\\\"                         \\\\\\\"$@\\\\\\\"\\\\n\\\",\\\"\\\\nimport kfp\\\\nfrom kfp import dsl\\\\nfrom kfp.dsl import *\\\\nfrom typing import *\\\\n\\\\ndef outer_comp(input: str):\\\\n    print(\\\\\\\"input: \\\\\\\", 
input)\\\\n\\\\n\\\"],\\\"image\\\":\\\"python:3.8\\\"}\",\"--iteration_index\",\"-1\",\"--cached_decision_path\",\"/tmp/outputs/cached-decision\",\"--pod_spec_patch_path\",\"/tmp/outputs/pod-spec-patch\",\"--condition_path\",\"/tmp/outputs/condition\",\"--kubernetes_config\",\"\"],\"resources\":{\"limits\":{\"cpu\":\"500m\",\"memory\":\"512Mi\"},\"requests\":{\"cpu\":\"100m\",\"memory\":\"64Mi\"}}},\"archiveLocation\":{\"archiveLogs\":true,\"s3\":{\"endpoint\":\"minio-service.kubeflow:9000\",\"bucket\":\"mlpipeline\",\"insecure\":true,\"accessKeySecret\":{\"name\":\"mlpipeline-minio-artifact\",\"key\":\"accesskey\"},\"secretKeySecret\":{\"name\":\"mlpipeline-minio-artifact\",\"key\":\"secretkey\"},\"key\":\"artifacts/outer-pipeline-sq9vj/2024/09/03/outer-pipeline-sq9vj-system-container-driver-3067012875\"}}}" version="&Version{Version:v3.4.16,BuildDate:2024-01-14T05:29:17Z,GitCommit:910a9aabce5de6568b54350c181a431f8263605a,GitTag:v3.4.16,GitTreeState:clean,GoVersion:go1.20.13,Compiler:gc,Platform:linux/amd64,}"
init time="2024-09-03T19:35:07.829Z" level=info msg="Start loading input artifacts..."
init time="2024-09-03T19:35:07.830Z" level=info msg="Alloc=6975 TotalAlloc=12544 Sys=27773 NumGC=4 Goroutines=4"
wait time="2024-09-03T19:35:08.854Z" level=info msg="Starting Workflow Executor" version=v3.4.16
wait time="2024-09-03T19:35:08.897Z" level=info msg="Using executor retry strategy" Duration=1s Factor=1.6 Jitter=0.5 Steps=5
wait time="2024-09-03T19:35:08.900Z" level=info msg="Executor initialized" deadline="0001-01-01 00:00:00 +0000 UTC" includeScriptOutput=false namespace=kubeflow podName=outer-pipeline-sq9vj-system-container-driver-3067012875 template="{\"name\":\"system-container-driver\",\"inputs\":{\"parameters\":[{\"name\":\"component\",\"value\":\"{\\\"executorLabel\\\":\\\"exec-outer-comp\\\",\\\"inputDefinitions\\\":{\\\"parameters\\\":{\\\"input\\\":{\\\"parameterType\\\":\\\"STRING\\\"}}}}\"},{\"name\":\"task\",\"value\":\"{\\\"cachingOptions\\\":{},\\\"componentRef\\\":{\\\"name\\\":\\\"comp-outer-comp\\\"},\\\"dependentTasks\\\":[\\\"inner-pipeline\\\"],\\\"inputs\\\":{\\\"parameters\\\":{\\\"input\\\":{\\\"taskOutputParameter\\\":{\\\"outputParameterKey\\\":\\\"Output\\\",\\\"producerTask\\\":\\\"inner-pipeline\\\"}}}},\\\"taskInfo\\\":{\\\"name\\\":\\\"outer-comp\\\"}}\"},{\"name\":\"container\",\"value\":\"{\\\"args\\\":[\\\"--executor_input\\\",\\\"{{$}}\\\",\\\"--function_to_execute\\\",\\\"outer_comp\\\"],\\\"command\\\":[\\\"sh\\\",\\\"-c\\\",\\\"\\\\nif ! 
[ -x \\\\\\\"$(command -v pip)\\\\\\\" ]; then\\\\n    python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\\\\nfi\\\\n\\\\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0' '--no-deps' 'typing-extensions\\\\u003e=3.7.4,\\\\u003c5; python_version\\\\u003c\\\\\\\"3.9\\\\\\\"' \\\\u0026\\\\u0026 \\\\\\\"$0\\\\\\\" \\\\\\\"$@\\\\\\\"\\\\n\\\",\\\"sh\\\",\\\"-ec\\\",\\\"program_path=$(mktemp -d)\\\\n\\\\nprintf \\\\\\\"%s\\\\\\\" \\\\\\\"$0\\\\\\\" \\\\u003e \\\\\\\"$program_path/ephemeral_component.py\\\\\\\"\\\\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         \\\\\\\"$program_path/ephemeral_component.py\\\\\\\"                         \\\\\\\"$@\\\\\\\"\\\\n\\\",\\\"\\\\nimport kfp\\\\nfrom kfp import dsl\\\\nfrom kfp.dsl import *\\\\nfrom typing import *\\\\n\\\\ndef outer_comp(input: str):\\\\n    print(\\\\\\\"input: \\\\\\\", input)\\\\n\\\\n\\\"],\\\"image\\\":\\\"python:3.8\\\"}\"},{\"name\":\"parent-dag-id\",\"value\":\"234\"},{\"name\":\"iteration-index\",\"default\":\"-1\",\"value\":\"-1\"},{\"name\":\"kubernetes-config\",\"default\":\"\",\"value\":\"\"}]},\"outputs\":{\"parameters\":[{\"name\":\"pod-spec-patch\",\"valueFrom\":{\"path\":\"/tmp/outputs/pod-spec-patch\",\"default\":\"\"}},{\"name\":\"cached-decision\",\"default\":\"false\",\"valueFrom\":{\"path\":\"/tmp/outputs/cached-decision\",\"default\":\"false\"}},{\"name\":\"condition\",\"valueFrom\":{\"path\":\"/tmp/outputs/condition\",\"default\":\"true\"}}]},\"metadata\":{\"annotations\":{\"sidecar.istio.io/inject\":\"false\"}},\"container\":{\"name\":\"\",\"image\":\"gcr.io/ml-pipeline/kfp-driver@sha256:3c0665cd36aa87e4359a4c8b6271dcba5bdd817815cd0496ed12eb5dde5fd2ec\",\"command\":[\"driver\"],\"args\":[\"--type\",\"CONTAINER\",\"--pipeline_name\",\"outer-pipeline\",\"--run_id\",\"2fda42a6-a492-4f81-8a7f-f08118a34bcb\",\"--dag_execution_id\",\"234
\",\"--component\",\"{\\\"executorLabel\\\":\\\"exec-outer-comp\\\",\\\"inputDefinitions\\\":{\\\"parameters\\\":{\\\"input\\\":{\\\"parameterType\\\":\\\"STRING\\\"}}}}\",\"--task\",\"{\\\"cachingOptions\\\":{},\\\"componentRef\\\":{\\\"name\\\":\\\"comp-outer-comp\\\"},\\\"dependentTasks\\\":[\\\"inner-pipeline\\\"],\\\"inputs\\\":{\\\"parameters\\\":{\\\"input\\\":{\\\"taskOutputParameter\\\":{\\\"outputParameterKey\\\":\\\"Output\\\",\\\"producerTask\\\":\\\"inner-pipeline\\\"}}}},\\\"taskInfo\\\":{\\\"name\\\":\\\"outer-comp\\\"}}\",\"--container\",\"{\\\"args\\\":[\\\"--executor_input\\\",\\\"{{$}}\\\",\\\"--function_to_execute\\\",\\\"outer_comp\\\"],\\\"command\\\":[\\\"sh\\\",\\\"-c\\\",\\\"\\\\nif ! [ -x \\\\\\\"$(command -v pip)\\\\\\\" ]; then\\\\n    python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\\\\nfi\\\\n\\\\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0' '--no-deps' 'typing-extensions\\\\u003e=3.7.4,\\\\u003c5; python_version\\\\u003c\\\\\\\"3.9\\\\\\\"' \\\\u0026\\\\u0026 \\\\\\\"$0\\\\\\\" \\\\\\\"$@\\\\\\\"\\\\n\\\",\\\"sh\\\",\\\"-ec\\\",\\\"program_path=$(mktemp -d)\\\\n\\\\nprintf \\\\\\\"%s\\\\\\\" \\\\\\\"$0\\\\\\\" \\\\u003e \\\\\\\"$program_path/ephemeral_component.py\\\\\\\"\\\\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         \\\\\\\"$program_path/ephemeral_component.py\\\\\\\"                         \\\\\\\"$@\\\\\\\"\\\\n\\\",\\\"\\\\nimport kfp\\\\nfrom kfp import dsl\\\\nfrom kfp.dsl import *\\\\nfrom typing import *\\\\n\\\\ndef outer_comp(input: str):\\\\n    print(\\\\\\\"input: \\\\\\\", 
input)\\\\n\\\\n\\\"],\\\"image\\\":\\\"python:3.8\\\"}\",\"--iteration_index\",\"-1\",\"--cached_decision_path\",\"/tmp/outputs/cached-decision\",\"--pod_spec_patch_path\",\"/tmp/outputs/pod-spec-patch\",\"--condition_path\",\"/tmp/outputs/condition\",\"--kubernetes_config\",\"\"],\"resources\":{\"limits\":{\"cpu\":\"500m\",\"memory\":\"512Mi\"},\"requests\":{\"cpu\":\"100m\",\"memory\":\"64Mi\"}}},\"archiveLocation\":{\"archiveLogs\":true,\"s3\":{\"endpoint\":\"minio-service.kubeflow:9000\",\"bucket\":\"mlpipeline\",\"insecure\":true,\"accessKeySecret\":{\"name\":\"mlpipeline-minio-artifact\",\"key\":\"accesskey\"},\"secretKeySecret\":{\"name\":\"mlpipeline-minio-artifact\",\"key\":\"secretkey\"},\"key\":\"artifacts/outer-pipeline-sq9vj/2024/09/03/outer-pipeline-sq9vj-system-container-driver-3067012875\"}}}" version="&Version{Version:v3.4.16,BuildDate:2024-01-14T05:29:17Z,GitCommit:910a9aabce5de6568b54350c181a431f8263605a,GitTag:v3.4.16,GitTreeState:clean,GoVersion:go1.20.13,Compiler:gc,Platform:linux/amd64,}"
wait time="2024-09-03T19:35:08.904Z" level=info msg="Starting deadline monitor"
wait time="2024-09-03T19:35:11.908Z" level=info msg="Main container completed" error="<nil>"
wait time="2024-09-03T19:35:11.908Z" level=info msg="No Script output reference in workflow. Capturing script output ignored"
wait time="2024-09-03T19:35:11.908Z" level=info msg="Saving output parameters"
wait time="2024-09-03T19:35:11.908Z" level=info msg="Saving path output parameter: pod-spec-patch"
wait time="2024-09-03T19:35:11.908Z" level=info msg="Copying /tmp/outputs/pod-spec-patch from base image layer"
wait time="2024-09-03T19:35:11.909Z" level=info msg="Successfully saved output parameter: pod-spec-patch"
wait time="2024-09-03T19:35:11.909Z" level=info msg="Saving path output parameter: cached-decision"
wait time="2024-09-03T19:35:11.909Z" level=info msg="Copying /tmp/outputs/cached-decision from base image layer"
wait time="2024-09-03T19:35:11.909Z" level=info msg="Successfully saved output parameter: cached-decision"
main time="2024-09-03T19:35:09.694Z" level=info msg="capturing logs" argo=true
main I0903 19:35:10.683550      18 main.go:108] input ComponentSpec:{
main   "executorLabel": "exec-outer-comp",
main   "inputDefinitions": {
main     "parameters": {
main       "input": {
main         "parameterType": "STRING"
main       }
main     }
main   }
main }
main I0903 19:35:10.696924      18 main.go:115] input TaskSpec:{
main   "cachingOptions": {},
main   "componentRef": {
main     "name": "comp-outer-comp"
main   },
main   "dependentTasks": [
main     "inner-pipeline"
main   ],
main   "inputs": {
main     "parameters": {
main       "input": {
main         "taskOutputParameter": {
main           "outputParameterKey": "Output",
main           "producerTask": "inner-pipeline"
main         }
main       }
main     }
main   },
main   "taskInfo": {
main     "name": "outer-comp"
main   }
main }
main I0903 19:35:10.700383      18 main.go:121] input ContainerSpec:{
main   "args": [
main     "--executor_input",
main     "{{$}}",
main     "--function_to_execute",
main     "outer_comp"
main   ],
main   "command": [
main     "sh",
main     "-c",
main     "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0' '--no-deps' 'typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"' \u0026\u0026 \"$0\" \"$@\"\n",
main     "sh",
main     "-ec",
main     "program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" \u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         \"$program_path/ephemeral_component.py\"                         \"$@\"\n",
main     "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef outer_comp(input: str):\n    print(\"input: \", input)\n\n"
main   ],
main   "image": "python:3.8"
main }
main I0903 19:35:10.712435      18 cache.go:116] Connecting to cache endpoint 10.43.52.53:8887
main I0903 19:35:10.863280      18 client.go:302] Pipeline Context: id:72  name:"outer-pipeline"  type_id:11  type:"system.Pipeline"  create_time_since_epoch:1725384213787  last_update_time_since_epoch:1725384213787
main I0903 19:35:10.904456      18 client.go:311] Pipeline Run Context: id:82  name:"2fda42a6-a492-4f81-8a7f-f08118a34bcb"  type_id:12  type:"system.PipelineRun"  custom_properties:{key:"namespace"  value:{string_value:"kubeflow"}}  custom_properties:{key:"pipeline_root"  value:{string_value:"minio://mlpipeline/v2/artifacts/outer-pipeline/2fda42a6-a492-4f81-8a7f-f08118a34bcb"}}  custom_properties:{key:"resource_name"  value:{string_value:"run-resource"}}  custom_properties:{key:"store_session_info"  value:{string_value:"{\"Provider\":\"minio\",\"Params\":{\"accessKeyKey\":\"accesskey\",\"disableSSL\":\"true\",\"endpoint\":\"10.43.150.231:9000\",\"fromEnv\":\"false\",\"region\":\"minio\",\"secretKeyKey\":\"secretkey\",\"secretName\":\"mlpipeline-minio-artifact\"}}"}}  create_time_since_epoch:1725392061240  last_update_time_since_epoch:1725392061240
wait time="2024-09-03T19:35:11.909Z" level=info msg="Saving path output parameter: condition"
wait time="2024-09-03T19:35:11.909Z" level=info msg="Copying /tmp/outputs/condition from base image layer"
wait time="2024-09-03T19:35:11.909Z" level=info msg="Successfully saved output parameter: condition"
wait time="2024-09-03T19:35:11.909Z" level=info msg="No output artifacts"
main I0903 19:35:11.089839      18 driver.go:252] parent DAG: id:234  name:"run/2fda42a6-a492-4f81-8a7f-f08118a34bcb"  type_id:13  type:"system.DAGExecution"  last_known_state:RUNNING  custom_properties:{key:"display_name"  value:{string_value:""}}  custom_properties:{key:"task_name"  value:{string_value:""}}  create_time_since_epoch:1725392061428  last_update_time_since_epoch:1725392061428
main I0903 19:35:11.201532      18 driver.go:926] parent DAG input parameters: map[], artifacts: map[]
main F0903 19:35:11.286410      18 main.go:79] KFP driver: driver.Container(pipelineName=outer-pipeline, runID=2fda42a6-a492-4f81-8a7f-f08118a34bcb, task="outer-comp", component="comp-outer-comp", dagExecutionID=234, componentSpec) failed: failed to resolve inputs: resolving input parameter input with spec task_output_parameter:{producer_task:"inner-pipeline"  output_parameter_key:"Output"}: cannot find output parameter key "Output" in producer task "inner-pipeline"
main time="2024-09-03T19:35:11.709Z" level=info msg="sub-process exited" argo=true error="<nil>"
main time="2024-09-03T19:35:11.709Z" level=error msg="cannot save parameter /tmp/outputs/pod-spec-patch" argo=true error="open /tmp/outputs/pod-spec-patch: no such file or directory"
main time="2024-09-03T19:35:11.710Z" level=error msg="cannot save parameter /tmp/outputs/cached-decision" argo=true error="open /tmp/outputs/cached-decision: no such file or directory"
main time="2024-09-03T19:35:11.710Z" level=error msg="cannot save parameter /tmp/outputs/condition" argo=true error="open /tmp/outputs/condition: no such file or directory"
main Error: exit status 1
wait time="2024-09-03T19:35:11.912Z" level=info msg="S3 Save path: /tmp/argo/outputs/logs/main.log, key: artifacts/outer-pipeline-sq9vj/2024/09/03/outer-pipeline-sq9vj-system-container-driver-3067012875/main.log"
wait time="2024-09-03T19:35:11.912Z" level=info msg="Creating minio client using static credentials" endpoint="minio-service.kubeflow:9000"
wait time="2024-09-03T19:35:11.914Z" level=info msg="Saving file to s3" bucket=mlpipeline endpoint="minio-service.kubeflow:9000" key=artifacts/outer-pipeline-sq9vj/2024/09/03/outer-pipeline-sq9vj-system-container-driver-3067012875/main.log path=/tmp/argo/outputs/logs/main.log
wait time="2024-09-03T19:35:11.965Z" level=info msg="Save artifact" artifactName=main-logs duration=53.683795ms error="<nil>" key=artifacts/outer-pipeline-sq9vj/2024/09/03/outer-pipeline-sq9vj-system-container-driver-3067012875/main.log
wait time="2024-09-03T19:35:11.966Z" level=info msg="not deleting local artifact" localArtPath=/tmp/argo/outputs/logs/main.log
wait time="2024-09-03T19:35:11.966Z" level=info msg="Successfully saved file: /tmp/argo/outputs/logs/main.log"
wait time="2024-09-03T19:35:12.016Z" level=warning msg="failed to patch task set, falling back to legacy/insecure pod patch, see https://argo-workflows.readthedocs.io/en/release-3.4/workflow-rbac/" error="workflowtaskresults.argoproj.io is forbidden: User \"system:serviceaccount:kubeflow:pipeline-runner\" cannot create resource \"workflowtaskresults\" in API group \"argoproj.io\" in the namespace \"kubeflow\""
wait time="2024-09-03T19:35:12.045Z" level=info msg="Alloc=8334 TotalAlloc=13898 Sys=28285 NumGC=4 Goroutines=10"
wait time="2024-09-03T19:35:12.047Z" level=info msg="Deadline monitor stopped"
wait time="2024-09-03T19:35:12.047Z" level=info msg="stopping progress monitor (context done)" error="context canceled"
Stream closed EOF for kubeflow/outer-pipeline-sq9vj-system-container-driver-3067012875 (init)
Stream closed EOF for kubeflow/outer-pipeline-sq9vj-system-container-driver-3067012875 (wait)

Here's the key line:

main F0903 19:35:11.286410      18 main.go:79] KFP driver: driver.Container(pipelineName=outer-pipeline, runID=2fda42a6-a492-4f81-8a7f-f08118a34bcb, task="outer-comp", component="comp-outer-comp", dagExecutionID=234, componentSpec) failed: failed to resolve inputs: resolving input parameter input with spec task_output_parameter:{producer_task:"inner-pipeline"  output_parameter_key:"Output"}: cannot find output parameter key "Output" in producer task "inner-pipeline"
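When comparing runs, the failing reference can be pulled out of driver log lines like this one mechanically. A minimal regex sketch; it assumes the `task_output_parameter:{producer_task:"..." output_parameter_key:"..."}` text format seen in these logs stays stable, which is not guaranteed across KFP versions. The helper name `parse_driver_error` is hypothetical.

```python
import re
from typing import Optional, Tuple

# Matches the "resolving input parameter ... task_output_parameter:{...}" fragment.
_PATTERN = re.compile(
    r'resolving input parameter (?P<param>\S+) with spec '
    r'task_output_parameter:\{producer_task:"(?P<producer>[^"]+)"\s+'
    r'output_parameter_key:"(?P<key>[^"]+)"\}'
)


def parse_driver_error(line: str) -> Optional[Tuple[str, str, str]]:
    """Return (input_param, producer_task, output_key), or None if no match."""
    m = _PATTERN.search(line)
    if m is None:
        return None
    return m.group("param"), m.group("producer"), m.group("key")


line = (
    'main F0903 19:35:11.286410      18 main.go:79] KFP driver: driver.Container('
    'pipelineName=outer-pipeline, runID=2fda42a6-a492-4f81-8a7f-f08118a34bcb, '
    'task="outer-comp", component="comp-outer-comp", dagExecutionID=234, componentSpec) '
    'failed: failed to resolve inputs: resolving input parameter input with spec '
    'task_output_parameter:{producer_task:"inner-pipeline"  output_parameter_key:"Output"}: '
    'cannot find output parameter key "Output" in producer task "inner-pipeline"'
)
print(parse_driver_error(line))  # ('input', 'inner-pipeline', 'Output')
```

The same pattern also matches the error from the original report (producer task "square-and-sum"), since `\s+` tolerates the differing whitespace between the two fields.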

Let's confirm that the write is happening. It looks like it is, judging by this log from the inner executor:

main I0903 19:34:54.738676      32 launcher_v2.go:705] ExecutorOutput: {
main   "parameterValues": {
main     "Output": "inner"
main   }
main }
main I0903 19:34:54.814778      32 launcher_v2.go:151] publish success.
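The ExecutorOutput payload in that log is plain JSON once the Argo `main ` container prefix is stripped, so the written value can be checked directly. A small sketch, assuming the log lines are captured verbatim as in the excerpt above; the helper name `executor_output_from_log` is hypothetical.

```python
import json
from typing import List


def executor_output_from_log(lines: List[str]) -> dict:
    """Extract and parse the JSON object that follows an 'ExecutorOutput:' log line."""
    body: List[str] = []
    capturing = False
    for raw in lines:
        # Drop the Argo container prefix ("main ") if present.
        text = raw[len("main "):] if raw.startswith("main ") else raw
        if not capturing:
            marker = text.find("ExecutorOutput:")
            if marker == -1:
                continue
            capturing = True
            text = text[marker + len("ExecutorOutput:"):]
        body.append(text)
        try:
            return json.loads("\n".join(body))
        except json.JSONDecodeError:
            continue  # object not complete yet; keep reading lines
    raise ValueError("no complete ExecutorOutput JSON found")


log = [
    'main I0903 19:34:54.738676      32 launcher_v2.go:705] ExecutorOutput: {',
    'main   "parameterValues": {',
    'main     "Output": "inner"',
    'main   }',
    'main }',
    'main I0903 19:34:54.814778      32 launcher_v2.go:151] publish success.',
]
out = executor_output_from_log(log)
print(out["parameterValues"]["Output"])  # inner
```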

I spent some time combing through the MySQL databases and tables, then asked in the kubeflow-pipelines channel of the CNCF Slack where MLMD saves parameter values.

2024-09-04

@HumairAK generously ran the failing pipeline and confirmed that the output parameter is saved to the metadb database in the ExecutionProperty table and that the value is likely serialized.

We've confirmed it's not a write problem. It's likely (1) a read problem (the driver is attempting to read from the wrong location), or (2) a compiler problem (the workflow manifest is misconfigured).

Next steps to follow.

droctothorpe commented 2 months ago

Added some debug logging to the driver.

main I0904 19:30:29.527880      19 driver.go:1130] Beginning to iterate through task.GetInputs().GetParameters().
main I0904 19:30:29.527970      19 driver.go:1132] name: input
main I0904 19:30:29.528053      19 driver.go:1133] paramSpec: task_output_parameter:{producer_task:"inner-pipeline"  output_parameter_key:"Output"}
main I0904 19:30:29.528290      19 driver.go:1152] taskOutput: producer_task:"inner-pipeline"  output_parameter_key:"Output"
main I0904 19:31:29.582117      19 driver.go:1170] producer: id:252  type_id:13  type:"system.DAGExecution"  last_known_state:RUNNING  custom_properties:{key:"display_name"  value:{string_value:"inner-pipeline"}}  custom_properties:{key:"inputs"  value:{struct_value:{}}}  custom_properties:{key:"parent_dag_id"  value:{int_value:251}}  custom_properties:{key:"task_name"  value:{string_value:"inner-pipeline"}}  create_time_since_epoch:1725478188680  last_update_time_since_epoch:1725478188680
main I0904 19:31:29.585325      19 driver.go:1178] execution: id:252  type_id:13  type:"system.DAGExecution"  last_known_state:RUNNING  custom_properties:{key:"display_name"  value:{string_value:"inner-pipeline"}}  custom_properties:{key:"inputs"  value:{struct_value:{}}}  custom_properties:{key:"parent_dag_id"  value:{int_value:251}}  custom_properties:{key:"task_name"  value:{string_value:"inner-pipeline"}}  create_time_since_epoch:1725478188680  last_update_time_since_epoch:1725478188680
main I0904 19:31:29.585871      19 driver.go:1180] properties: map[display_name:string_value:"inner-pipeline" inputs:struct_value:{} parent_dag_id:int_value:251 task_name:string_value:"inner-pipeline"]
main I0904 19:31:29.587862      19 driver.go:1187] outputs: map[]
main I0904 19:31:29.588125      19 driver.go:1189] param: <nil>
main F0904 19:31:29.589903      19 main.go:79] KFP driver: driver.Container(pipelineName=outer-pipeline, runID=225f0001-9e6b-4a69-9b90-cf2ecfebddbc, task="outer-comp", component="comp-outer-comp", dagExecutionID=251, componentSpec) failed: failed to resolve inputs: resolving input parameter input with spec task_output_parameter:{producer_task:"inner-pipeline"  output_parameter_key:"Output"}: cannot find output parameter key "Output" in producer task "inner-pipeline"
main time="2024-09-04T19:31:29.854Z" level=info msg="sub-process exited" argo=true error="<nil>"

Okay, here's the exact line of code that returns the error: https://github.com/kubeflow/pipelines/blob/master/backend/src/v2/driver/driver.go#L1159

This case handles inputs that are outputs from previous tasks. When the previous task is a container, the resulting producer looks like this:

main I0904 19:36:40.607523      21 driver.go:1170] producer: id:255  type_id:14  type:"system.ContainerExecution"  last_known_state:COMPLETE  custom_properties:{key:"display_name"  value:{string_value:"one"}}  custom_properties:{key:"image"  value:{string_value:""}}  custom_properties:{key:"inputs"  value:{struct_value:{}}}  custom_properties:{key:"namespace"  value:{string_value:"kubeflow"}}  custom_properties:{key:"outputs"  value:{struct_value:{fields:{key:"Output"  value:{string_value:"one"}}}}}  custom_properties:{key:"parent_dag_id"  value:{int_value:254}}  custom_properties:{key:"pod_name"  value:{string_value:"pipeline-jlhqs-system-container-impl-1061929419"}}  custom_properties:{key:"pod_uid"  value:{string_value:"734bcf94-36f2-4643-a955-f38f737263ef"}}  custom_properties:{key:"task_name"  value:{string_value:"one"}}  create_time_since_epoch:1725478520142  last_update_time_since_epoch:1725478533166

In particular, note the following in the custom_properties map:

custom_properties:{key:"outputs"  value:{struct_value:{fields:{key:"Output"  value:{string_value:"one"}

That's the output of our producer (upstream) task.

Now look at what producer looks like when the task is a sub-DAG rather than a container:

main I0904 19:31:29.582117      19 driver.go:1170] producer: id:252  type_id:13  type:"system.DAGExecution"  last_known_state:RUNNING  custom_properties:{key:"display_name"  value:{string_value:"inner-pipeline"}}  custom_properties:{key:"inputs"  value:{struct_value:{}}}  custom_properties:{key:"parent_dag_id"  value:{int_value:251}}  custom_properties:{key:"task_name"  value:{string_value:"inner-pipeline"}}  create_time_since_epoch:1725478188680  last_update_time_since_epoch:1725478188680

In particular, take note of the fact that there is no custom property with a key of outputs!
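The difference can be modelled with a small Python sketch. It assumes a simplified representation in which each MLMD execution is a plain dict and the `outputs` custom property is a dict of parameter values; `find_output_parameter` and `OutputNotFoundError` are hypothetical names, and the real lookup is Go code in `driver.go` operating on MLMD protos. The container producer carries an `outputs` entry, so the lookup succeeds; the sub-DAG producer has none, so it fails with the same message the driver logs.

```python
from typing import Any, Dict

# Simplified, hypothetical stand-ins for the two producers logged above.
# Only the fields the lookup needs are kept.
container_producer: Dict[str, Any] = {
    "type": "system.ContainerExecution",
    "task_name": "one",
    "custom_properties": {"outputs": {"Output": "one"}},
}

dag_producer: Dict[str, Any] = {
    "type": "system.DAGExecution",
    "task_name": "inner-pipeline",
    # No "outputs" custom property is recorded for the sub-DAG.
    "custom_properties": {"inputs": {}},
}


class OutputNotFoundError(LookupError):
    """Raised when a producer execution has no output under the requested key."""


def find_output_parameter(producer: Dict[str, Any], key: str) -> Any:
    """Read `key` from the producer's `outputs` custom property."""
    outputs = producer["custom_properties"].get("outputs", {})
    if key not in outputs:
        raise OutputNotFoundError(
            f'cannot find output parameter key "{key}" '
            f'in producer task "{producer["task_name"]}"'
        )
    return outputs[key]


print(find_output_parameter(container_producer, "Output"))  # one
try:
    find_output_parameter(dag_producer, "Output")
except OutputNotFoundError as err:
    print(err)
```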

We know that the output is in the database, as @HumairAK demonstrated, but for some reason the response to the GetProducerTask() call here doesn't include it.

Why not?

This might actually be a write problem after all, i.e. it's possible that the relationship between DAG tasks / executions and their corresponding outputs is not being set properly at write time.

Will pick up where I left off tomorrow.

droctothorpe commented 2 months ago

Apologies for not updating more consistently. We really got into the weeds with this. The abstractions are complex enough that even just communicating about them is quite difficult. The good news is that we have a working POC.

All of the updates are restricted to driver.go. First, we updated the DAG function so that the execution it writes to MLMD includes information about the producer subtask; that fixed the "write." Then we updated resolveInputs (the "read") to handle DAGExecution-type tasks by gathering their subtasks and essentially flattening them into the tasks list (shoutout to @zazulam for this brilliant suggestion). Finally, we redirected resolveInputs to look up the producer subtask's output, which is now in the tasks list thanks to the flattening. resolveInputs already handles that case, since passing one component's output to another component's input works fine.
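A rough Python sketch of the approach, assuming a simplified model: executions are plain dicts linked by `parent_dag_id`, and the DAG execution records at write time which subtask and output key produce each of its outputs. The `output_producers` property, the `flatten_tasks` and `resolve_output` functions, and the `inner-comp` task are all hypothetical. The sketch flattens only one level of sub-DAGs and ignores task-name collisions, so it illustrates the idea rather than reproducing the actual change in driver.go.

```python
from typing import Any, Dict, List

Execution = Dict[str, Any]

# Hypothetical executions mirroring the logs: the outer DAG has id 251,
# the inner-pipeline sub-DAG has id 252, and its single subtask writes "inner".
executions: List[Execution] = [
    {
        "id": 252,
        "parent_dag_id": 251,
        "type": "system.DAGExecution",
        "task_name": "inner-pipeline",
        # Write side: the DAG records which subtask produces each of its outputs.
        "custom_properties": {
            "output_producers": {
                "Output": {"producer_task": "inner-comp", "output_key": "Output"}
            }
        },
    },
    {
        "id": 253,
        "parent_dag_id": 252,
        "type": "system.ContainerExecution",
        "task_name": "inner-comp",
        "custom_properties": {"outputs": {"Output": "inner"}},
    },
]


def flatten_tasks(executions: List[Execution], dag_id: int) -> Dict[str, Execution]:
    """Map task name -> execution for tasks in `dag_id` plus their direct sub-DAG tasks."""
    tasks: Dict[str, Execution] = {}
    for ex in executions:
        if ex["parent_dag_id"] != dag_id:
            continue
        tasks[ex["task_name"]] = ex
        if ex["type"] == "system.DAGExecution":
            for sub in executions:
                if sub["parent_dag_id"] == ex["id"]:
                    tasks[sub["task_name"]] = sub
    return tasks


def resolve_output(tasks: Dict[str, Execution], producer_task: str, key: str) -> Any:
    """Read side: redirect a DAG producer to the subtask that produced the output."""
    producer = tasks[producer_task]
    if producer["type"] == "system.DAGExecution":
        ref = producer["custom_properties"]["output_producers"][key]
        producer = tasks[ref["producer_task"]]
        key = ref["output_key"]
    return producer["custom_properties"]["outputs"][key]


tasks = flatten_tasks(executions, dag_id=251)
print(resolve_output(tasks, "inner-pipeline", "Output"))  # inner
```

Once the subtask is in the flattened list, the redirected lookup is just the ordinary container-to-container case.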

It still needs a lot more polish, validation, test file updates, and recursive flattening (for when sub-DAGs have sub-DAGs), and we need to test it against the NamedTuple updates, but having a functional POC is a great milestone. I'm adding this topic to today's community call agenda for a possible informal design review, even though it is still a WIP.
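Recursive flattening could look something like the following Python sketch. It assumes executions are plain dicts linked by a `parent_dag_id` field; `flatten_tasks_recursive` and the task names are hypothetical. Each DAG's subtasks are collected, descending into any sub-DAG they contain, so a subtask nested several levels down still ends up in the flattened list.

```python
from typing import Any, Dict, List

Execution = Dict[str, Any]

# Hypothetical nesting:
# outer DAG 251 -> middle-pipeline (252) -> inner-pipeline (253) -> inner-comp (254)
executions: List[Execution] = [
    {"id": 252, "parent_dag_id": 251, "type": "system.DAGExecution", "task_name": "middle-pipeline"},
    {"id": 253, "parent_dag_id": 252, "type": "system.DAGExecution", "task_name": "inner-pipeline"},
    {"id": 254, "parent_dag_id": 253, "type": "system.ContainerExecution", "task_name": "inner-comp"},
    {"id": 255, "parent_dag_id": 251, "type": "system.ContainerExecution", "task_name": "outer-comp"},
]


def flatten_tasks_recursive(executions: List[Execution], dag_id: int) -> Dict[str, Execution]:
    """Map task name -> execution for every task under `dag_id`, at any depth."""
    tasks: Dict[str, Execution] = {}
    for ex in executions:
        if ex["parent_dag_id"] != dag_id:
            continue
        tasks[ex["task_name"]] = ex
        if ex["type"] == "system.DAGExecution":
            # Descend into the sub-DAG so its own sub-DAGs are flattened too.
            tasks.update(flatten_tasks_recursive(executions, ex["id"]))
    return tasks


print(sorted(flatten_tasks_recursive(executions, dag_id=251)))
# ['inner-comp', 'inner-pipeline', 'middle-pipeline', 'outer-comp']
```

The recursion relies on each execution having a single `parent_dag_id`, so the DAG hierarchy is a tree and the descent terminates. Like the rest of the sketch, it does not handle two tasks with the same name at different depths.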

cc @chensun.

ianbenlolo commented 2 months ago

This actually seems related to my discussion started here: https://github.com/kubeflow/pipelines/discussions/11181. Out of curiosity, will this fix what I'm trying to do there? Namely, using retries with nested pipelines.

Thanks! Looking forward to this fix.

zazulam commented 2 months ago

Thanks for bringing that up @ianbenlolo! Right now we're aiming to support passing PipelineParameters and Artifacts between nested DAGs. Retries may be a bit out of scope, but they could definitely be tested against our solution.

droctothorpe commented 2 weeks ago

Closed by https://github.com/kubeflow/pipelines/pull/11196.

droctothorpe commented 2 weeks ago

/close

google-oss-prow[bot] commented 2 weeks ago

@droctothorpe: Closing this issue.
