kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[Feature Request] Add parallelism for dsl.ParallelFor #4089

Closed hlu09 closed 4 years ago

hlu09 commented 4 years ago

Use Case

A large data mining job is often split into many small jobs. Given the limited shared resources of external services (e.g., DataFlow), we can only run a few of those small jobs simultaneously.

Global parallelism works to some degree but lacks flexibility: e.g., with global parallelism set to 1, any in-cluster task can block launching jobs to external services.

Argo supports template-level parallelism: https://github.com/argoproj/argo/blob/master/examples/parallelism-nested-workflow.yaml#L19 https://github.com/argoproj/argo/blob/master/examples/parallelism-nested-dag.yaml#L15
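For reference, the relevant mechanism in those examples looks roughly like this (a sketch based on the linked Argo examples; the template and task names here are illustrative, not taken from those files):

    # Sketch of Argo's template-level parallelism: the `parallelism`
    # field on a template caps how many of that template's tasks run
    # concurrently.
    - name: parallel-worker
      parallelism: 2        # at most 2 tasks of this DAG run at once
      dag:
        tasks:
        - name: task-a
          template: do-work
        - name: task-b
          template: do-work
        - name: task-c
          template: do-work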

Feature request

with dsl.ParallelFor(loopidy_doop, parallelism=2) as item:
    # DAG in loop here

There will be at most 2 loop DAGs running in parallel.

Bobgy commented 4 years ago

/assign @Ark-kun /area sdk/dsl

NikeNano commented 4 years ago

I would be happy to help out as well! /assign

Ark-kun commented 4 years ago

This is an interesting feature request. It's not hard to implement, but I wonder whether this kind of parallelism control is common in orchestrators. It looks like in Argo the parallelism option can be applied to any DAG. I wonder whether we should do the same and make max_parallel_executions a property of the OpsGroup.

hlu09 commented 4 years ago

A more generic way might be:

    with dsl.Parallelism(2):
        with dsl.ParallelFor(loopidy_doop) as item:

Such a block could be applied to any DAG, including ones outside a parallel-for.
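One way such a `dsl.Parallelism` context manager could work at pipeline-definition time is to track the current limit on a stack, so that groups created inside the block inherit it. This is a minimal sketch of that idea; `Parallelism`, `_current_limit`, and `current_parallelism` are illustrative names, not actual kfp.dsl APIs:

```python
# Hypothetical sketch: a context manager that records a parallelism
# limit while the pipeline function is being traced. Ops/groups
# created inside the `with` block would read current_parallelism().

_current_limit = None  # module-level "compile-time" state

class Parallelism:
    def __init__(self, limit):
        self.limit = limit
        self._prev = None

    def __enter__(self):
        global _current_limit
        self._prev = _current_limit
        _current_limit = self.limit   # groups created here inherit the limit
        return self

    def __exit__(self, *exc):
        global _current_limit
        _current_limit = self._prev   # restore the outer limit on exit

def current_parallelism():
    return _current_limit

with Parallelism(2):
    inner = current_parallelism()   # limit visible inside the block
outer = current_parallelism()       # back to unlimited outside
print(inner, outer)
```

Nesting also falls out naturally: an inner `Parallelism` block restores the outer limit when it exits.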

hlu09 commented 4 years ago

The tricky part is designing a way to define the unit of max_parallel_executions/parallelism. The Argo YAML below enforces the parallelism limit at the task level, rather than at the sub-DAG level (in this case, the 2-step sequential DAG under ParallelFor).

  - name: for-loop-for-loop-0535d69b-1
    parallelism: 2
    inputs:
      parameters:
      - {name: loopidy_doop-loop-item-subvar-a}
    dag:
      tasks:
      - name: my-in-cop1
        template: my-in-cop1
        dependencies: [sleep-10-seconds]
        arguments:
          parameters:
          - {name: loopidy_doop-loop-item-subvar-a, value: '{{inputs.parameters.loopidy_doop-loop-item-subvar-a}}'}
      - {name: sleep-10-seconds, template: sleep-10-seconds}

The YAML above was compiled from this pipeline code:

    with dsl.ParallelFor(loopidy_doop, parallelism=2) as item:
        sleep = sleep_op(10)
        op1 = dsl.ContainerOp(
            name="my-in-cop1",
            image="library/bash",
            command=["sh", "-c"],
            arguments=["echo no output global op1, item.a: %s" % item.a],
        ).after(sleep)

NikeNano commented 4 years ago

The tricky part is designing a way to define the unit of max_parallel_executions/parallelism. The Argo YAML below enforces the parallelism limit at the task level, rather than at the sub-DAG level (in this case, the 2-step sequential DAG under ParallelFor).

What do you mean by task level vs sub-DAG level? Do you mean that there should be a limit for each DAG within the for loop, not on the tasks themselves? @hlu09

I wonder whether we should do the same and make the max_parallel_executions the property of the OpsGroup.

I think it makes sense for any ops that share resources to have this option, and thus to set it on the OpsGroup. @Ark-kun

hlu09 commented 4 years ago

Suppose the ParallelFor above generates 100 sub-DAGs, each containing 2 ops: sleep_op followed by echo.

One way to enforce parallelism (2): 2 of these 100 sub-DAGs are executed first, followed by the next 2, and so on.

A different way: put all 100 sleep_ops and 100 echo ops together in one group; 2 of these 200 ops are executed first, followed by the next 2, and so on. Certainly echo.after(sleep_op) is still enforced within a single sub-DAG.
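The sub-DAG-level semantics can be sketched as a semaphore that each sub-DAG holds for its entire lifetime, covering both of its steps, so at most `PARALLELISM` sub-DAGs are ever in flight. This is a plain-Python illustration of the scheduling behavior, not KFP or Argo code; `run_subdag` and the constants are made up for the sketch:

```python
import threading
import time

SUBDAG_COUNT = 10   # stand-in for the 100 sub-DAGs in the example
PARALLELISM = 2

slots = threading.Semaphore(PARALLELISM)  # one slot per running sub-DAG
lock = threading.Lock()
active = 0
peak = 0

def run_subdag(i):
    """Simulate one 2-step sub-DAG (sleep_op followed by echo)."""
    global active, peak
    with slots:                      # hold a slot for the whole sub-DAG
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.01)             # step 1: stand-in for sleep_op
        time.sleep(0.01)             # step 2: stand-in for the echo op
        with lock:
            active -= 1

threads = [threading.Thread(target=run_subdag, args=(i,))
           for i in range(SUBDAG_COUNT)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(peak)  # never exceeds PARALLELISM
```

Under the task-level semantics, the semaphore would instead be acquired and released around each individual step, so a sub-DAG could lose its slot between its sleep and its echo.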

NikeNano commented 4 years ago

Thanks for the clarification @hlu09. As I understand it, you suggest keeping the parallelism at the sub-DAG level, thus allowing X number of sub-DAGs to run in parallel? I also think this makes the most sense from a user's perspective.

NikeNano commented 4 years ago

I think this example uses parallelism as you suggest, @hlu09: https://github.com/argoproj/argo/blob/master/examples/parallelism-nested-dag.yaml

hlu09 commented 4 years ago

@NikeNano right, it makes sense to keep the parallelism at the sub-DAG level, since ParallelFor generates many sub-DAGs.

NikeNano commented 4 years ago

Will start this work today/tomorrow so we get it rolling!