Closed — thomastaylor312 closed this issue 1 year ago
⚠️ Diatribe Warning ⚠️ Disclaimer: the following comes from years and years of watching distributed systems crash and burn in production.
Let's assert that the requirement that led to the initial decision on leader election was that we need high availability and resiliency while also ensuring that we don't do the same work multiple times. I think we can do this without having to resort to leader election.
Downsides of Leader Election

Leader election is one of those things that will inevitably fail even if your code is 100% flawless. It is designed to solve a very particular problem in a distributed system, but it comes with drawbacks. The first and most common is split-brain syndrome. Network partition events can quickly turn a happy quorum into 3 squabbling leaders within seconds, at which point you end up issuing multiple work requests. This can even lead to a death spiral where the leaders stop doing real work while they argue about who should be leader. NATS servers have been known to do this from time to time depending on the severity of the network partition.
Another issue with distributed reconciliation that shows up more often in leader-based systems is compensating for message loss. If you have a highly available system and the leader issues a compensatory action (e.g. publishes a reconciliation action like `start actor`), and that message goes unheard, the leader might be able to react by retrying; but if the cause of the message going unheard is a network partition event, you'll end up with two leaders compensating for the same failure.
Alternative Approach

So regardless of whether or not we have an elected leader, we need to be sure that work dispatch is processed exactly once. There are some tricks you can pull with things like high watermark values or checking the unique ID of the inciting event, but NATS has a ton of that functionality built into its streaming system, so let's look at how we could use it to not only give us exactly-once processing, but do so in a way that doesn't require leader election and is also highly tolerant of network partition events.
In this setup, first let's assume that we've created a durable stream that is listening on the `wasmbus.evt.{lattice}` topic. Now every single event that comes out of the lattice is pumped into this stream and saved (even if there are no wadm processes running). Next, we have a durable queue consumer called something like `wadm_events`. We then set up the wadm process to pull from the `wadm_events` consumer. This then acts like the most reliable distributed work dispatch pattern - "pull when ready". Each wadm instance will make a request for the next available event. The event, once acked (processing successful), will go away. As a result of processing the event, the reconciler may kick out one or more commands that need to be issued. Compensatory commands can be published with a TTL so that if all wadms die and then come up 10 minutes later, they won't follow stale orders; they'll re-observe the lattice and re-evaluate.
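As a sketch, the stream and shared pull consumer could be provisioned with the NATS CLI along these lines. The stream and consumer names come from the discussion above, but the exact flags are assumptions and vary by CLI version:

```shell
# Durable stream capturing every event for one lattice (assuming a lattice
# named "default"); work-queue retention removes each event once it is acked
nats stream add wadm_events \
  --subjects "wasmbus.evt.default" \
  --storage file \
  --retention work \
  --replicas 3

# Durable pull consumer shared by every wadm instance; explicit acks mean an
# event that is never acked gets redelivered to some other instance
nats consumer add wadm_events wadm_events_consumer \
  --pull \
  --deliver all \
  --ack explicit \
  --max-deliver -1
```

Because all wadm instances pull from the same durable consumer, only one of them receives any given event, which is what provides the "pull when ready" dispatch without any leader.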
If we simply published these commands on the lattice control interface, then we now have a couple more possibilities for failure, including publication failures, timeouts, processing failures, etc. Also, what happens if the wadm process that received the event dies before being able to properly process and emit a command?
An answer might be another durable stream, `wadm_commands`. This is a stream of compensatory actions that the reconciler has decided need to be taken. Just like the swarm of wadm agents pulls lattice events by request, it also pulls these commands by request (and because the consumers are queue consumers, it's guaranteed that only one agent gets each command at a time).
So the flow goes something like this:
```
wasmCloud Host ----.                     .---- wadm Process ----.                       .---- wadm
                   |                     |                      |                       |
wasmCloud Host ----+---< wadm events >---+---- wadm Process ----+---< wadm commands >---+---- wadm
                   |                     |                      |                       |
wasmCloud Host ----'                     '---- wadm Process ----'                       '---- wadm
```
This shows two durable streams, `wadm events` and `wadm commands`. Each wadm process then has two core handlers, one for each stream.
Each wadm process would then only ack the inbound event after the outbound command had been acked by the stream itself. This guarantees that failure in the pipeline will allow any other wadm process to simply pick up the event and try again, without duplicating the work.
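The ordering rule in the paragraph above - only ack the inbound event after the outbound command has been acked by the command stream - can be sketched with in-memory stand-ins for the two streams. All names and types here are hypothetical; a real implementation would use JetStream acks rather than this mock:

```rust
// Sketch of "ack the event only after the command publish was acked".
// `Pipeline` records side effects so the ordering can be verified.

#[derive(Debug, PartialEq)]
enum Op {
    CommandPublished(String),
    EventAcked(String),
}

struct Pipeline {
    log: Vec<Op>,
}

impl Pipeline {
    fn new() -> Self {
        Pipeline { log: Vec::new() }
    }

    // Stand-in for publishing to the wadm_commands stream; returns Err to
    // simulate the stream failing to ack the publish.
    fn publish_command(&mut self, cmd: &str, stream_up: bool) -> Result<(), ()> {
        if !stream_up {
            return Err(());
        }
        self.log.push(Op::CommandPublished(cmd.to_string()));
        Ok(())
    }

    fn ack_event(&mut self, event_id: &str) {
        self.log.push(Op::EventAcked(event_id.to_string()));
    }

    // Core rule: the inbound event is acked only once every outbound command
    // has been acked by the command stream. On failure the event stays
    // unacked, so any other wadm process can pick it up and retry.
    fn process(&mut self, event_id: &str, commands: &[&str], stream_up: bool) -> bool {
        for cmd in commands {
            if self.publish_command(cmd, stream_up).is_err() {
                return false; // not acked -> redelivery to another instance
            }
        }
        self.ack_event(event_id);
        true
    }
}

fn main() {
    let mut p = Pipeline::new();

    // Happy path: the command lands on the stream before the event is acked.
    assert!(p.process("evt-1", &["start actor"], true));
    assert_eq!(
        p.log,
        vec![
            Op::CommandPublished("start actor".to_string()),
            Op::EventAcked("evt-1".to_string())
        ]
    );

    // Command stream down: the event is never acked, no side effects recorded.
    assert!(!p.process("evt-2", &["stop actor"], false));
    assert_eq!(p.log.len(), 2);
}
```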
Each wadm process is also free to subscribe to change events on the observed state so that every process has an up to date in-memory view of the state (or it can read it during event processing time for the most consistency). The key characteristic of this architecture is that the wadm process does not contain conditional logic based on whether it thinks it's a leader. All wadms behave the same no matter how many of them are running, and NATS streams and durable queue consumers provide the exactly once processing guarantee.
If wadm does need to make decisions as a leader, another trick may be to let JetStream do the leader election for you - it already does elections for NATS servers.
Pair each wadm with a nats server in a nats server cluster and let the nats servers do their leader election in the JS domain. The rust nats client can tell you if your server is the leader - leader election done!
For publishing leader decisions, like updates to global state, onto a durable stream, a node must check that it is the leader via the NATS client before publishing to the stream. If it is in a split-brain network and not the real leader, the NATS client should indicate that there is no leader. Any subscriber to the stream always gets the correct current global state.
@autodidaddict I really like the streams idea. One thing to follow up on is around what @stevelr was getting at. The only state we are storing are the submitted OAM manifests, but would the solution be to have a durable stream for the put and delete commands so that only one process updates state?
You wouldn't need another stream for the put and delete commands since that's basically what NATS implements for KV under the hood. I alluded to it above but you can have the wadm processes listen to changes to the specs so that if they're caching anything internally, they can refresh it.
Because each wadm process will pull a single event off the consumer rather than receive it as a broadcast, you'll want some way to ensure that all of those processes are making decisions based on the same information. This could be done with a KV store and then having each wadm process subscribe to the changes on that bucket.
So would things change to basically put a wadm manifest directly into the KV bucket?
This actually made me realize something that applies to both the manifests and the events. There are two things that have to happen:
As far as I know a stream can't do that, correct? Would the solution be to create something like a push consumer that is used purely for updating internal caches and then actual processing of an event would be done by the streams as described above?
I just saw your updated comment. Looks like you answered the question by saying that each process that receives an event should update that state into a KV bucket? So for lattice state, would we write that to the KV too?
yeah, so there's some wiggle room here but I'm thinking we end up with:

- `wadm-events` - stream populated by publications to `wasmbus.evt.>`; the processor pulls from this, generates new state, writes new state, publishes a compensating action, and acks the message
- `wadm-commands` - stream populated by publishing compensation action requests; the processor pulls from this, converts into lattice controller (et al) commands, and acks to remove
- `wadm-observed` - a KV bucket with the key being a lattice ID and the value being the state as written by the last event processor

This last KV bucket simplifies the architecture so that we don't have to manually synchronize the observed state across all running wadm instances - instead they pull the state relevant for the inbound event (which can be for many different lattices).
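The commands stream and observed-state bucket could be provisioned like this with the NATS CLI. The names come from the list above, but the command subject and flags are assumptions and vary by CLI version:

```shell
# Compensating actions waiting to be executed; work-queue retention removes
# each command once a wadm instance acks it (subject name is assumed)
nats stream add wadm-commands \
  --subjects "wadm.cmd.>" \
  --storage file \
  --retention work

# Last observed state per lattice, keyed by lattice ID
nats kv add wadm-observed --replicas 3

# Example: write and read the observed state for lattice "default"
nats kv put wadm-observed default '{"hosts": 3, "actors": 12}'
nats kv get wadm-observed default
```

Any wadm instance handling an event for lattice `default` can then read the latest state from the bucket instead of relying on a synchronized in-memory copy.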
This almost sounds like a blackboard design pattern! https://en.wikipedia.org/wiki/Blackboard_(design_pattern), would that be a good tangent to draw here?
@stephenandary I hadn't seen that one before. At a high level wadm is basically a control loop. As state changes, it performs a reconciliation that determines a gap between the current state and the desired state. Then it issues commands that are designed to close that gap to make desired == reality. This loop continues every time the current state diverges from desired.
Kubernetes controllers are fairly well known examples of control loops, which some people also call autonomous agents.
Where NATS streams come in is to provide a durable way to guarantee responding to incoming events and guarantee that outbound commands are stored and acted upon by these autonomous agents.
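The gap computation in the loop described above can be sketched as a pure function. The types and command names here are hypothetical simplifications; real wadm compares much richer lattice state than a replica count:

```rust
// Minimal sketch of the reconciliation step: compare desired vs observed
// actor counts and emit commands that close the gap.

#[derive(Debug, PartialEq)]
enum Command {
    StartActor { actor: String, count: u32 },
    StopActor { actor: String, count: u32 },
}

fn reconcile(actor: &str, desired: u32, observed: u32) -> Vec<Command> {
    if desired > observed {
        vec![Command::StartActor { actor: actor.to_string(), count: desired - observed }]
    } else if observed > desired {
        vec![Command::StopActor { actor: actor.to_string(), count: observed - desired }]
    } else {
        Vec::new() // desired == reality, nothing to do
    }
}

fn main() {
    // A host stopped and took 2 replicas with it: start 2 more.
    assert_eq!(
        reconcile("echo", 5, 3),
        vec![Command::StartActor { actor: "echo".to_string(), count: 2 }]
    );
    // Already converged: the loop emits nothing.
    assert!(reconcile("echo", 5, 5).is_empty());
}
```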
@autodidaddict
I am fairly familiar with k8s and infra though it has been some time since I have worked in a production environment now. I am very excited about WADM because what it can accomplish feels like the next stage of distributed systems due to NATS as you said, and when I search for other solutions, they don't seem great.
What you are describing almost perfectly aligns with how a blackboard design pattern works, I learned about that pattern in a book called The Pragmatic Programmer, and it has fascinated me for some time.
I will break it down quickly because I think it is an excellent design pattern for this situation, and I think what was discussed above fits the model:
The blackboard model describes three main components of a non-deterministic solution:
A Blackboard allows multiple Knowledge Sources (or agents) to communicate by reading and writing requests and information to a global data store (“the blackboard” - maybe NATS in this case).
Each Knowledge Source is a participating agent that has expertise in their own field and has knowledge that is applicable to a part of the problem, i.e., the problem cannot be solved by an individual agent only. Knowledge Sources communicate strictly through a common blackboard whose contents are visible to all agents. When a problem is to be solved, candidate agents that can handle it are listed.
A Control unit is responsible for selecting among the candidate knowledge sources to assign the task to one of them (I am not sure if this would be the same as a leader, but I don’t think it is).
When we start to think about deploying applications across multiple lattices/clusters, it becomes very interesting to consider the possibility of having agents with knowledge of data like network bandwidth, memory bandwidth, I/O bandwidth, call latency, etc. that can communicate this information to some blackboard to determine when to scale, or even rightsize across the supercluster as a whole, such that no individual lattices/clusters see excessively high metrics relative to their counterparts.
To accomplish this, Knowledge Sources contain thresholds by which they become activated and can update the blackboard and await commands to do some execution. There are then higher-level thresholds in the Blackboard, which when met can trigger an event whereby it can list activated knowledge sources capable of solving the problem, and the Control can select the best candidate from the available list within the Blackboard to do so.
:brain: This is fascinating. Right now wadm really only has one knowledge source - the state of an observed lattice. But as you point out, it could include things like network measurements, CPU activity from hosts, latency, and so forth.
What information does that state contain? Health metrics?
It contains a state representation of the current lattice. So, all of the claims, link definitions, actors, providers, hosts, and invocation counts. Wadm currently uses the model in lattice-observer for its observation state.
FWIW, I personally think that wadm could be the way we expose current stats/metrics in the future (a la control plane), but haven't defined anything specific yet
The trick there is to do that in a way that doesn't prevent getting metrics/stats/control plane data from lattices that aren't using wadm.
I feel this loops back to the Blackboard pattern again, specifically knowledge sources. In the current WADM state, there is already a knowledge source via lattice observer.
In K8s that may just be some sidecar that sends telemetry back to WADM. I feel that the issue isn't getting telemetry, but how the telemetry is delivered or how the knowledge source can perform some action when selected to do so.
NATS clients can communicate via several different protocols, right? Could the lattice observer and other knowledge sources be implemented in a way that they could be deployed independently of wadm, and, if using wadm, you could somehow enable the types of observers or knowledge sources you want with minimal config via the chart?
I feel wadm is a simpler, cleaner k8s; it would be nice if that stayed true. Maybe this also leads to a community library of various knowledge sources able to be activated, inform the blackboard, and perform some operation across various platforms, not just wadm (this seems to be the goal already).
@autodidaddict I thought about maybe making metrics be exported on a specific topic!
Ok, back to topic now. I had an offline convo to discuss some more with @autodidaddict about his idea for using streaming. We came up with what I feel like is a good plan to move forward and mitigate any risk of too much complexity.
At the beginning of stage 1, we propose that we spike out a simple implementation of the stream plan as suggested in https://github.com/wasmCloud/wadm/issues/40#issuecomment-1440686958. It doesn't have to actually do any work, but we want to test receiving events from a stream and emitting commands (and updating state in the kv store). Then make sure the command is received and compensatory actions are done.
If for some reason that ends up being too complex or has other problems, we will fall back to the original leader election plan with two changes:
This would allow us to more easily evolve the architecture to something more complex in the future. It is also important to note that leader election should be ok in the meantime since our only chance of a network partition is with NATS and if NATS is messed up, so is the rest of the lattice.
Is this the slice? Or does the observer not perform an action, it just watches? I wonder about correlated events, especially from multiple hosts and observers.
```mermaid
flowchart TD
    A[lattice_observer - Knowledge Source] -->|wadm_event| B(NATS stream - Blackboard)
    B -->|wadm_event| C(wadm_process - Control)
    C -->|wadm_action| B
    B -->|wadm_action| A
```
Correlation is actually pretty "easy", given that everything emitted by the wasmCloud host is a cloud event with a unique ID. That ID can be used both to help ensure idempotency and to correlate corrective actions taken by the controller in response to a given event.
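As a sketch, de-duplicating on the cloud event ID might look like the following. The names are hypothetical, and in practice a bounded cache or JetStream's message deduplication window would be used rather than an unbounded set:

```rust
use std::collections::HashSet;

// Sketch of idempotency via the cloud event's unique ID: an event seen twice
// (e.g. after redelivery) is processed only once.
struct Deduper {
    seen: HashSet<String>,
}

impl Deduper {
    fn new() -> Self {
        Deduper { seen: HashSet::new() }
    }

    // Returns true only the first time a given event ID is observed.
    fn should_process(&mut self, event_id: &str) -> bool {
        self.seen.insert(event_id.to_string())
    }
}

fn main() {
    let mut d = Deduper::new();
    assert!(d.should_process("evt-abc")); // first delivery: process
    assert!(!d.should_process("evt-abc")); // redelivery: skip
}
```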
Yes, but there may be multiple IDs related to multiple events that deal with the same action requirement. That is why in a Blackboard model the control makes a decision. I think FIFO events would not be a good choice architecturally here (hopefully that is not what is happening, but want to confirm).
I think we're on the same page here, the autonomous control loop within wadm is the thing that makes decisions. I think the only semantic difference is we just call the NATS stream the NATS stream, whereas your pattern has a name for it (Blackboard).
Ok awesome, and that is because the NATS stream is not persistent global memory?
The NATS stream is durable and replicated and managed across a NATS cluster by the NATS server, according to however we configure the stream(s) - e.g. eviction policy (TTL), number of replicas, etc. It's not a global in-memory cache.
I understand that, but I don't think a Blackboard needs to be in memory on a specific host, I think it is just structured global memory which is what a stream is in a way (just in motion). In my head, I am aligning with Kinesis or Kafka. Anyways, don't think that is important for this conversation. This sounds exciting. Maybe give Nomad a run for their money lol. Thanks for helping me understand.
NATS streams can be used in place of Kafka in architecture diagrams. Nobody's suggesting the blackboard be in-memory.
re: nomad, wadm's responsibilities including scheduling providers and actors. It's not currently in scope to manage the starting and stopping of wasmCloud hosts.
That is good to know. I was planning to use terraform for setting up wasmcloud/wadm hosts if it is no longer necessary to have nomad or k8s manage containers or pods for applications to run in. It seems simpler that way; maybe that won't work, but I don't see why using wasm shouldn't enable one to remove the need for pods and containers on a host completely. Then it would be a matter of adding knowledge sources for hosts through the wadm chart if that becomes possible. Maybe I am imagining things incorrectly, but it seems a fit for WASI.
@stephenandary I think this convo may have gotten into the weeds. What is important here is the high level view:
That first, simple point is the whole goal of the wadm project and this RFC is purely for why we are rewriting in Rust, and some basic design principles. I'm not trying to follow some specific "blackboard" pattern or anything else per se, just making sure you can run this in production and have work done once. Since that initial work is now front loaded, if there are any concerns around how that is designed/functioning, that conversation can be had in concrete terms around actual code.
Does that help clarify things?
Now that we have https://github.com/orgs/wasmCloud/projects/8/views/1, I am going to close this RFC
Overview
We started Wadm a long time ago, but have mostly let it sit since then as we've worked on polishing the host. However, the time has come to get this all working and productionized. This document is a proposal for rewriting and releasing wadm as a fully supported and featured part of the wasmCloud ecosystem, complete with waterslide!
Goals and Non-goals
This section covers the goals and non-goals of this scope of work. Please note that some of the items in non-goals are possible future work for wadm, but are not in scope for this work
Goals
wash up
Non-goals
Key technical decisions
Language choice
For this work, I propose we rewrite Wadm in Rust. This decision was made for the following reasons, in order of importance. As part of making this decision, two other languages (Elixir and Go) were considered. Reasons for rejecting are described in the last 2 sections
Need for contributors
Schedulers and application lifecycle management are topics that many people in the cloud native space have deep knowledge of. If we are going to be writing something that does those things for wasmCloud, then we need as many eyes on it as possible. Based on current metrics of wasmCloud repos, we have very few contributors to our Elixir code and a lot more to our Rust repos. Other projects in, or consumed by, the wasm ecosystem are in Rust and also have higher numbers of contributors. Go would have also been an excellent choice here, but the other reasons listed here precluded it. We also have multiple contributors in the wasmCloud community right now who already know Rust.
The tl;dr is that we need contributors to be successful and the current language does not attract enough people.
Static Typing and Generics
One problem we've run into consistently in our elixir projects is issues with dynamic typing. Although this can be mitigated somewhat by tools like dialyzer, it requires programmer and maintainer discipline and still doesn't catch everything. Having a static type for each type of event that will drive a system like Wadm is critical for ensuring correct behavior and avoiding panics.
In addition to the need for static typing is the preference for having generics. In my experience with writing large applications for Kubernetes in both Rust and Go, a generic type system makes interacting with the API much easier. There is less generated code and less need to roll your own type system, as happens in many large Go projects. Go has added generics, but its system is nowhere near as strong as other statically typed languages such as Rust.
Support for wasm and wasmCloud libraries
To support custom scalers, we will likely be supporting at least an actor based extension point and possibly a bare wasm module. Either way, most of the wasm libraries/runtimes out in the wild are written in Rust or have first-class bindings for Rust. Also, many of our wasmCloud libraries are written in Rust, which will allow for reuse.
Static binaries and small runtime size
This is the lowest priority reason why I am suggesting Rust, but it is still an important one. The current implementation requires bundling an Erlang VM along with the compiled Elixir. That means someone who runs Wadm as it currently stands will likely need to tune a VM. It is also larger, which leads to more space requirements on disk and longer download times.
Rust (and even Go more so) has great support for static binaries, and both run lighter than a VM without much additional tuning (if any).
Disadvantages of Rust
As with any tool choice, there are tradeoffs that occur. Below is a list of disadvantages I think will be most likely to cause friction
Why not Elixir?
One of the biggest questions here is why not continue with Elixir. By far the biggest thing we are giving up is the code around the lattice observer. However, writing this in Rust gives us the advantage of creating something that we could eventually make bindings for in any language (this also helps enable the reusability described below), although that isn't a goal here.
With that said, the previous sections cover in depth the advantage of using Rust over Elixir in this case
Why not Go?
In my comparisons, I was looking for languages that would fit the requirements above. Due to the overlap of languages used for wasm as well as languages familiar to those in the Cloud Native space, that whittled things down to Go and Rust. Go in many ways excels at many of these requirements. It is much more popular than Rust and Elixir (probably combined) and has great support for statically compiling binaries. Also, things like NATS are native to Go.
It came down to a few main concerns of why Rust would be better:
To be clear, there are other smaller reasons, but those could be considered nitpicky.
State machine vs event-driven "filtering"
One of the items I thought most about when drafting this was whether or not we should implement wadm as a true state machine. Given the simplicity of what it is trying to do, I propose we focus more on implementing an event-driven filtering approach. Essentially, a state machine approach is going to be overkill for this stage of the project and the near future.
Loosely, I am calling these "Scalers" (name subject to change). Every scaler can take a list of events (that may or may not be filtered) and returns a list of actions to take.
This does not preclude iterating into a state machine style in the future (if you are curious, you can see Krator for an example of how this could be done in code), nor does it mean a scaler implementation can't use a state machine. This only means that for this first approach, we'll filter events into actions.
I have purposefully not gone into high levels of detail of what this looks like in code, as it will probably be best just to try it and see what it looks like as we begin to implement it. What we currently have in wadm is probably a good way of going about this (i.e. Scalers output commands)
Scalers are commutative
One important point is that these "Scalers" should be commutative (i.e. if a+b=c then b+a=c; the order of operations doesn't matter). That means when a manifest is sent to wadm, it can run through the list of supported Scalers in any order and it will return the same output.
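A sketch of what that property means in code. The trait shape, scaler structs, and command strings are all hypothetical stand-ins, not the actual wadm API:

```rust
// Sketch of the commutativity requirement: running a manifest through scalers
// in any order yields the same set of commands.

trait Scaler {
    fn scale(&self, manifest: &str) -> Vec<String>;
}

struct SpreadScaler;
impl Scaler for SpreadScaler {
    fn scale(&self, manifest: &str) -> Vec<String> {
        vec![format!("spread {manifest}")]
    }
}

struct ReplicaScaler;
impl Scaler for ReplicaScaler {
    fn scale(&self, manifest: &str) -> Vec<String> {
        vec![format!("replicas {manifest}")]
    }
}

// Commands are collected and sorted so the ordering of scalers cannot change
// the combined result.
fn run_scalers(scalers: &[&dyn Scaler], manifest: &str) -> Vec<String> {
    let mut cmds: Vec<String> = scalers.iter().flat_map(|s| s.scale(manifest)).collect();
    cmds.sort();
    cmds
}

fn main() {
    let (a, b) = (SpreadScaler, ReplicaScaler);
    let ab = run_scalers(&[&a as &dyn Scaler, &b], "echo");
    let ba = run_scalers(&[&b as &dyn Scaler, &a], "echo");
    assert_eq!(ab, ba); // a+b == b+a
}
```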
API is NATS-based
For this first production version, we will only be supporting a NATS API. This is because pretty much all wasmCloud tooling already uses NATS, mostly transparently to an everyday user. We can take advantage of that same tooling to keep things simple this time around. If we were to add an HTTP API right now, we'd have to figure out authn/z and figure out how we want to handle issuing tokens. So to keep it simple, we'll focus only on NATS to start.
One very important note here is that we definitely do want an HTTP API in the future. We know that many people will want to integrate with or extend WADM and an HTTP API is the easiest way to do that. But not for this first go around (well, second, but you get my point)
Data is stored in NATS KV
This is fairly self-explanatory, but we want to store everything in one place now that NATS has KV support so we don't need any additional databases. Only the manifest data is stored in NATS. Lattice state is built up by querying all hosts on startup and then responding to events.
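The "query on startup, then respond to events" approach amounts to a fold over the event stream. A minimal sketch, with hypothetical, heavily simplified event and state shapes:

```rust
// Sketch of rebuilding lattice state: start from a host inventory query, then
// fold subsequent lattice events on top of it.

#[derive(Debug, Default, PartialEq)]
struct LatticeState {
    hosts: u32,
    actors: u32,
}

enum Event {
    HostStarted,
    HostStopped,
    ActorStarted,
    ActorStopped,
}

fn apply(mut state: LatticeState, event: &Event) -> LatticeState {
    match event {
        Event::HostStarted => state.hosts += 1,
        Event::HostStopped => state.hosts = state.hosts.saturating_sub(1),
        Event::ActorStarted => state.actors += 1,
        Event::ActorStopped => state.actors = state.actors.saturating_sub(1),
    }
    state
}

fn main() {
    // Initial state as discovered by querying all hosts on startup...
    let initial = LatticeState { hosts: 2, actors: 4 };
    // ...then kept current by folding subsequent events.
    let events = [Event::HostStarted, Event::ActorStarted, Event::ActorStopped];
    let state = events.iter().fold(initial, apply);
    assert_eq!(state, LatticeState { hosts: 3, actors: 4 });
}
```

Because the fold is deterministic, any wadm process replaying the same events from the same starting inventory arrives at the same state, which is what makes the stateless-swarm design workable.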
High availability
A key requirement is that wadm can be run in high availability scenarios, which at its most basic means that multiple copies can be running.
I propose that this be done with leader election. Only one wadm process will ever be performing actions. All processes can gather the state of the lattice for purposes of fast failover, but only one performs actions. This is the simplest way to gain basic HA support
Custom scheduler support
This is purely here as a design note and is not required for completing the work, but based on experiences with tools like Kubernetes and Nomad, extending with a custom scheduler is a common ask for large deployments. In code, adding a scaler will be as simple as implementing a `Scaler` trait. For most people however, I propose that custom "Scalers" be added via a wadm manifest. The application provider must have an actor that will implement a new `wasmcloud:wadm-scaler` interface, but can be as arbitrarily complex as desired. This manifest will have 2 special requirements:

- `wasmcloud.dev/scaler: <scaler-name>`
Once again, this is not going to be implemented here, and will likely be another, smaller, proposal than this one
Reusability and a canonical scheduler
One key point to stress here is that wadm is meant to be the canonical scheduler for wasmCloud. This means that it is the general purpose scheduler that most people use when running wasmCloud, but no one is forced to use it. You can choose not to use it at all, or to write your own entirely custom scheduler.
To that end, I propose we publish the key functionality as a Rust crate. Much of the functionality could be used in many other applications besides a scheduler, but it can also be used to build your own if so desired. Basically we want to avoid some of the problems that occurred in Kubernetes, where everything must go through the built-in scheduler.
Basic roadmap of work
Whew, we made it to the actual work! As part of thinking through these ideas, I started a branch that has implemented some of the basic building blocks like streaming events and leader election. When we actually begin work, it will be against a new `wadm_0.4` branch in the main repo until we have completed work. Please note that this is a general roadmap; I didn't want to try and give minute details here. Below is the basic overview of needed work.

Stage 1
All of this work can be worked on in parallel. This is a bit shorter because we are about 40-50% there with the branch I started work on
Stage 2
This is a bit more difficult as these things must be worked on roughly in order. This work is more spike-like as it is spiking out the design of the `Scaler` trait and implementing the spreadscaler type (at least the number of replicas functionality). Scalers will need to handle manifests and state changes given by events (such as if a host stops). We want to start with implementing so we can see what kind of info is needed.

Stage 3
Stage 4
This is the "tying a bow on it" stage of work
`wash up` by default with an optional flag to not use it