kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[backend] Unable to submit pipeline for map reduce job using parallel-for and collected #10035

Closed VinithKrishnan closed 11 months ago

VinithKrishnan commented 11 months ago

Environment

Steps to reproduce

Create the pipeline YAML (map_reduce_compiled.yaml) by compiling the following code snippet:

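from typing import List

from kfp import compiler, dsl
from kfp.dsl import Input, Model, Output


@dsl.component
def double(num: int, model: Output[Model]):
    # Persist the doubled value in the output artifact's file.
    with open(model.path, 'w') as f:
        f.write(str(num * 2))


@dsl.component
def add(nums: Input[List[Model]]) -> int:
    # Each item is a Model artifact: read the value from its file and sum.
    total = 0
    for model in nums:
        with open(model.path) as f:
            total += int(f.read())
    return total


@dsl.pipeline
def main_pipeline() -> int:
    # Map: run `double` once per item.
    with dsl.ParallelFor(items=[1, 5, 10, 25]) as item:
        t = double(num=item)

    # Reduce: collect every `double` output artifact into a list for `add`.
    return add(nums=dsl.Collected(t.output)).output


compiler.Compiler().compile(main_pipeline, 'map_reduce_compiled.yaml')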
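For reference, the fan-out/fan-in computation the pipeline is meant to express can be checked locally in plain Python, without KFP. This is only a sketch of the intended result: the functions below are plain-Python stand-ins that reuse the component names, and in-memory integers replace the `Model` artifacts.

```python
from typing import List


def double(num: int) -> int:
    # "Map" step: the work each ParallelFor iteration performs.
    return num * 2


def add(nums: List[int]) -> int:
    # "Reduce" step: the list that dsl.Collected would feed into `add`.
    return sum(nums)


def main_pipeline(items: List[int]) -> int:
    # Fan out over the items, then fan the results back in.
    doubled = [double(num) for num in items]
    return add(doubled)


print(main_pipeline([1, 5, 10, 25]))  # 82
```

So a successful run of the compiled pipeline would be expected to return 2 + 10 + 20 + 50 = 82.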

Error message shown when I try to create a run from the compiled YAML file:

Run creation failed

{"error":"Failed to create a new run: InternalServerError: Failed to validate workflow for (): templates.entrypoint.tasks.root templates.root sorting failed: invalid dependency for-loop-2","code":13,"message":"Failed to create a new run: InternalServerError: Failed to validate workflow for (): templates.entrypoint.tasks.root templates.root sorting failed: invalid dependency for-loop-2","details":[{"@type":"type.googleapis.com/google.rpc.Status","code":13,"message":"Internal Server Error"}]}

Expected result

The pipeline backend should accept the compiled YAML and run it.

Materials and Reference

I followed the instructions in this doc to create the map-reduce job above.



chensun commented 11 months ago

Apologies for the confusion, but dsl.Collected is not yet supported on the KFP backend. This is called out in the documentation: https://www.kubeflow.org/docs/components/pipelines/v2/pipelines/control-flow/#parallel-looping-dslparallelfor.

VinithKrishnan commented 11 months ago

Got it. Do you have an estimate of when support for this feature will be released? If nobody is working on it, I am open to contributing, and I would appreciate any pointers on what needs to be done to add support for it.

mitchell-lawson commented 7 months ago

Is there a workaround for collecting output artifacts from dsl.ParallelFor, given that dsl.Collected is not supported by the KFP orchestration backend? I need to join outputs after splitting them. Should I skip creating artifacts and instead save to persistent file storage such as EFS? I can also help contribute if this issue is open and being worked on.
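The shared-storage approach asked about here can be sketched in plain Python under loud assumptions: every map step writes its result to a file on storage shared by all steps (an EFS-style volume mounted at the same path in each step), and a single reduce step runs after all iterations and reads the files back. The `part-<index>.txt` naming and the helper names `map_step`, `reduce_step`, and `run` are hypothetical; a temporary directory stands in for the shared volume, and making the reduce step wait for every iteration is left to the pipeline and not shown.

```python
import os
import tempfile
from typing import List


def map_step(shared_dir: str, index: int, num: int) -> None:
    # Each iteration writes its own file, so parallel writers never collide.
    path = os.path.join(shared_dir, f"part-{index:05d}.txt")
    with open(path, "w") as f:
        f.write(str(num * 2))


def reduce_step(shared_dir: str) -> int:
    # Join step: read every part file written by the map steps and sum them.
    total = 0
    for name in sorted(os.listdir(shared_dir)):
        if name.startswith("part-") and name.endswith(".txt"):
            with open(os.path.join(shared_dir, name)) as f:
                total += int(f.read())
    return total


def run(items: List[int]) -> int:
    # A temporary directory stands in for the shared volume (assumption).
    with tempfile.TemporaryDirectory() as shared_dir:
        for index, num in enumerate(items):
            map_step(shared_dir, index, num)
        return reduce_step(shared_dir)


print(run([1, 5, 10, 25]))  # 82
```

Writing one file per iteration keeps the parallel writers independent. The trade-off is that these files live outside KFP's artifact tracking, so they are not recorded as pipeline artifacts.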