awslabs / kubeflow-manifests

KubeFlow on AWS
https://awslabs.github.io/kubeflow-manifests/
Apache License 2.0

ParallelFor does not start as many pods as expected #550

Open philcrooks opened 1 year ago

philcrooks commented 1 year ago

Running this code:

parallel_task = dsl.ParallelFor(repo_tasks.output, parallelism=50)
with parallel_task as item:
    # this is a func_to_container_op with boto3
    unpack_task = get_file(item)

    # this is a func_to_container_op
    commits_input_task = add_log_data_to_input(unpack_task.output, log_stream_name)
    # docker image
    commits_task = get_commits(commits_input_task.output, "/output/commits/output.json")

We expected to see 50 pods (or something approaching that) being started up to run the defined tasks. Instead we only ever saw a maximum of 9 pods running in the kubeflow-user-example-com namespace. At any one time there were a small number of pods being prepared to run, but there was certainly no backlog indicating a constriction in the pipeline. In fact, we were running a six-node cluster with eight cores per node, and none of the nodes were heavily loaded.
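For reference, a quick way to count what is actually running at any moment is something like the sketch below, using the official kubernetes Python client (the namespace is hard-coded and local kubeconfig access is assumed):

    # Sketch: count pipeline pods by phase in the user namespace (namespace is an assumption).
    from collections import Counter
    from kubernetes import client, config

    config.load_kube_config()  # or config.load_incluster_config() when running inside the cluster
    v1 = client.CoreV1Api()
    pods = v1.list_namespaced_pod("kubeflow-user-example-com").items
    print(Counter(pod.status.phase for pod in pods))  # e.g. Counter({'Running': 9, 'Pending': 2})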

We also tried setting these values:

    dsl.get_pipeline_conf().set_parallelism(100)
    dsl.get_pipeline_conf().set_pod_disruption_budget("100%")
    dsl.get_pipeline_conf().set_image_pull_policy("IfNotPresent")  # Always, Never, IfNotPresent.

We concluded that the parallelism parameter provided to ParallelFor did nothing, as we could neither reduce nor increase the parallelism using it. Using the set_parallelism() method allowed us to reduce the parallelism of the ParallelFor, but we could not increase it beyond the modest level described above.
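One way to sanity-check whether set_parallelism() reaches the compiled workflow at all is to compile a minimal pipeline and read the Argo spec it produces. The snippet below is only a rough sketch (the component, pipeline name and output path are illustrative), assuming the KFP v1 SDK where get_pipeline_conf() must be called inside the pipeline function:

    # Rough sketch: compile a trivial pipeline and check where the parallelism setting lands.
    import yaml
    import kfp
    import kfp.dsl as dsl
    from kfp.components import func_to_container_op

    @func_to_container_op
    def noop():
        pass

    @dsl.pipeline(name="parallelism-check")
    def parallelism_check():
        dsl.get_pipeline_conf().set_parallelism(100)  # only takes effect inside the pipeline function
        noop()

    kfp.compiler.Compiler().compile(parallelism_check, "parallelism_check.yaml")
    with open("parallelism_check.yaml") as f:
        workflow = yaml.safe_load(f)
    # Workflow-wide cap written by set_parallelism(); expect 100 here if the call took effect.
    print(workflow["spec"].get("parallelism"))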

It is important for us to be able to perform many tasks in parallel in order to keep the elapsed time at a sensible level, but we cannot work out how to get close to our desired level of parallelism. This may be a bug, or it may be something in our approach that we could change, but there is no easy way to find out. It may be, for example, that pulling files in over the Internet reduces our ability to reach maximum parallelism, or perhaps saving interim values in EFS has an impact. However, we have tried using a very simple container in place of get_file() and we still did not reach the requested level of parallelism (although it did improve).

In summary, when setting parallelism to 50 we expected to see 50 pods spawned in quick succession, one for each 'iteration'. We did not see this and cannot work out how to achieve it.

Environment

surajkota commented 1 year ago

I suspect it may be a problem with how the pipeline is authored; after compilation, the parameters may not be set on the right template. We will try to reproduce this.
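A quick way to check that would be to look at the parallelism fields in the compiled Argo YAML; only a sketch (the file name is an assumption), but with the v1 SDK the ParallelFor(parallelism=...) limit should land on the generated loop sub-DAG template rather than on the workflow spec, if the installed SDK version supports it:

    # Sketch: see where parallelism limits ended up in a compiled KFP v1 workflow (file name assumed).
    import yaml

    with open("pipeline.yaml") as f:
        workflow = yaml.safe_load(f)

    print("workflow-level parallelism:", workflow["spec"].get("parallelism"))
    for template in workflow["spec"]["templates"]:
        if "parallelism" in template:
            print("template", template["name"], "parallelism:", template["parallelism"])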

ananth102 commented 1 year ago

Which version of the Pipelines SDK are you using? Do the pods get stuck in a Pending state, or are they never scheduled at all? And what are the CPU/memory requests of the pipeline pods being created? Ex: kubectl describe node <> (see also the sketch after the sample output below).

<Other things>
  Namespace                   Name                                             CPU Requests  CPU Limits  Memory Requests  Memory Limits  Age
  ---------                   ----                                             ------------  ----------  ---------------  -------------  ---
  sample                 big-pipeline-wt669-1462709459                    10m (0%)      500m (12%)  32Mi (0%)        512Mi (3%)     5s
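The per-pod requests can also be pulled with a few lines of Python if that is easier; a sketch using the kubernetes client (the namespace is an assumption):

    # Sketch: list CPU/memory requests of the pipeline pods (namespace is an assumption).
    from kubernetes import client, config

    config.load_kube_config()
    v1 = client.CoreV1Api()
    for pod in v1.list_namespaced_pod("kubeflow-user-example-com").items:
        for container in pod.spec.containers:
            print(pod.metadata.name, container.name, container.resources.requests)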
ananth102 commented 1 year ago

Update: I was able to replicate this with a simple pipeline on the v1 KFP SDK. The pods do not appear immediately when I list all of them with kubectl get pods -A, and they don't even have a status of unschedulable.

import kfp
import kfp.dsl as dsl
from kfp.components import func_to_container_op, InputPath, OutputPath

@func_to_container_op
# The "text" input is untyped so that any data can be printed
def print_text(text_path: InputPath()):
    '''Print text'''
    with open(text_path, 'r') as reader:
        for line in reader:
            print(line, end='')

@func_to_container_op
def storelist(numbers_path: OutputPath(list), count: int = 100):
    import json
    with open(numbers_path, 'w') as writer:
        arr = [i for i in range(count)]
        writer.write(json.dumps(arr))

@dsl.pipeline(name="Dynamic Pipeline", description="Testing the limits of the kubeflow frontend")
def big_pipeline(runs: int = 100):
    expansion = storelist(runs)
    with kfp.dsl.ParallelFor(expansion.output) as param:
        print_text(param)
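
For reference, the repro can be submitted straight from the SDK with something like the following sketch (the host URL is a placeholder for wherever ml-pipeline is exposed):

    # Sketch: submit the repro pipeline above (host URL is a placeholder).
    client = kfp.Client(host="http://localhost:8080/pipeline")
    client.create_run_from_pipeline_func(big_pipeline, arguments={"runs": 100})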