argoproj / argo-workflows

Workflow Engine for Kubernetes
https://argo-workflows.readthedocs.io/
Apache License 2.0

Support larger workflows #7121

Open jli opened 3 years ago

jli commented 3 years ago

Summary

It seems there's a limit on the size of Argo workflows (in terms of number of tasks and task specification size). Argo compresses the workflow status.nodes field into status.compressedNodes, but the spec.templates field can still become very large if there are a lot of tasks and/or if the tasks have large specs.

https://argoproj.github.io/argo-workflows/offloading-large-workflows/ seems to only apply to the status.nodes field, not to spec.templates.

Would it be possible for the spec.templates field to also be gzip+b64 encoded, like status.nodes? I think this would allow for much larger workflows. Offloading to Postgres would be another option to scale even bigger, though that may not be necessary.
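A minimal sketch of the gzip+b64 idea applied to templates, for illustration only. The helper names and the sample templates are hypothetical and not part of Argo; the point is only that repetitive template specs shrink a lot when encoded this way.

```python
import base64
import gzip
import json


def compress_templates(templates: list) -> str:
    """Serialize templates to JSON, gzip them, and base64-encode the result."""
    raw = json.dumps(templates, separators=(",", ":")).encode("utf-8")
    return base64.b64encode(gzip.compress(raw)).decode("ascii")


def decompress_templates(encoded: str) -> list:
    """Reverse compress_templates: base64-decode, gunzip, parse JSON."""
    raw = gzip.decompress(base64.b64decode(encoded))
    return json.loads(raw.decode("utf-8"))


# Hypothetical workflow with 200 near-identical task templates,
# each carrying a bulky metadata annotation.
templates = [
    {
        "name": f"task-{i}",
        "metadata": {"annotations": {"example.com/component-spec": "x" * 500}},
        "container": {"image": "python:3.11", "command": ["python", "step.py"]},
    }
    for i in range(200)
]

plain_size = len(json.dumps(templates, separators=(",", ":")))
encoded = compress_templates(templates)
print(f"plain: {plain_size} bytes, gzip+b64: {len(encoded)} bytes")
```

How much this saves on a real workflow depends on how repetitive the templates are, so the ratio printed here should not be read as typical.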

Use Cases

I'm using Argo via Kubeflow Pipelines, and a workflow with ~200 tasks exceeds the limit of what KFP/Argo can run: I get the error "offload node status is not supported", more details here. KFP adds some metadata annotations which make each task template spec kind of large (the KFP devs suggested a hack here to make this a bit smaller).

Other users have run into this problem as well. Example from the CNCF slack on 2021-10-29: https://cloud-native.slack.com/archives/C01QW9QSSSK/p1635517397302400


Message from the maintainers:

Impacted by this bug? Give it a 👍. We prioritise the issues with the most 👍.

alexec commented 3 years ago

Interesting. Thoughts:

  1. Shrink the size of the templates in the short term by not adding so much meta-data.
  2. We could provide spec.compressedTemplate that could be uncompressed at runtime.
  3. We could fully offload the whole spec to external storage and leave just a reference to it in the spec.

Option 2 would not be too hard and could buy time for option 3.

sarabala1979 commented 3 years ago

The above requirement can be met with the Workflow of Workflows pattern. This way, the load and the processing can be distributed across multiple workflows.

jli commented 3 years ago

@sarabala1979 thanks, though unfortunately we don't have access to workflow of workflows via KFP. But also, I think it would be nice even for direct Argo users to be able to create a much larger single workflow without figuring out how to break up their workflow into a workflow-of-workflows setup.

@alexec I'm not that familiar with the Argo code base. Do you think it would be tricky for an external contributor to implement option 2? I've briefly skimmed code related to the status.nodes compression - would a similar pattern work for spec.templates?

alexec commented 3 years ago

I've briefly skimmed code related to the status.nodes compression - would a similar pattern work for spec.templates?

Yes. I wonder if we only need compression? That will be much easier to implement. We/you also need to think about interplay between offloading of nodes and offloading of templates.

simster7 commented 3 years ago

Also you could have complicated templates stored on WorkflowTemplates and only referenced at runtime?

terrytangyuan commented 3 years ago

We implemented an optimizer (part of the work we did for Couler) that could convert a large workflow to use the "workflow of workflows" pattern automatically based on algorithms. Unfortunately, it's not open-sourced yet. I can reach out to see if that can be open-sourced at some point (potentially bring it to Argo code base?).

sarabala1979 commented 3 years ago

@alexec Compressing or offloading templates will not work, because the client first has to create the Workflow object in k8s, and k8s will reject the object if it is larger than 1 MB. The offloaded node status is processed output that is generated at runtime, which is why the offload feature was easy to implement.

They could create WorkflowTemplates and refer to them via templateRef or workflowTemplateRef, which would save a lot of space.

One improvement we can make in the workflow controller is to compress storedTemplate and storeWorkflowSpec.
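To make the templateRef suggestion concrete, here is a rough sketch that compares the serialized size of a Workflow carrying every template inline with one whose DAG tasks only reference templates by name. The template contents, the WorkflowTemplate name `shared-steps`, and the helper functions are made up for illustration; only the `templateRef` shape (`name` plus `template`) follows the Argo spec.

```python
import json


def make_template(name):
    """Hypothetical template with bulky metadata, standing in for a generated task spec."""
    return {
        "name": name,
        "metadata": {"annotations": {"example.com/component-spec": "x" * 2000}},
        "container": {"image": "python:3.11", "command": ["python", "step.py"]},
    }


def inline_workflow(n_tasks):
    """Workflow that carries every task template inline in spec.templates."""
    tasks = [{"name": f"t{i}", "template": f"step-{i}"} for i in range(n_tasks)]
    templates = [{"name": "main", "dag": {"tasks": tasks}}]
    templates += [make_template(f"step-{i}") for i in range(n_tasks)]
    return {
        "apiVersion": "argoproj.io/v1alpha1",
        "kind": "Workflow",
        "metadata": {"generateName": "inline-"},
        "spec": {"entrypoint": "main", "templates": templates},
    }


def referenced_workflow(n_tasks, workflow_template="shared-steps"):
    """Workflow whose tasks point at templates stored in a WorkflowTemplate."""
    tasks = [
        {
            "name": f"t{i}",
            "templateRef": {"name": workflow_template, "template": f"step-{i}"},
        }
        for i in range(n_tasks)
    ]
    return {
        "apiVersion": "argoproj.io/v1alpha1",
        "kind": "Workflow",
        "metadata": {"generateName": "referenced-"},
        "spec": {"entrypoint": "main", "templates": [{"name": "main", "dag": {"tasks": tasks}}]},
    }


def manifest_size(obj):
    """Size in bytes of the compact JSON encoding of a manifest."""
    return len(json.dumps(obj, separators=(",", ":")).encode("utf-8"))


inline_size = manifest_size(inline_workflow(200))
referenced_size = manifest_size(referenced_workflow(200))
print(f"inline: {inline_size} bytes, with templateRef: {referenced_size} bytes")
```

The referenced templates still have to live somewhere: each WorkflowTemplate is itself a Kubernetes object with the same size limit, so a very large set of templates would presumably need to be spread over several WorkflowTemplates.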

isubasinghe commented 2 years ago

Just a comment to generate discussion. Feel free to ignore if assumptions are wrong.

Assumptions:

Pros:

Cons:

Suggestion: You could alternatively deconstruct the data into a set of key/value pairs on write and on read reconstruct back the original data. This is commonly done in newer NoSQL style DBs that use rocksdb for example so this is a proven viable solution. You would need to do this as a transaction in etcd.
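The deconstruct-on-write, reconstruct-on-read idea can be sketched roughly as follows. This is an illustration under assumptions, not a proposal for Argo's storage layer: a plain dict stands in for the key/value store, the key encoding (a JSON array of path steps) is an arbitrary choice, and no transaction handling is shown.

```python
import json


def flatten(value, prefix=()):
    """Yield (path, leaf) pairs; a path is a tuple of dict keys and list indices."""
    if isinstance(value, dict) and value:
        for key, child in value.items():
            yield from flatten(child, prefix + (key,))
    elif isinstance(value, list) and value:
        for index, child in enumerate(value):
            yield from flatten(child, prefix + (index,))
    else:
        # Scalars and empty containers are stored as leaves.
        yield prefix, value


def _to_lists(node):
    """Turn dicts keyed by integers back into lists (JSON objects only have string keys)."""
    if isinstance(node, dict) and node:
        converted = {key: _to_lists(child) for key, child in node.items()}
        if all(isinstance(key, int) for key in converted):
            return [converted[i] for i in range(len(converted))]
        return converted
    return node


def unflatten(pairs):
    """Rebuild the original document from (path, leaf) pairs."""
    tree = {}
    for path, leaf in pairs:
        if not path:
            return leaf  # the whole document was a single leaf
        node = tree
        for step in path[:-1]:
            node = node.setdefault(step, {})
        node[path[-1]] = leaf
    return _to_lists(tree)


def to_kv(doc):
    """Deconstruct a JSON document into string keys and string values."""
    return {json.dumps(list(path)): json.dumps(leaf) for path, leaf in flatten(doc)}


def from_kv(kv):
    """Reconstruct the document from the key/value pairs written by to_kv."""
    return unflatten((tuple(json.loads(k)), json.loads(v)) for k, v in kv.items())


doc = {
    "spec": {
        "entrypoint": "a",
        "templates": [{"name": "a", "inputs": {}}, {"name": "b", "args": []}],
    }
}
kv = to_kv(doc)
restored = from_kv(kv)
```

Each pair is small, so no single value approaches the per-object limit, but a consistent read or write then spans many keys, which is why the suggestion mentions doing it as a transaction.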

I think the code change could be fairly minimal as long as the retrieval and storage of data from etcd is fairly localised in the source code.

IMO, if my assumptions are correct, this is the ideal solution in a perfect world with infinite time and resources. Obviously we do not live in such a world, but I thought it was still worth posting this comment so that anyone who thinks the effort is worthwhile can consider it more seriously.

I think storing in a database is far easier and the better (more pragmatic) route. But I probably don't have enough experience to make that call so a more experienced dev's opinion would be appreciated.

@alexec

isubasinghe commented 2 years ago

I have done the above process for deconstructing JSON into key/value pairs in Rust. That codebase is not open source; however, I would be happy to share the key/value deconstruction part if that helps.

pogacsa commented 2 years ago

Has anything been done to solve this instead of us doing workarounds? Could this limit be set higher?

agilgur5 commented 9 months ago

Could this limit be set higher?

Per above and the docs, this is a hard limit set by etcd, not Argo

isubasinghe commented 9 months ago

As I stated before, we could potentially deconstruct the workflows into multiple key/value pairs @agilgur5, it is kind of an ugly solution but one that will work with no issues (well we might run into API rate limiting if we allow any size).

This is how databases are built on top of KV stores, and I've personally done this before for a little DB I built on top of rocksdb

agilgur5 commented 9 months ago

I was responding to the upvoted comment above for clarity, since this is not a limit set by Argo and so is not something that can be "set higher".

It requires a workaround in user-land, such as Workflows of Workflows, or in the Controller, as you stated

As I stated before, we could potentially deconstruct the workflows into multiple key/value pairs @agilgur5, it is kind of an ugly solution but one that will work with no issues (well we might run into API rate limiting if we allow any size).

Correct me if I'm wrong, but I think even this workaround will hit a limitation: the initial resource passed to k8s / etcd -- before the Controller has had a chance to process it -- cannot exceed the etcd 1MB limit. I don't think there's a way of working around that in the Controller; one way or another, at a large enough size eventually a Workflow of Workflows will be necessary

isubasinghe commented 9 months ago

You are correct that this will still be limited by the initial resource passed to k8s.

It definitely isn't a nice solution (arguably, it is a hack). I was more just giving a possible option if we want to push the limits a bit more.

agilgur5 commented 9 months ago

It definitely isn't a nice solution (arguably, it is a hack). I was more just giving a possible option if we want to push the limits a bit more.

Since it won't push it that much further, I would probably say that this is not worth adding such a hack / workaround.

Some more compression could potentially be added, as Bala mentioned above, to shrink some of the generated pieces that are within our control.

Otherwise, the best DX may be to have a documented user-land optimizer for Workflows of Workflows as Terry suggested. We could make that part of the CLI

isubasinghe commented 9 months ago

@agilgur5 Makes sense, I do like Terry's suggestion the best.

agilgur5 commented 9 months ago

we could potentially deconstruct the workflows into multiple key/value pairs @agilgur5, it is kind of an ugly solution but one that will work with no issues (well we might run into API rate limiting if we allow any size).

I did think of a slightly different use-case for this -- this could be a different option for node status offloading that doesn't require a separate DB. That's actually probably how I would've implemented status offloading initially as splitting into multiple keys is a more natural progression, although a DB may still eventually be necessary when the status gets too large and the key splitting algorithm can no longer handle it.

Joibel commented 9 months ago

How would key splitting work given that you cannot guarantee atomic writes to multiple objects?

You may end up writing some status changes but then failing to write others because of resourceVersion?

I guess you could have an index object which could be used to verify integrity (with the necessity of keeping old indexes and data around) or some kind of journaled status.
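One way to picture the index-object idea is the sketch below: chunks are written first under content-derived keys, and a single index object listing them is written last, so the index is the only write that has to be atomic. Everything here is hypothetical (the function names, the key layout, the dict standing in for the backing store, and the choice of SHA-256); it only illustrates the integrity check, not how Argo stores status.

```python
import hashlib


def write_chunked(store, name, data, chunk_size):
    """Write data as content-addressed chunks, then publish an index object last."""
    chunk_keys = []
    for start in range(0, len(data), chunk_size):
        chunk = data[start:start + chunk_size]
        key = f"{name}/chunk/{hashlib.sha256(chunk).hexdigest()}"
        store[key] = chunk  # idempotent: identical bytes always map to the same key
        chunk_keys.append(key)
    # The index is written last, so a reader never follows an index whose
    # chunks have not been written yet. Old chunks are left in place.
    store[f"{name}/index"] = chunk_keys


def read_chunked(store, name):
    """Read via the index and verify every chunk against the hash in its key."""
    parts = []
    for key in store[f"{name}/index"]:
        chunk = store[key]
        if hashlib.sha256(chunk).hexdigest() != key.rsplit("/", 1)[1]:
            raise ValueError(f"chunk {key} failed its integrity check")
        parts.append(chunk)
    return b"".join(parts)


store = {}
payload = bytes(range(256)) * 10
write_chunked(store, "wf-status", payload, chunk_size=1000)
roundtrip = read_chunked(store, "wf-status")
```

As the comment notes, old chunks have to be kept around until no index refers to them, so some form of garbage collection would be needed on top of this.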

I had assumed this complexity was why offloading to a database was implemented instead.

isubasinghe commented 9 months ago

How would key splitting work given that you cannot guarantee atomic writes to multiple objects?

You may end up writing some status changes but then failing to write others because of resourceVersion?

I guess you could have an index object which could be used to verify integrity (with the necessity of keeping old indexes and data around) or some kind of journaled status.

I had assumed this complexity was why offloading to a database was implemented instead.

I think if you are willing to use etcd directly, you should be able to do atomic writes/reads to etcd; etcd does have transactions.

isubasinghe commented 9 months ago

we could potentially deconstruct the workflows into multiple key/value pairs @agilgur5, it is kind of an ugly solution but one that will work with no issues (well we might run into API rate limiting if we allow any size).

I did think of a slightly different use-case for this -- this could be a different option for node status offloading that doesn't require a separate DB. That's actually probably how I would've implemented status offloading initially as splitting into multiple keys is a more natural progression, although a DB may still eventually be necessary when the status gets too large and the key splitting algorithm can no longer handle it.

Honestly, even just splitting the status from the workflow itself would probably roughly double the max size.

Joibel commented 9 months ago

We cannot use Kubernetes' own etcd directly; there is no access to it in most managed Kubernetes clusters. If we're going to install etcd as a service in the cluster, we may as well use something more suited to this problem.

agilgur5 commented 9 months ago

I think if you are willing to use etcd directly, you should be able to do atomic writes/reads to etcd; etcd does have transactions.

I was thinking about this too, as most key-value stores have transactions. Although I was thinking of avoiding using etcd directly if possible.

We cannot use Kubernetes' own etcd directly; there is no access to it in most managed Kubernetes clusters. If we're going to install etcd as a service in the cluster, we may as well use something more suited to this problem.

I usually run self-managed kOps clusters, so I forgot about that, good point. But there are already tools that use a secondary etcd in the ecosystem, such as Cilium, in order to offload some of the load from the central k8s API server / etcd. kOps supports running extra etcd instances for this purpose as well.

I don't think we necessarily need to do that, though that is an interesting option. If we already support a (SQL) DB though, not much reason to implement another alternative offload store.

How would key splitting work given that you cannot guarantee atomic writes to multiple objects?

You may end up writing some status changes but then failing to write others because of resourceVersion?

This is a good point overall, though as we already have the status/compressedNodes subresource I assume this is already being done in some places. We also already lock each Workflow, so we can do atomicity ourselves, though I agree it would be less complex to avoid having to write that ourselves. I would suspect the subresource logic is either already doing that or not even considering atomicity currently 😅

I had assumed this complexity was why offloading to a database was implemented instead.

This is why I put such importance on historical context 🙂 I'm not sure, we should probably check the PR history and see if it was discussed in the repo

alexec commented 9 months ago

Couple of notes here. The way that we offload workflows today is that the node status ($.status.nodes) which has always been the largest component for a workflow is either (a) compressed if it is small enough or (b) offloaded if too large.
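The compress-or-offload decision described above might look roughly like this. It is a simplified sketch, not the controller's actual logic: the size budget, the function name, and the `offload` callback are assumptions made for illustration, and the status field names are used only to mirror the fields discussed in this thread.

```python
import base64
import gzip
import json

MAX_INLINE_BYTES = 1024 * 1024  # assumed budget for illustration, not Argo's actual threshold


def persist_nodes(status, nodes, offload, max_inline_bytes=MAX_INLINE_BYTES):
    """Store nodes inline in compressed form if they fit, otherwise offload them."""
    raw = json.dumps(nodes, sort_keys=True).encode("utf-8")
    compressed = base64.b64encode(gzip.compress(raw)).decode("ascii")
    # Clear any previous representation so only one of the two fields is set.
    status.pop("compressedNodes", None)
    status.pop("offloadNodeStatusVersion", None)
    if len(compressed) <= max_inline_bytes:
        status["compressedNodes"] = compressed
    else:
        # offload is a hypothetical callback that saves the bytes externally
        # and returns a reference to them.
        status["offloadNodeStatusVersion"] = offload(raw)
    return status


nodes = {f"node-{i}": {"phase": "Succeeded"} for i in range(50)}
small = persist_nodes({}, nodes, offload=lambda raw: "ref-1")
forced = persist_nodes({}, nodes, offload=lambda raw: "ref-1", max_inline_bytes=10)
```

The second call passes a tiny budget only to exercise the offload branch.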

The benefit of compression is that you don't need to operate and manage a new database. Even though garbage collection and retention of that database are managed by the controller, setting up an external database is onerous, and you can no longer use kubectl to interact directly with Argo Workflows: you must use the argo CLI, the API, and the UI, so you must deploy all of that too.

I.e. large workflows create operational overhead many users don't want.

Offloading is nothing more than saving the data and replacing it with a reference. The reference itself is a deterministic hash of the data. This makes it robust even if there is a network error when writing the status (i.e. it does an upsert).
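A small sketch of why a content-derived reference makes retries safe. The hash function (SHA-256 here), the key layout, and the dict standing in for the database are assumptions for illustration, not a description of Argo's implementation.

```python
import hashlib
import json


def offload(db, workflow_uid, nodes):
    """Upsert nodes under a key derived from their content and return the reference."""
    # Canonical encoding: the same logical data always yields the same bytes.
    canonical = json.dumps(nodes, sort_keys=True, separators=(",", ":")).encode("utf-8")
    version = hashlib.sha256(canonical).hexdigest()
    # Upsert: writing the same data again, for example after a failed status
    # update, lands on the same key and changes nothing.
    db[(workflow_uid, version)] = canonical
    return version


db = {}
first = offload(db, "uid-1", {"a": {"phase": "Running"}, "b": {"phase": "Pending"}})
retry = offload(db, "uid-1", {"b": {"phase": "Pending"}, "a": {"phase": "Running"}})
changed = offload(db, "uid-1", {"a": {"phase": "Succeeded"}, "b": {"phase": "Pending"}})
```

A retry with identical data returns the same reference and leaves a single row, while any change to the data yields a new reference, which is what lets the change show up on the Kubernetes object.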

By having a reference you keep the benefits of Kubernetes, when the reference changes, the workflow changes, and you get Kubernetes events. Watches continue to work.

I think the exact same pattern would work for workflows with a large volume of templates, but rather than just do the templates, do the whole workflow and just have a reference in the status field.

RyanDevlin commented 3 months ago

My team is experiencing similar issues where we are looking for a way to reduce pressure on etcd. We run extremely large-scale batch jobs using Argo Workflows, and currently these jobs are eating up 50%+ of etcd when at full scale. Ideally we'd like a mechanism to offload a big chunk of this data to a separate DB so we can run multiple jobs at once on a single cluster. Etcd consumption is currently the only scaling dimension preventing us from doing this.

It would be amazing if Argo had a feature where you could use something like DynamoDB (or any other distributed DB that has similar performance to etcd) as a backing store instead of etcd. Currently the "offloading large workflows" feature doesn't necessarily support our use case, because it only offloads if the workflows exceed the 1MB limit, which ours do not. Having a way to back Argo's state with a separate DB would unlock serious scaling capabilities where our workflows could run hundreds of thousands of Pods.

agilgur5 commented 3 months ago

Currently the "offloading large workflows" feature doesn't necessarily support our use case, because it only offloads if the workflows exceed the 1MB limit, which ours do not.

There is an existing, separate feature to offload in all cases, the environment variable ALWAYS_OFFLOAD_NODE_STATUS. There is a trade-off in performance when doing that though (since you're constantly reading and writing to a DB), see also #13107

RyanDevlin commented 3 months ago

@agilgur5 Interesting thanks for that pointer! Do you have any anecdotal evidence of the performance with offloading? We'll obviously have to test it with our own setup, but should I expect significantly degraded performance when using that flag?

Also I'm curious, what specifically is the performance bottleneck when offloading? Shouldn't Argo be able to write out state to a separate DB with similar performance as Etcd? Since etcd uses Raft it's not as performant as similar distributed key-value DBs, so I'm having trouble visualizing why the offloading feature is much slower than natively using etcd.

agilgur5 commented 3 months ago

Shouldn't Argo be able to write out state to a separate DB with similar performance as Etcd?

There are two main differences:

  1. Argo is still writing to the k8s api-server/etcd, just substantially less for the node status specifically. That means that offloading adds an additional DB. That does get quite nuanced as to when that happens and how performant each DB is respectively with large JSON payloads (see also #13290), so yes I would suggest testing the performance before/after.
  2. The network latency also depends on where you host each; k8s api-server typically has very close proximity in a cluster.

Also this is better discussed in #13107 as this addresses that exact question and is a bit adjacent to this issue.