kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[Limitation] SDK Compiler creates bigger YAML, quickly exceeding Kubeflow limits #4170

Open radcheb opened 4 years ago

radcheb commented 4 years ago

What steps did you take:

This problem only appears in 1.0.0-rc.3; it does not exist in 0.5.1. Given the same pipeline, the 1.0.0-rc.3 compiler creates a YAML file far bigger than the one created with the 0.5.1 compiler. This is a problem for huge workflows that previously ran successfully and can no longer run if compiled with the new compiler.

What happened:

The pipeline compiled with 1.0.0-rc.3 fails to run with the following error: Workflow is longer than maximum allowed size. Size=1048602

What did you expect to happen:

The pipeline should run successfully.

Environment:

Kubeflow 1.0, build commit 743746b; Python 3.7 + kfp 1.0.0-rc.3

How did you deploy Kubeflow Pipelines (KFP)?

Anything else you would like to add: Reproducing the limit

For this same pipeline:

from kfp.compiler import Compiler
from kfp.components import func_to_container_op
from kfp.dsl import pipeline

def some_func(i: int) -> str:
    msg = f"""{i}
    This is a huge function, with a lot of code.
    Lorem Ipsum is simply dummy text of the printing and typesetting industry. 
    Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, 
    when an unknown printer took a galley of type and scrambled it to make a type specimen book. 
    It has survived not only five centuries, but also the leap into electronic typesetting, 
    remaining essentially unchanged. It was popularised in the 1960s with the release of Letraset sheets 
    containing Lorem Ipsum passages, and more recently with desktop publishing software like Aldus PageMaker 
    including versions of Lorem Ipsum.
    """
    return msg

@pipeline(name="huge pipeline")
def test_pipeline():

    component = func_to_container_op(func=some_func, base_image="library/python:3.6")

    previous = None
    for i in range(239):
        op = component(i=i)
        if previous:
            op.after(previous)
        previous = op

Compiler().compile(test_pipeline, package_path="toto.yaml")

If compiled with kfp 0.5.1 it produces a file of 625 KB, while if compiled with kfp 1.0.0-rc.3 it produces a file of 1129 KB and thus fails to run on the cluster.

With kfp 0.5.1 we can grow this example pipeline up to 438 components; at 439 it fails to run (exceeded workflow size limit). With 1.0.0-rc.3 this limit drops to 239 components because of the additional YAML size.

I am not sure whether it's a bug, but it's a huge limitation for complex training and data preparation workflows.

/area sdk

Bobgy commented 4 years ago

/assign @Ark-kun
/priority p0

Ark-kun commented 4 years ago

Ideally, the pipeline compiled from that code should be very small, since it only needs a single template. But with the current compiler, each task gets its own template, which inflates the workflow size.

How limiting is this for your scenarios? How do you create that many tasks? Do you use a loop like in the description? It's possible to use the more powerful dsl.ParallelFor loop or a recursive loop using @graph_component; those do not result in size explosion. Check https://github.com/kubeflow/pipelines/blob/master/samples/core/loop_parallelism/loop_parallelism.py and https://github.com/kubeflow/pipelines/blob/2268ddd/components/XGBoost/_samples/recursive_training.py
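
For illustration, here is a minimal sketch (assuming the KFP v1 SDK; the pipeline name and trimmed function body are hypothetical) of the repro pipeline rewritten with dsl.ParallelFor. Note one behavioral difference: ParallelFor fans the iterations out in parallel instead of chaining them with .after():

from kfp import dsl
from kfp.components import func_to_container_op

def some_func(i: int) -> str:
    return str(i)

some_op = func_to_container_op(func=some_func, base_image="library/python:3.6")

@dsl.pipeline(name="parallelfor pipeline")
def test_pipeline():
    # A single loop template in the compiled YAML, fanned out to 239
    # iterations at runtime, instead of 239 copies of the template.
    with dsl.ParallelFor([{"i": i} for i in range(239)]) as item:
        some_op(i=item.i)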

radcheb commented 4 years ago

Thanks @Ark-kun for your quick reply.

Our real pipeline is much richer, with a mix of data preparation, training, and model evaluation. It uses multiple sources of data, so we run the same component (defined once) for each source of data. The whole pipeline can reach more than 100 tasks.

In the example I gave, I used the for loop only to reproduce the issue. In our real case, the pipeline can no longer run on the cluster, with the same error, if compiled with 1.0.0-rc.3.

Ark-kun commented 4 years ago

The whole pipeline can reach more than 100 tasks.

That's great to hear. The real value of the Pipelines starts to manifest when the pipelines are bigger.

we run the same component (defined once) for each source of data

Roughly how many instances of the same components do you have?

In our real case, the pipeline can no longer run on the cluster, with the same error, if compiled with 1.0.0-rc.3.

Sorry to hear about the problem. There is a workaround to squeeze the size a bit (although you'll lose some features, like artifact types in the Metadata UX):

import yaml

with open('toto.yaml') as f:
    workflow = yaml.safe_load(f)

# Strip the bulky component_spec annotation from every template.
for template in workflow['spec']['templates']:
    annotations = template.setdefault('metadata', {}).setdefault('annotations', {})
    annotations.pop('pipelines.kubeflow.org/component_spec', None)

with open('toto_fixed.yaml', 'w') as f:
    yaml.dump(workflow, f)

Ark-kun commented 4 years ago

Everyone else independently affected by this issue, please speak up. I'd like to know about your situation.

Bobgy commented 4 years ago

Removing this from the 1.0 project because it is intended behavior. If there's a need to change it, we can fix it in a later release.

nikhil-dce commented 4 years ago

We are getting this error too. We are using kfp 0.5.1. The generated YAML is 1.3 MB in size, and we have 400+ components in the pipeline. We are working on a benchmarking tool that has multiple datasets and sub-pipelines in the graph. Is there a plan to allow larger pipelines than currently allowed? This seems like a common use case.

Ark-kun commented 4 years ago

we have 400+ components in the pipeline

Are they different components or component instances?

Is there a plan to allow larger pipelines than currently allowed? This seems like a common use case.

Unfortunately, this is a limitation of Kubernetes itself (and partially of Argo).

There is a limit on the size of any Kubernetes object. It was 512KiB some time ago, then 1MiB, and now 1.5MiB.

It might be possible to increase the limit though: https://github.com/etcd-io/etcd/blob/master/Documentation/dev-guide/limit.md#request-size-limit

Ark-kun commented 4 years ago

@radcheb Are you currently blocked by this issue?

radcheb commented 4 years ago

@Ark-kun We are still using kfp 0.5.1 for production pipelines. However, this issue is blocking us from migrating to 1.0.0. We haven't tried your solution yet, since most of the time we directly use create_run_from_pipeline_func. I will get back to you soon on this.
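
For teams calling create_run_from_pipeline_func directly, a hedged sketch of how the workaround can slot in (file names are hypothetical; assumes the kfp v1 Client.create_run_from_pipeline_package method and the test_pipeline from the issue description): compile to a file, strip the annotations, then submit the trimmed package.

import yaml
from kfp import Client
from kfp.compiler import Compiler

# Compile to a file instead of submitting directly.
Compiler().compile(test_pipeline, package_path="pipeline.yaml")

# Strip the bulky component_spec annotations (the workaround above).
with open("pipeline.yaml") as f:
    workflow = yaml.safe_load(f)
for template in workflow["spec"]["templates"]:
    annotations = template.setdefault("metadata", {}).setdefault("annotations", {})
    annotations.pop("pipelines.kubeflow.org/component_spec", None)
with open("pipeline_small.yaml", "w") as f:
    yaml.dump(workflow, f)

# Submit the trimmed package.
Client().create_run_from_pipeline_package("pipeline_small.yaml", arguments={})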

radcheb commented 4 years ago

@Ark-kun we implemented and tested your size-squeezing workaround, and it has been working with no problems. We therefore upgraded to 1.0.0, and it got validated in pre-production. Thanks again for the solution 👏

@nikhil-dce you could use this workaround to reduce the YAML size after compilation:

import yaml

with open("big_pipeline.yaml") as f:
    workflow = yaml.safe_load(f)
for template in workflow['spec']['templates']:
    annotations = template.setdefault('metadata', {}).setdefault('annotations', {})
    if 'pipelines.kubeflow.org/component_spec' in annotations:
        del annotations['pipelines.kubeflow.org/component_spec']
with open("smaller_pipeline.yaml", "w") as f:
    yaml.dump(workflow, f)

radcheb commented 4 years ago

@Ark-kun shall we close this issue?

brt-timseries commented 4 years ago

The workaround hack is appreciated. I think there should be an option in the kfp compiler to cull metadata->annotations->component_spec.

ghost commented 4 years ago

We're hitting the problem in Kubeflow 1.0. The amount of data coming into the system is variable, so the DAG grows with it. We're on GKE, and according to this answer we're stuck:

... a suggestion to try the --max-request-bytes flag but it would have no effect on a GKE cluster because you don't have such permission on master node.

Is it possible for Kubeflow to break up the resources into smaller chunks? Otherwise this is quite limiting.

stale[bot] commented 3 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

Bobgy commented 3 years ago

/lifecycle frozen

susan-shu-c commented 3 years ago

Encountering this - following and trying out suggestions above

The situation is similar to several above: we use multiples of similar components. A full run requires thousands of component instances, built dynamically from ~10 core component templates.

We've been setting resource limits/requests to deal with OOM issues so far, but we still hit Workflow is longer than maximum allowed size.

susan-shu-c commented 3 years ago

Might be a silly question, but where do you put this workaround? @Ark-kun @radcheb I was trying it in the pipeline script after Compiler().compile(test_pipeline, package_path="toto.yaml"), since it makes sense there; the file is created after this line.

Ark-kun commented 3 years ago

Might be a silly question, but where do you put this workaround? @Ark-kun @radcheb I was trying it in the pipeline script after Compiler().compile(test_pipeline, package_path="toto.yaml"), since it makes sense there; the file is created after this line.

It's expected to be placed after the Compiler().compile(test_pipeline, package_path="toto.yaml") line. Does it not work?

jli commented 3 years ago

My team is also affected by this issue. We're on KFP 1.5.1. Workflows with ~200 tasks run into the size limit. Interestingly, we seem to be running into this issue at the Argo level.

The workflow is able to start executing, but after a few task completions, it stops. The KFP UI has an error at the top: Error: found errors when executing run: <run id>. Click Details for more information. And the details box shows:

An error occurred
offload node status is not supported

(screenshot attached below)

I think what's happening is: once the workflow status grows past the size limit, Argo tries to offload the node status to its persistence database, but node status offloading isn't enabled in our KFP deployment, hence the "offload node status is not supported" error.


Screenshot of error when running our large workflow:

(image: KFP UI error message)

jli commented 3 years ago

FWIW, I wrote a script to compute how much space each field takes: kfp_spec_field_sizes.py. I ran it on one of our large pipelines:

--> descending size (top 20):
(total)                                                                             54,702 (100.0%)
spec                                                                                54,289 (99.2%)
spec.templates                                                                      54,097 (98.9%)
spec.templates.[]                                                                   54,080 (98.9%)
spec.templates.[].metadata                                                          36,451 (66.6%)
spec.templates.[].metadata.annotations                                              30,412 (55.6%)
spec.templates.[].metadata.annotations.pipelines.kubeflow.org/component_spec        17,832 (32.6%)
spec.templates.[].container                                                         12,741 (23.3%)
spec.templates.[].container.command                                                  9,258 (16.9%)
spec.templates.[].container.command.[]                                               9,022 (16.5%)
spec.templates.[].metadata.annotations.pipelines.kubeflow.org/arguments.parameters   8,123 (14.8%)
spec.templates.[].metadata.labels                                                    5,675 (10.4%)
spec.templates.[].metadata.labels.our_run_id                                          2,632 (4.8%)
spec.templates.[].outputs                                                            1,216 (2.2%)
spec.templates.[].metadata.annotations.pipelines.kubeflow.org/component_ref          1,176 (2.1%)
spec.templates.[].outputs.artifacts                                                  1,062 (1.9%)
spec.templates.[].outputs.artifacts.[]                                               1,038 (1.9%)
spec.templates.[].dag                                                                  915 (1.7%)
spec.templates.[].retryStrategy                                                        896 (1.6%)
spec.templates.[].dag.tasks                                                            895 (1.6%)

The suggested hack above to delete the component_spec annotation would reduce the total space by ~1/3! We'll try this out and see how much it helps.
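
For anyone who wants a rough version of that measurement without the full script, here is a minimal sketch (not the linked script; the file name is hypothetical) that sizes each top-level key by re-dumping it to YAML:

import yaml

with open("pipeline.yaml") as f:  # hypothetical compiled pipeline
    workflow = yaml.safe_load(f)

# Approximate each top-level field's contribution by its re-serialized size.
sizes = {key: len(yaml.dump(value).encode("utf-8")) for key, value in workflow.items()}
for key, size in sorted(sizes.items(), key=lambda kv: -kv[1]):
    print(f"{key:10} {size:>10,}")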

jli commented 3 years ago

There is a workaround to squeeze the size a bit (although you'll be losing some features like artifact types in the Metadata UX)

@Ark-kun could you say more on what features we lose if we remove the pipelines.kubeflow.org/component_spec annotation?

jli commented 2 years ago

tl;dr: one solution could be for KFP to support Argo's "workflow of workflows" feature, allowing arbitrarily sized workflows. Is there any chance KFP would adopt this? @Ark-kun

some more details:

pogacsa commented 2 years ago

Hi all, I just stumbled upon this error. Are you working on a solution, or is it expected that we use some kind of workaround?

jli commented 2 years ago

@pogacsa there hasn't been any update on this for a while, it seems. FWIW, I worked around this by applying a variety of hacks to post-process the YAML generated by the KFP compiler.

This is an easy one: https://github.com/kubeflow/pipelines/issues/4170#issuecomment-655764762 I also removed the annotations pipelines.kubeflow.org/component_ref and pipelines.kubeflow.org/arguments.parameters.

Another big change my team made was to factor out common fields across all our templates into the templateDefaults field.
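
As a sketch of that templateDefaults idea (assuming Argo Workflows 3.1+, which added spec.templateDefaults; retryStrategy is just a hypothetical common field, and file names are hypothetical), the post-processing could look like:

import yaml

with open("pipeline.yaml") as f:
    workflow = yaml.safe_load(f)

templates = workflow["spec"]["templates"]
common_retry = templates[0].get("retryStrategy")
# Hoist retryStrategy into templateDefaults only if every template shares it.
if common_retry and all(t.get("retryStrategy") == common_retry for t in templates):
    workflow["spec"]["templateDefaults"] = {"retryStrategy": common_retry}
    for t in templates:
        t.pop("retryStrategy", None)

with open("pipeline_small.yaml", "w") as f:
    yaml.dump(workflow, f)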

pogacsa commented 2 years ago

We found another workaround once we figured out why the YAML gets so large: if we use "for" loops in the pipeline definition, everything inside the loop is repeated x times in the YAML. This also explains why we can't use variables as the basis for these loops. Workaround: use dsl.ParallelFor instead and use the input/output variables of the components. This way the loop is expanded at runtime and does not appear in the YAML. (Might be something to correct in Kubeflow, though: not repeating code x times in the YAML.) Do not use "for". Ever. (One problem with dsl.ParallelFor, though, is that you can't nest one inside another, so you first have to generate a permutation list, as sketched below...) Might be obvious to everyone else; for me it was new. :)
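
A minimal sketch of that permutation-list trick (assuming the KFP v1 dsl; the datasets/models dimensions and the train component are hypothetical): precompute the cross product and fan out over it with a single ParallelFor.

import itertools

from kfp import dsl
from kfp.components import func_to_container_op

def train(dataset: str, model: str) -> str:
    return f"{dataset}-{model}"

train_op = func_to_container_op(train, base_image="library/python:3.6")

datasets = ["a", "b"]          # hypothetical first loop dimension
models = ["m1", "m2", "m3"]    # hypothetical second loop dimension
pairs = [{"dataset": d, "model": m} for d, m in itertools.product(datasets, models)]

@dsl.pipeline(name="cross product fan-out")
def cross_product_pipeline():
    # One runtime fan-out over the precomputed permutations instead of
    # nesting one ParallelFor inside another.
    with dsl.ParallelFor(pairs) as item:
        train_op(dataset=item.dataset, model=item.model)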

chensun commented 2 years ago

/cc @chensun
/cc @connor-mccarthy

tClelford commented 1 year ago

tl;dr One solution could be if KFP supported Argo's "workflow of workflows" feature, to allow arbitrarily sized workflows. Is there any chance KFP would adopt this?

My team are using KFP for running very large pipelines (>20k pods) and we've got a solution for this. I'm in the process of getting approval to open-source it

jli commented 1 year ago

My team are using KFP for running very large pipelines (>20k pods) and we've got a solution for this. I'm in the process of getting approval to open-source it

@tClelford Could you share some details about how your solution works? Do you post-process the KFP YAML and create a few Argo workflow template objects, or make use of Argo's workflow-of-workflows feature?

connor-mccarthy commented 1 year ago

In KFP v2 (currently in beta), you can now use KFP pipeline objects [example] as components (Argo's workflow-of-workflows pattern) in combination with dsl.ParallelFor [docs]. Would this help?

tClelford commented 1 year ago

My team are using KFP for running very large pipelines (>20k pods) and we've got a solution for this. I'm in the process of getting approval to open-source it

@tClelford Could you share some details about how your solution works? Do you post-process the KFP YAML and create a few Argo workflow template objects, or make use of Argo's workflow-of-workflows feature?

Hello @jli! Sorry for the delayed reply, I've been on paternity leave :)

To be more accurate, we aren't using Argo workflow-of-workflows in KFP; rather, we've implemented the same pattern to get the same results.

Basically, we split our massive pipeline into several child pipelines and wrote a "pipeline-runner" KFP component that uses the kfp.Client class to submit and track runs of these child pipelines. We then have a parent pipeline that orchestrates the pipeline-runner components. The pipeline-runner doesn't return until the child pipeline has completed (or failed), so you can keep your workflow's structure intact.
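
A hedged sketch of that pattern (not the actual implementation; assumes the kfp v1 Client API, and the function, experiment, and job names are hypothetical) for the body of such a pipeline-runner component:

def run_child_pipeline(pipeline_id: str, kfp_host: str) -> str:
    """Submit a child pipeline and block until it finishes."""
    import kfp

    client = kfp.Client(host=kfp_host)
    experiment = client.create_experiment("child-runs")  # hypothetical experiment name
    run = client.run_pipeline(
        experiment_id=experiment.id,
        job_name=f"child-of-{pipeline_id}",
        pipeline_id=pipeline_id,
    )
    # Poll until the child run completes; fail the parent task otherwise.
    result = client.wait_for_run_completion(run.id, timeout=3600)
    status = result.run.status
    if status != "Succeeded":
        raise RuntimeError(f"child pipeline ended with status {status}")
    return status

Wrapped with func_to_container_op, something like run_child_pipeline would run as an ordinary task in the parent pipeline.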

The bit we want to open-source is the pipeline-runner component as it knows nothing about the content of the pipelines it's running.

There are a couple of caveats/limitations to be aware of before you try using this to solve your KFP scale problem.

  1. This is the main one: this doesn't allow using the results of one child pipeline in another. Not a problem for us, as our jobs currently write their results to the DB 🤮, but it will be a problem for anyone using KFP 'properly'. I've got an idea of how to solve this using volume mounts and passing around the parent run IDs, but we haven't needed it, so I haven't tried it.
  2. You'll find that there are limits on how many pods KFP can track concurrently, regardless of whether they're in different pipelines. If your job is a big bad fan-out like mine, then you'll want to limit how many child pipelines run concurrently.

Hope this helps, give me a shout if you want more detail or if you'd like us to open-source our implementation.

tClelford commented 1 year ago

In KFP v2 (currently in beta), you can now use KFP pipelines [example] objects as components (Argo workflow of workflows pattern) in combination with dsl.ParallelFor [docs]. Would this help?

Maybe :) I think it depends on whether KFP treats them as different pipelines under the hood. What I've observed is that the size of the generated YAML is what causes it to fall over, so if it's still one pipeline, probably not.

maganaluis commented 1 year ago

Has there been any additional work in this area? We have a workflow with 460 tasks, and we've tried submitting the run in both KFP v1 and KFP v2 (splitting the workflow in two with the feature above), yet we still get the same failure, likely hitting the 1.5 MB limit in etcd.

Ideally the Argo team would deconstruct the workflow before writing it to etcd and send it in smaller chunks, although I'm not sure the Kube API exposes such functionality to a controller.

On the KFP side, there are probably still some more optimizations that can be done.