kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0
3.5k stars 1.57k forks

[sdk] ParallelFor does not apply parallelism on the right template. #6588

Open dithyrambe opened 2 years ago

dithyrambe commented 2 years ago

Steps to reproduce

# pipeline.py

import kfp

@kfp.components.func_to_container_op
def dummy_op(arg):
    pass

@kfp.components.func_to_container_op
def other_op(arg):
    pass

@kfp.dsl.pipeline(name="For loop pipeline")
def pipeline(args: list):
    with kfp.dsl.ParallelFor(args, parallelism=2) as item:
        task1 = dummy_op(item)
        task2 = other_op(item)

        task2.after(task1)

Expected result

The pipeline runs, but compiling it with dsl-compile --py pipeline.py --output ./pipeline.yaml places parallelism: 2 on what appears to be the wrong template. As a result, when the pipeline runs with 4 items in args, all 4 tasks are scheduled at once instead of being capped at a parallelism of 2. In the compiled YAML I expected to see parallelism: 2 within the entrypoint template (the one with the withParam section). Indeed, when I manually move parallelism: 2 to the entrypoint template, it works as intended.

Actual output (see below):

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: for-loop-pipeline-
  annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.2, pipelines.kubeflow.org/pipeline_compilation_time: '2021-09-27T18:40:10.602539',
    pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"name": "args", "type": "JsonArray"}],
      "name": "For loop pipeline"}'}
  labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.2}
spec:
  entrypoint: for-loop-pipeline
  templates:
  - name: dummy-op
    container:
      args: [--arg, '{{inputs.parameters.args-loop-item}}']
      command:
      - sh
      - -ec
      - |
        program_path=$(mktemp)
        printf "%s" "$0" > "$program_path"
        python3 -u "$program_path" "$@"
      - |
        def dummy_op(arg):
            pass

        import argparse
        _parser = argparse.ArgumentParser(prog='Dummy op', description='')
        _parser.add_argument("--arg", dest="arg", type=str, required=True, default=argparse.SUPPRESS)
        _parsed_args = vars(_parser.parse_args())

        _outputs = dummy_op(**_parsed_args)
      image: python:3.7
    inputs:
      parameters:
      - {name: args-loop-item}
    metadata:
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.2
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/enable_caching: "true"
      annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
          {"args": ["--arg", {"inputValue": "arg"}], "command": ["sh", "-ec", "program_path=$(mktemp)\nprintf
          \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n",
          "def dummy_op(arg):\n    pass\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Dummy
          op'', description='''')\n_parser.add_argument(\"--arg\", dest=\"arg\", type=str,
          required=True, default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs
          = dummy_op(**_parsed_args)\n"], "image": "python:3.7"}}, "inputs": [{"name":
          "arg"}], "name": "Dummy op"}', pipelines.kubeflow.org/component_ref: '{}',
        pipelines.kubeflow.org/arguments.parameters: '{"arg": "{{inputs.parameters.args-loop-item}}"}'}
  - name: for-loop-1
    parallelism: 2
    inputs:
      parameters:
      - {name: args-loop-item}
    dag:
      tasks:
      - name: dummy-op
        template: dummy-op
        arguments:
          parameters:
          - {name: args-loop-item, value: '{{inputs.parameters.args-loop-item}}'}
      - name: other-op
        template: other-op
        dependencies: [dummy-op]
        arguments:
          parameters:
          - {name: args-loop-item, value: '{{inputs.parameters.args-loop-item}}'}
  - name: for-loop-pipeline
    dag:
      tasks:
      - name: for-loop-1
        template: for-loop-1
        arguments:
          parameters:
          - {name: args-loop-item, value: '{{item}}'}
        withParam: '{{workflow.parameters.args}}'
  - name: other-op
    container:
      args: [--arg, '{{inputs.parameters.args-loop-item}}']
      command:
      - sh
      - -ec
      - |
        program_path=$(mktemp)
        printf "%s" "$0" > "$program_path"
        python3 -u "$program_path" "$@"
      - |
        def other_op(arg):
            pass

        import argparse
        _parser = argparse.ArgumentParser(prog='Other op', description='')
        _parser.add_argument("--arg", dest="arg", type=str, required=True, default=argparse.SUPPRESS)
        _parsed_args = vars(_parser.parse_args())

        _outputs = other_op(**_parsed_args)
      image: python:3.7
    inputs:
      parameters:
      - {name: args-loop-item}
    metadata:
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.2
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/enable_caching: "true"
      annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
          {"args": ["--arg", {"inputValue": "arg"}], "command": ["sh", "-ec", "program_path=$(mktemp)\nprintf
          \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n",
          "def other_op(arg):\n    pass\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Other
          op'', description='''')\n_parser.add_argument(\"--arg\", dest=\"arg\", type=str,
          required=True, default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs
          = other_op(**_parsed_args)\n"], "image": "python:3.7"}}, "inputs": [{"name":
          "arg"}], "name": "Other op"}', pipelines.kubeflow.org/component_ref: '{}',
        pipelines.kubeflow.org/arguments.parameters: '{"arg": "{{inputs.parameters.args-loop-item}}"}'}
  arguments:
    parameters:
    - {name: args}
  serviceAccountName: pipeline-runner

Applying this diff fixes the issue:

--- pipeline.yaml   2021-09-27 18:40:10.608158969 +0200
+++ fixed.yaml  2021-09-27 18:42:43.469024168 +0200
@@ -50,3 +50,2 @@
   - name: for-loop-1
-    parallelism: 2
     inputs:
@@ -68,2 +67,3 @@
   - name: for-loop-pipeline
+    parallelism: 2
     dag:
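The same move can be sketched programmatically. A minimal, stdlib-only illustration on an in-memory workflow dict (the function name and the skeleton dict are hypothetical; a real script would first load the compiled YAML):

```python
def move_parallelism_to_entrypoint(workflow: dict) -> None:
    """Move `parallelism` from a non-entrypoint (loop) template onto the entrypoint template."""
    spec = workflow["spec"]
    entrypoint_name = spec["entrypoint"]
    moved = None
    for tmpl in spec["templates"]:
        # Strip parallelism from the loop-body template (for-loop-1 in the YAML above).
        if tmpl["name"] != entrypoint_name and "parallelism" in tmpl:
            moved = tmpl.pop("parallelism")
    if moved is not None:
        for tmpl in spec["templates"]:
            # Attach it to the entrypoint template (the one with withParam).
            if tmpl["name"] == entrypoint_name:
                tmpl["parallelism"] = moved

# Skeleton mirroring the structure of the compiled output above.
wf = {
    "spec": {
        "entrypoint": "for-loop-pipeline",
        "templates": [
            {"name": "for-loop-1", "parallelism": 2},
            {"name": "for-loop-pipeline"},
        ],
    }
}
move_parallelism_to_entrypoint(wf)
```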


Am I somehow using ParallelFor in the wrong way?

Impacted by this bug? Give it a 👍. We prioritise the issues with the most 👍.

chensun commented 2 years ago

I think the initial design and implementation of parallelism on dsl.ParallelFor did not match everyone's expectations of how it should work, so there was subsequent work: https://github.com/kubeflow/pipelines/pull/4199

WangCHX commented 2 years ago

also affected by this.

chanansh commented 1 year ago

was this fixed?

chanansh commented 1 year ago

Here is a hack:

import json

import yaml
from yaml import SafeLoader


def fix_parallelism(source_pipeline_path, parallelism=10, target_pipeline_path=None):
    """
    Limits the number of parallel tasks.
    Args:
        source_pipeline_path (str) - path to the source pipeline yaml to be edited.
        parallelism (int) - parallelism to use.
        target_pipeline_path (str) - target path, default same as source.

    Returns:
        None - edits the source file.
    """
    # see https://github.com/kubeflow/pipelines/issues/6588
    with open(source_pipeline_path, 'rt') as f:
        data = yaml.load(f, Loader=SafeLoader)
    pipeline_name = json.loads(data['metadata']['annotations']['pipelines.kubeflow.org/pipeline_spec'])['name']
    pipeline_index = [i for i, t in enumerate(data['spec']['templates']) if t['name'] == pipeline_name][0]
    data['spec']['templates'][pipeline_index]['parallelism'] = parallelism
    target_pipeline_path = target_pipeline_path or source_pipeline_path
    with open(target_pipeline_path, 'wt') as f:
        yaml.dump(data, f)

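A quick stdlib-only check of the hack's core lookup logic, on a hypothetical minimal workflow dict (no file I/O, so PyYAML is not needed here):

```python
import json

# Hypothetical minimal compiled-workflow structure, for illustration only.
data = {
    "metadata": {"annotations": {
        "pipelines.kubeflow.org/pipeline_spec": json.dumps({"name": "for-loop-pipeline"})
    }},
    "spec": {"templates": [{"name": "for-loop-1"}, {"name": "for-loop-pipeline"}]},
}

# Same lookup as the hack: find the template whose name matches the pipeline name
# recorded in the pipeline_spec annotation, then set parallelism on it.
pipeline_name = json.loads(
    data["metadata"]["annotations"]["pipelines.kubeflow.org/pipeline_spec"]
)["name"]
pipeline_index = [
    i for i, t in enumerate(data["spec"]["templates"]) if t["name"] == pipeline_name
][0]
data["spec"]["templates"][pipeline_index]["parallelism"] = 10
```

One caveat: compiled template names are sanitized (e.g. "For loop pipeline" becomes "for-loop-pipeline" in the YAML above), so if the pipeline name contains spaces or uppercase letters, matching on data['spec']['entrypoint'] instead of the annotation name may be more robust.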
github-actions[bot] commented 2 weeks ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.