alexec opened 2 years ago
I think your HPA could be trivial to set up - just based on memory usage.
Jotting down some implementation / architectural notes here. I was interested in implementing this and discussed a fair bit with @juliev0 offline -- thanks for being a sounding board and finding very relevant edge cases and counter-examples already! 🙂
For searchability purposes, I also defined this as a "sharding" proposal, since, more specifically, each replica acts as a shard in this proposal.
I'm largely in favor of the above proposal, such as using StatefulSets, properly scoping the Informer caches, and having a separate Controller to "assign work" -- I called this a "labeler", but maybe `assignmentController` is more specific. There must only be a single `assignmentController` so that there are no conflicts -- it could run only on the leader, as one option.
For reference, Argo CD uses a StatefulSet as well for its Controllers, but more on that later.
There are some edge cases that don't quite work in this proposal and idealistic improvements I'd like to make if possible as well.
As a simplification, below I will refer to "Workflows", but as Alex stated above, this may include Workflow Pods, WorkflowTemplates, ClusterWorkflowTemplates, etc. If sharding is necessary for these resources, the implementation should work largely identically.
The modulus for scale down is actually a bit too simplistic. Specifically, it will fail when the replicas scale back up. Julie asked about a specific case that turns out to be a very good counter-example: say you scale from 14 -> 2 -> 3. `14 % 2 == 0` while `14 % 3 == 2`. The remainder is not equal to the shard ID anymore.
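To see the mismatch concretely, here is a trivial illustration (just the numbers from the example above, nothing more):

```go
package main

import "fmt"

// A Workflow labelled with the shard computed at 2 replicas no longer matches
// the shard the same formula computes at 3 replicas.
func main() {
	workflowId := 14
	for _, numReplicas := range []int{2, 3} {
		fmt.Printf("%d %% %d = shard %d\n", workflowId, numReplicas, workflowId%numReplicas)
	}
	// 14 % 2 = shard 0
	// 14 % 3 = shard 2
}
```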
While thinking through sharding designs, I was specifically looking at possible coordination-free designs (which includes the space of lock-less designs). The CALM theorem in particular is one of my favorite recent reads and a very useful generalization (sometimes referred to as a generalization of CAP). Here's a great interview from the professor who theorized it. It's got some great real-life examples in it that are very easy to understand as well.
The most basic gist that it boils down to is: if your function is monotonically increasing, then there is a coordination-free design that can implement it (and if there is a coordination-free design, it is not limited by CAP).
This was totally incidental, but the function in Alex's proposal above reduces down simply to `workflowId % N = shardId`. `N` in this case is the total number of current replicas/shards; let's call it `numReplicas` moving forward. `numReplicas` is not monotonically increasing, it scales up and down. Therefore this assignment requires coordination (proof by reduction).
Or, putting it another way, there must be a "relabeler" or a "reassigner" that coordinates the work assignment when replicas change. That also means that Julie's counter-example works as a proof by contradiction.
Totally unintentional proofs here, but it makes for definitive arguments!
All that being said, there are potentially other ways of formulating this problem that are monotonically increasing and therefore would not require coordination.
If we were to go with this proposal, the simplest implementation of a reassigner would not change any in-progress Workflows that have already been assigned to a shard.
- In the case of scale-down, it would take the no-longer-assigned Workflows and reassign them (i.e. change their label) to existing shards.
- In the case of scale-up, it would only look at new work and try to balance that out amongst new and existing shards.

This being the simplest implementation, it does not do any rebalancing of work, which could be suboptimal in some situations. Rebalancing, however, would be significantly more complicated to implement, as shards would need to be able to drop working on any in-progress Workflows at any time. There are a lot of possible race conditions to handle there as well.
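As a very rough, hypothetical sketch of that simplest reassigner (the helper functions here are made up for illustration and don't exist in the codebase):

```go
// Hypothetical sketch only. listWorkflowsWithShardAtLeast, leastLoadedShard,
// and setShardLabel stand in for "list Workflows whose shard label no longer
// has a replica", "pick an existing shard", and "patch the shard label".
func reassignOrphanedWorkflows(ctx context.Context, numReplicas int) error {
	// After a scale-down, Workflows labelled with a shard >= numReplicas have no owner.
	orphaned, err := listWorkflowsWithShardAtLeast(ctx, numReplicas)
	if err != nil {
		return err
	}
	for _, wf := range orphaned {
		target := leastLoadedShard(numReplicas) // e.g. round-robin over existing shards
		if err := setShardLabel(ctx, wf, target); err != nil {
			continue // best-effort: the next resync retries any failures
		}
	}
	return nil
}
```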
This space of different problem formulations contains some possible improvements to this proposal.
The simplest possible coordination-free design follows a classic greedy lock-less design: when a shard is able to take on more work, it checks for unassigned Workflows and attempts to assign one to itself. Whichever shard gets it first takes it; the rest move on to the next one. This is very similar to lock-less designs where you attempt an operation and retry if it fails, but instead of retrying, the shards in this case just move on.
In pseudo-code:
```go
// greedy, lock-less assignment loop run by each shard
for _, workflow := range unassignedWorkflows {
	if err := tryAssign(workflow); err != nil {
		woc.log.WithError(err).Debugf("failed to assign Workflow %s to Shard %s", workflow.Name, shardId)
		continue
	}
	woc.log.Infof("assigned Workflow %s to Shard %s", workflow.Name, shardId)
}
```
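For illustration, one way `tryAssign` could work is as a plain optimistic write: set the shard label on the cached copy and let the API server reject the update if another shard got there first. This is only a sketch; the `shardLabel` key and the use of the generated Workflow client here are assumptions, not existing code.

```go
package sharding

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
	wfclient "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
)

const shardLabel = "workflows.argoproj.io/shard" // hypothetical label key

// tryAssign optimistically claims an unassigned Workflow by writing this
// shard's ID into its label. Because the object carries the informer cache's
// resourceVersion, a concurrent claim by another shard surfaces as a Conflict
// error, and this shard simply moves on.
func tryAssign(ctx context.Context, wfs wfclient.WorkflowInterface, wf *wfv1.Workflow, shardId string) error {
	if _, assigned := wf.Labels[shardLabel]; assigned {
		return fmt.Errorf("workflow %s/%s is already assigned", wf.Namespace, wf.Name)
	}
	updated := wf.DeepCopy() // never mutate the informer's cached copy
	if updated.Labels == nil {
		updated.Labels = map[string]string{}
	}
	updated.Labels[shardLabel] = shardId
	_, err := wfs.Update(ctx, updated, metav1.UpdateOptions{})
	return err // apierrors.IsConflict(err) would mean another shard won the race
}
```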
The main problem with the greedy assignment is how it interacts with Informers. Informers need to act on a list option (`tweakListOptions` is the parameter) / selector to filter down. With a greedy approach, all shards' informers must watch all Workflows, which is not memory-efficient.
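For context, label-scoping an informer via `tweakListOptions` looks roughly like the sketch below -- which is exactly what a greedy shard cannot do up front, since unassigned Workflows carry no shard label yet. The generated informer factory, resync period, and `shardLabel` key are assumptions for illustration.

```go
package sharding

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	wfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
	wfinformers "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions"
)

const shardLabel = "workflows.argoproj.io/shard" // hypothetical label key

// newShardScopedInformerFactory builds an informer factory whose List/Watch
// calls only return Workflows labelled for this shard, so the cache never
// holds other shards' Workflows.
func newShardScopedInformerFactory(clientset wfclientset.Interface, shardId string) wfinformers.SharedInformerFactory {
	return wfinformers.NewSharedInformerFactoryWithOptions(
		clientset,
		20*time.Minute, // resync period, illustrative
		wfinformers.WithTweakListOptions(func(opts *metav1.ListOptions) {
			opts.LabelSelector = shardLabel + "=" + shardId
		}),
	)
}
```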
As an Informer is a cache, it's possible that maybe some cache eviction can be done to improve this. But, uh, cache eviction is notoriously a hard problem. I don't know Informers well enough to know if there are potentially some easy options to get around this though.
reassignmentController
There is also potentially a mash-up that improves this. Every shard runs a greedy `reassignmentController` and that one only keeps track of assignment metadata and nothing else (i.e. does not cache the actual spec or anything else). Basically an in-memory map of `workflowId -> shardId`. This shrinks the cache size significantly -- if possible (I'm not entirely sure if Informers support this).
Then, the rest of the Informers on the shard just filter on the `shardId` and so don't need to watch all Workflows. So the bulk of the information, specs etc, are still evenly distributed.
This is one potential alternative implementation that I've thought of so far, but I do like it quite a bit. The simplest approaches tend to be easier to maintain, scale well, and are the easiest to reason about (which makes all of the above easier as well). If it is possible to solve the Informer issues, then I think this would be an optimal approach to take.
If not, we may want to consider other implementations.
I haven't had time to dive into it yet, but Argo CD already implements some form of sharding, and I would like to compare proposals to its current implementation.
I joined SIG Scalability to talk about it, but we haven't had time to discuss yet. Maybe next week?
Obviously, it would be great if Argo Projects could share implementations or at the very least architectures. The Server + Controller split is common b/t Argo CD and Argo Workflows already.
@agilgur5 there is already a proposed design for horizontal scaling https://github.com/timebertt/kubernetes-controller-sharding. Can you post in #argo-workflow channel to get feedback from users?
> Can you post in #argo-workflow channel to get feedback from users?
I already posted in #argo-contributors, #argo-wf-contributors, and #argo-sig-scalability. I also presented at the SIG Scalability meeting last week.
This is primarily on implementation details, which are not user-facing, so I don't think it is particularly valuable to general users. We already know that users want horizontal scaling as a general feature.
> @agilgur5 there is already a proposed design for horizontal scaling https://github.com/timebertt/kubernetes-controller-sharding.
Thanks for the reference! The design there follows the single `assignmentController` pattern that I mentioned above, along with some details on rebalancing. It relies on leases, so a lockless pattern as I outlined above would be more optimal.
I totally agree these are implementation details. But we should understand how valuable this feature is to Argo Workflows users. If nobody is using this feature, there is no reason to implement it. I can see only one star.
> I can see only one star [sic]
The related issues have plenty of upvotes:
Total: ~21 upvotes
Argo CD contributors and maintainers were interested in a similar feature for CD as well.
wow great. Can you create a proposal Doc PR with the above details? Please include the following items:
What would the purpose of a duplicative doc and duplicative diagrams be? Especially as this proposal has already had duplicates as well.
The purpose of a proposal is generally for a feature that is not fleshed out. Whereas this feature has been fleshed out multiple times at this point (at least 3 times and all 3 are very similar).
That seems very unnecessary and redundant to me as well as a poor use of already very limited contributor time... Not to mention, recent proposals have received very little feedback from maintainers and have gone stale as a result (which has been actively counter-productive).
@sarabala1979 I would ask again that you read the existing information. I have pointed to existing information a few times in my responses already.
It is a shame that this initiative looks like it is going to fizzle out. It would have been a selling point for organizations looking to migrate from Jenkins to Argo Workflows.
Jenkins can't scale horizontally, so the solution to handle the load is to spin up another Jenkins Controller. Which is very burdensome. This would be the number one reason for large deployments of Jenkins to migrate.
I second @agilgur5 's motion to prioritize this as a feature on the roadmap - there seems like enough community interest, and all core contributors see the benefit. It would be great if we could organize contributor & maintainer efforts around this for a 3.6 or 3.7 release. Shall we add to our next Contributor Meeting agenda on Feb 6? cc @Joibel so he's aware as well
I agree with @ryancurrah that this would help solidify a clear advantage for using Argo for CI over tools like Jenkins that aren't as cloud-native. Argo already outperforms Jenkins on K8s for most use cases, but this would bolster that position while we are seeing dozens of migrations from Jenkins to Argo for CI/CD use cases now.
Yes, let's discuss this in our next contributors call. I'll review the proposal when I get a chance as well
I'm a bit worried about reassigning workflows between controller shards in cluster.
At the point reassignment is occurring we are either scaling up or down.
- Up: At this point we are already busy, and someone is going to rewrite a load of shard labels, causing quite a lot of API churn and the workflow controller(s) to have to reconcile their workflows. This isn't a great time to be loading things up more.
- Down: I'm worried we'll oscillate back up again because suddenly we've added more load to each controller, for the same reason as above.
I like using labels to limit the scope of the informers. I think cache eviction can be done through the informer filterFunc in the right design. It's hard to be sure of the cost of unscoped informers. I'm not sure which informers beyond the workflow informer matter - the pod informer seems the most likely to want the same scoping.
Database requirement:
Almost anyone who's at the scale of needing this would probably already have considered getting a database in play to do status offloading. We could leverage this and require a database for sharded controller co-ordination. It provides nice features for atomic global updates and the like, but I can't see a way for it to properly work with label scoping the informers.
I'm leaving these words in here in case someone else can think of a way to make this work.
Big shard numbers
If instead of us labelling workflows with a specific shard number that matches the actual number of workflow-controller shards, we run shard numbers up to a much bigger number (`virtualShard`) than we expect to ever run controllers, we would then reassign dynamically just by all running controllers simultaneously agreeing on a new size of the controller cluster.
For a `maxVirtualShard` of 12 (I'd expect this number to be bigger, but I don't want to make an even bigger table):
Workflow | virtualShard (label) | 3 controllers | 4 controllers | 5 controllers | 6 controllers |
---|---|---|---|---|---|
0 | 0 | 0 | 0 | 0 | 0 |
1 | 1 | 1 | 1 | 1 | 1 |
2 | 2 | 2 | 2 | 2 | 2 |
3 | 3 | 0 | 3 | 3 | 3 |
4 | 4 | 1 | 0 | 4 | 4 |
5 | 5 | 2 | 1 | 0 | 5 |
6 | 6 | 0 | 2 | 1 | 0 |
7 | 7 | 1 | 3 | 2 | 1 |
8 | 8 | 2 | 0 | 3 | 2 |
9 | 9 | 0 | 1 | 4 | 3 |
10 | 10 | 1 | 2 | 0 | 4 |
11 | 11 | 2 | 3 | 1 | 5 |
12 | 0 | 0 | 0 | 0 | 0 |
13 | 1 | 1 | 1 | 1 | 1 |
14 | 2 | 2 | 2 | 2 | 2 |
15 | 3 | 0 | 3 | 3 | 3 |
16 | 4 | 1 | 0 | 4 | 4 |
17 | 5 | 2 | 1 | 0 | 5 |
18 | 6 | 0 | 2 | 1 | 0 |
19 | 7 | 1 | 3 | 2 | 1 |
20 | 8 | 2 | 0 | 3 | 2 |
21 | 9 | 0 | 1 | 4 | 3 |
22 | 10 | 1 | 2 | 0 | 4 |
23 | 11 | 2 | 3 | 1 | 5 |
24 | 0 | 0 | 0 | 0 | 0 |
25 | 1 | 1 | 1 | 1 | 1 |
26 | 2 | 2 | 2 | 2 | 2 |
27 | 3 | 0 | 3 | 3 | 3 |
The maximum virtualShard number should be a balance between the number of labels needing to be watched and the expected maximum number of controllers. For small numbers of expected maximum controllers (`maxControllers`), the perfect answer is `maxControllers!` (maxControllers factorial).
In the above example 5 controllers is bad, because controllers 0 and 1 run more workflows than the others, because 12 doesn't divide evenly by 5. To get it to play nicely up to 6 controllers, `maxVirtualShards` is 6! = 720, which is quite a lot of labels (360) to watch when we're down at 2 controllers. At 1 controller we'd stop using label selection.
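A minimal sketch of the arithmetic, under the assumption that the virtual shard is computed once (here from a hash of the Workflow name) and written as a label, while the controller mapping is recomputed from the agreed-upon controller count:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

const maxVirtualShards = 12 // illustrative; in practice much larger, e.g. maxControllers!

// virtualShard is assigned once, at Workflow creation, and stored as a label;
// it never changes afterwards.
func virtualShard(workflowName string) int {
	h := fnv.New32a()
	h.Write([]byte(workflowName))
	return int(h.Sum32()) % maxVirtualShards
}

// controllerFor maps a virtual shard to an actual controller given the
// currently agreed-upon controller count; no relabelling is needed when
// numControllers changes.
func controllerFor(vShard, numControllers int) int {
	return vShard % numControllers
}

func main() {
	v := virtualShard("my-workflow-abc123")
	for _, n := range []int{3, 4, 5, 6} {
		fmt.Printf("virtualShard %d with %d controllers -> controller %d\n", v, n, controllerFor(v, n))
	}
}
```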
I don't know whether this is useful or not.
> This would be the number one reason for large deployments of Jenkins to migrate.
Personally I'd be pretty surprised if this was the "number one reason". IMO there are much better reasons to migrate off Jenkins to not just Argo, but plenty of other more modern CI systems. At my previous job working at a giant enterprise with many, many Jenkins controllers, I'd say top few reasons would be:
If that enterprise were to switch to using Argo, it may very well still have multiple controllers, either for dedicated performance for certain teams / departments, or for security purposes to limit blast radius and access.
Also, to be clear, Argo does support manual sharding already, which is significantly simpler than setting up new Jenkins controllers -- it requires one flag and a label on your Workflows, that's it. Note that manual sharding would still be supported even with an automatic sharding feature, for both backward compat and for manual tuning as needed. For instance, for dedicated performance, you could have one cluster-level controller that auto shards and a specific team's controller manually sharded out. They could also be used in tandem, i.e. multiple auto shards for one manual shard label.
> I agree with @ryancurrah that this would help solidify a clear advantage for using Argo for CI over tools like Jenkins that aren't as cloud-native.
To Caelan's point though, I do usually expect that cloud-native tools can scale horizontally, so the lack of this was something that surprised me with Argo.
To be fair, when it comes to Argo and other k8s projects, sometimes scaling involves a lot more than just more Pods, and requires tuning your k8s control plane quite heavily. Some folks just spin up new clusters to get around this, at which point you're back to square one on needing more controllers (and in a multi-cluster scenario, if that were natively supported a la #3523, which I am still a bit interested in and do still need to give a talk on, it is more performant and less race-heavy to run a controller in each cluster, as I did myself). It depends. I do think there is a use-case for this.
> It is a shame that this initiative looks like it is going to fizzle out.
I do also still have a whole window with like 10 tabs open on this investigating some existing approaches and the informer source code, but the initiative definitely went on hiatus. We did discuss in SIG Scalability as well and I still need to write some more on that. In an incredibly brief nutshell: CD shards per cluster, but now has multiple algorithms available, which is an interesting and potentially useful touch depending on the characteristics of your usage. CD does not usually have high memory, but high CPU, so it is optimized quite differently and shards quite differently (i.e. not necessarily applicable to Workflows).
> Up: At this point we are already busy, and someone is going to rewrite a load of shard labels causing quite a lot of API churn and the workflow controller(s) to have to reconcile their workflows. This isn't a great time to be loading things up more.
In the "Reassigner details" section, I wrote that to avoid complex rebalancing scenarios (which would be loaded with race conditions), the simplest version would only assign new Workflows to new shards. As I wrote there, that is ofc not ideal for all scenarios, but I think it could still be sufficient for many. Complex rebalancing schemes could be added on top if desired, potentially opt-in with algorithm selection like in CD.
> Down: I'm worried we'll oscillate back up again because suddenly we've added more load to each controller for the same reason as above.
This isn't really specific to the design I wrote above -- this can happen in any horizontally scaled application, which is why configuring HPAs properly is important. In the design I wrote above, scale down will only reassign Workflows that are now missing a shard, which is the minimum that must happen in any scale down design.
> I think cache eviction can be done through the informer filterFunc in the right design.
Afaik, the `FilterFunc` does not re-run on existing cached resources, i.e. it does not evict. I think this would only happen during the cache rebuild if relying on the `FilterFunc` alone. So I don't think this is enough, but I'm not entirely sure on that; I still need to read through the informer source code.
Note that cache eviction (or similar cache limitation) is only needed for a full coordination-free & memory-sharded design.
With partial coordination with only a single `assignmentController`, this is not needed. I would much prefer having a coordination-free design, but it is very possible to do this without one.
Also, if we're not sharding memory, then cache eviction/limitation is not needed either; i.e. it is an optimization, but a pretty important one IMO.
> I'm not sure which informers beyond the workflow informer matter - the pod informer seems the most likely to want the same scoping.
Yes agreed. I have a sentence "As a simplification" that ignores these for the purpose of understanding the design, but the Pod informer should only watch for Pods of Workflows that its shard is assigned to. This might not require too much code change. The WorkflowTemplate and ConfigMap informers etc are significantly smaller and can extend across shards, so I think at least the initial implementation can leave those as is.
> Almost anyone who's at the scale of needing this would probably already have considered getting a database in play to do status offloading.
Not necessarily. Status offloading is particularly necessary with larger Workflows, but if you have lots of small Workflows, you wouldn't need it. One of the fleets I maintained was exactly this use case.
> We could leverage this and require a database for sharded controller co-ordination.
My entire approach was to be as simple and coordination-free as possible (which are typically better designs, 9 times out of 10, from both a theoretical and practical perspective). Requiring a DB throws that out of the water big time and also is a pretty significant infra requirement as well. It also adds a new layer to the failure scenarios and could easily be a single point of failure in and of itself if not designed or scaled correctly. IMO, I don't think this adds much usefulness for the amount of complexity it requires.
I also tend to expect that cloud native tools use k8s resources/etcd effectively and don't usually need separate DBs except for certain features (status offload makes sense, though it could also be split across multiple entries instead of needing a DB as discussed in https://github.com/argoproj/argo-workflows/issues/7121#issuecomment-1901730911, which would be a nice feature as well. workflow archive is a bigger use-case for a DB IMO)
> but I can't see a way for it to properly work with label scoping the informers.
This would also limit its usefulness to situations where the bottleneck is not memory
Some brief thoughts
I was assuming that `FilterFunc` would re-run over all of the data because we'll be rebuilding the informers at the sharding change boundary, causing them to `List` and re-evaluate everything.
How do you propose to make scaling up work with new workflows being greedily run on the new controllers - or is it just a case where we're hoping it's less busy so "wins" a race to grab them?
How will acquisition work? We attempt to assign ourselves a workflow by observing an unassigned workflow, putting our shard number onto it, writing it out and then if that works we've got control and can process it? That seems viable.
In a simplistic model we could emit a metric of number of currently controlled active workflows and have a maximum that we'll ever attempt to run, the HPA can then know when we're approaching that number and scale up.
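As an illustration only of that simplistic model, emitting such a gauge could look something like this (a sketch using the Prometheus Go client; the metric name and wiring are hypothetical):

```go
package sharding

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// activeWorkflows is a hypothetical per-shard gauge that an HPA (via an
// adapter such as prometheus-adapter) could scale on as it approaches a
// configured maximum.
var activeWorkflows = promauto.NewGaugeVec(prometheus.GaugeOpts{
	Name: "workflow_controller_active_workflows", // illustrative name
	Help: "Number of active Workflows currently controlled by this shard.",
}, []string{"shard"})

// recordActiveWorkflows would be called whenever the shard's set of assigned,
// running Workflows changes.
func recordActiveWorkflows(shardId string, count int) {
	activeWorkflows.WithLabelValues(shardId).Set(float64(count))
}
```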
I realize oscillating isn't specific to this scenario, but this does seem more likely than normal to cause this, hence wanting to find a way to make it most likely to work well.
I am still not sure how you're intending the actual mechanics of reassignment on scale down to work.
> I was assuming that `FilterFunc` would re-run over all of the data because we'll be rebuilding the informers at the sharding change boundary, causing them to `List` and re-evaluate everything.
Oh. No, I wasn't planning on that and I don't think that would happen without an explicit change. An active informer's shard number and label stay consistent, so it wouldn't rebuild unless we added additional logic having it do so (which I hadn't mentioned).
> I realize oscillating isn't specific to this scenario, but this does seem more likely than normal to cause this
Were you thinking this was the case because of the above rebuild? Without that, I don't see why this would be more likely.
Rebuilds at the shard change boundary would certainly increase load, and hence would be better to avoid, but I didn't even plan for the implementation to do that.
> How do you propose to make scaling up work with new workflows being greedily run on the new controllers - or is it just a case where we're hoping it's less busy so "wins" a race to grab them?
If we can run an assignment controller on each shard (which would be optimal), then yes, effectively that's how a greedy scenario would play out. There are potentially more advanced techniques we could use to help that go as planned, but I was going for the simplest approach first.
> How will acquisition work? We attempt to assign ourselves a workflow by observing an unassigned workflow, putting our shard number onto it, writing it out and then if that works we've got control and can process it? That seems viable.
Correct, it's a pretty standard lock-less pattern to do that as I mentioned in the proposal.
> In a simplistic model we could emit a metric of number of currently controlled active workflows and have a maximum that we'll ever attempt to run, the HPA can then know when we're approaching that number and scale up.
This is perhaps overly simplistic in that there are several variables that could determine the maximum (resources given per shard, workers given, etc). From a more manual perspective of a cluster-admin, if you observe that the active workflows metric for your configuration is causing slow down, then yes, configuring an HPA to scale as you approach that could be optimal. Although active workflows is potentially too simplistic as well, since Workflows can vary pretty greatly in size and complexity (and therefore the resources the controller has to allocate for each).
I'm less concerned about the specific implementation details of an HPA; it is primarily important that it can be used consistently per shard, which is a bit more complicated when there is a leader as that shard will have more load by definition.
I haven't followed up in this issue in a while, but I did discuss some updates with SIG Scalability last month. Detailing that and some other updates below.
reassignmentController
In particular, I discussed some solutions that should make the greedy, coordination-free approach possible!
`ListWatch` + `Reflector`
In https://github.com/argoproj/argo-workflows/issues/12025#issuecomment-1984885984 there was an approach taken that I noticed would be applicable here. #12736 uses a raw `ListWatch` with a `Reflector` and only stores partial Workflow metadata in an in-memory SQLite DB. That's for the Server, not the Controller, and the SQLite DB wouldn't be relevant for the Controller, but the general approach should be workable for the `reassignmentController` to only store assignment metadata for Workflows and nothing else.
Very roughly, an Informer implements something similar to this -- see also this SO answer on a similar topic.
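One way to approximate that metadata-only approach with standard client-go machinery, rather than hand-rolling a `ListWatch` + `Reflector`, is a metadata-only informer; the resync period below is an arbitrary placeholder:

```go
package sharding

import (
	"time"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/metadata"
	"k8s.io/client-go/metadata/metadatainformer"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

// newWorkflowMetadataInformer watches only the metadata (name, namespace,
// labels, etc.) of Workflows -- which is all the reassignmentController needs
// to maintain its workflowId -> shardId map -- without caching specs or statuses.
func newWorkflowMetadataInformer(config *rest.Config) (cache.SharedIndexInformer, error) {
	client, err := metadata.NewForConfig(config)
	if err != nil {
		return nil, err
	}
	gvr := schema.GroupVersionResource{Group: "argoproj.io", Version: "v1alpha1", Resource: "workflows"}
	factory := metadatainformer.NewSharedInformerFactory(client, 20*time.Minute)
	return factory.ForResource(gvr).Informer(), nil
}
```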
cache.TransformFunc
I also found a slightly different and perhaps easier approach that can shrink the Informer cache while working on https://github.com/argoproj/argo-workflows/pull/11855#pullrequestreview-1987125629. I ended up doing a deep dive on Informers for those fixes, including reading through a bunch of source code. I stumbled upon this SO answer (same as the one above) that mentioned using `cache.TransformFunc` to reduce memory usage of the Informer cache. The API docs even specifically note that this is the common use-case for the function.
Here's an example I eventually found that uses a `cache.TransformFunc`. I also implemented it for Semaphore ConfigMap update notifications in https://github.com/argoproj/argo-workflows/pull/11855#discussion_r1556461731.
So this same approach could be used for the `reassignmentController` to only store an in-memory map of `workflowId -> shardId`.
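To make that concrete, here's a rough, hypothetical sketch of such a `cache.TransformFunc` (the shard label key, package name, and wiring are assumptions, not the actual implementation):

```go
package sharding

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"

	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

const shardLabel = "workflows.argoproj.io/shard" // hypothetical label key

// stripToAssignmentMetadata keeps only the fields needed to maintain the
// in-memory workflowId -> shardId map, discarding the spec and status before
// the object is stored in the informer cache.
func stripToAssignmentMetadata(obj interface{}) (interface{}, error) {
	wf, ok := obj.(*wfv1.Workflow)
	if !ok {
		return obj, nil // pass through tombstones etc. unchanged
	}
	return &wfv1.Workflow{
		ObjectMeta: metav1.ObjectMeta{
			Name:      wf.Name,
			Namespace: wf.Namespace,
			UID:       wf.UID,
			Labels:    map[string]string{shardLabel: wf.Labels[shardLabel]},
		},
	}, nil
}

// installTransform wires the transform into an existing Workflow informer.
func installTransform(wfInformer cache.SharedIndexInformer) error {
	return wfInformer.SetTransform(stripToAssignmentMetadata)
}
```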
> We did discuss in SIG Scalability as well and I still need to write some more on that. In an incredibly brief nutshell: CD shards per cluster, but now has multiple algorithms available, which is an interesting and potentially useful touch depending on the characteristics of your usage. CD does not usually have high memory, but high CPU, so it is optimized quite differently and shards quite differently (i.e. not necessarily applicable to Workflows).
This was from the September SIG Scalability meeting and sharding algorithms were brought up in last month's meeting too.
It would be great if the initial implementation of this proposal allowed for different sharding algorithms in the future.
CD has a sharding package that implements a few different algorithms that are configurable, including round-robin and soon consistent hashing: https://github.com/argoproj/argo-cd/pull/16564.
Sharding algorithms would be used by the `reassignmentController`.
As CD shards by cluster it knows how many clusters there are based on the kubeconfig secrets for each cluster. Workflows would be a bit different of course (see below on statefulset sharding).
Carlos and I had also discussed a bit more offline about Knative's approach to sharding and he also reached out to Matt Moore (Knative's creator). In summary, here's the original design doc (alt copy link). They use statefulset sharding, with leader election, but split the shard space into buckets (which is more efficient) and avoid leases ("because it guarantees that each replica wins a shard (it gives better asymptotic failure guarantees)"), as you don't have the scenario where you "hot potato" a "key of death". That is potentially a different algorithm that could be used, but as it doesn't handle memory usage, its implementation details are not as useful for Workflows. A fully coordination-free approach could also be statefulset based, would avoid leases, and would not need buckets.
is there any progress?
We currently run a single controller per cluster or per namespace. Per cluster we only run one and use leader election to provide HA. Per namespace you have to manually configure each namespace (big operational overhead).
Instead, we could provide a way to horizontally scale workflows. To do this, we need to consider the data each controller keeps in memory using K8s informers:
Let's assume we use stateful sets, and each replica is identified by an int between 0 and the number of replicas.
Workflows can be assigned to a replica by labelling them. Then we can use a label filter in the informers so each replica only loads its own work.
We need to cater for scale-down, so we should assign it the work of `replica % 4` too.
We need to assign work to replicas. We need a special controller to do that. It can listen to workflows without an assigned replica and assign them.
Workflows and templates need to be sticky assigned. This could be `hash(namespace) % N`. Not a bad first idea, but it does not cater for "hot namespaces", e.g. where a large amount of work goes into one namespace. Work should be distributed round-robin. Scale-up results in work being spread about.
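For reference, the sticky namespace assignment mentioned above is roughly just (illustrative sketch, not an agreed-upon implementation):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardForNamespace sticks all Workflows in a namespace to the same replica.
// As noted above, this does not cater for "hot namespaces".
func shardForNamespace(namespace string, numReplicas int) int {
	h := fnv.New32a()
	h.Write([]byte(namespace))
	return int(h.Sum32()) % numReplicas
}

func main() {
	fmt.Println(shardForNamespace("argo", 4)) // e.g. all Workflows in "argo" map to one replica
}
```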