nsqio / nsq

A realtime distributed messaging platform
https://nsq.io

nsqd: resilience #510

Open stephensearles opened 9 years ago

stephensearles commented 9 years ago

Here's a proposal we're looking at to provide resilience on top of NSQ. This idea requires a small change to the nsqd library in that it would need to be notified when a message has been FIN'd.

EDIT: latest proposal link
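For illustration, the nsqd change being asked for could be as small as a delegate interface along these lines (names and types are hypothetical; nothing like this exists in nsqd today):

```go
package resilience

// MessageID mirrors nsqd's 16-byte message IDs.
type MessageID [16]byte

// FinDelegate is a hypothetical hook: nsqd would call it after a client
// FINs a message, letting a resilience layer clear its replicated copies.
type FinDelegate interface {
	OnMessageFinished(topic, channel string, id MessageID)
}

// NopFinDelegate preserves today's behavior when no resilience layer is
// configured.
type NopFinDelegate struct{}

func (NopFinDelegate) OnMessageFinished(topic, channel string, id MessageID) {}
```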

mreiferson commented 9 years ago

@stephensearles thanks again for putting this proposal together, apologies for the delay in responding.

I've read through it and there are a lot of solid ideas. I think my biggest concern is that I don't think a delegate for nsqd is going to be sufficient for implementing this proposal.

I'm not sure how far you've gotten in your experiments, but I imagine that when you get to the point where state is being propagated via consensus (raft, whatever), you'll run into an issue due to the way nsqd currently handles its hybrid memory/disk message queues.

Basically, for messages that end up propagated to a remote ansqd, and are written to disk after the in-memory queue overflows, there isn't currently any way of clearing out those messages without them being read sequentially from disk, as if they were being consumed. They can't be simply "finished" like the ones that are in memory.

Fortunately, I've actually thought through this specific problem quite a bit and was already in the middle of writing up my thoughts on potential paths forward to achieving replication. It involves more significant changes, and I was hoping to complete the document before showing it to folks, but this feels like an appropriate time to have a discussion about it.

Take a look, https://github.com/mreiferson/mreiferson.github.com/blob/nsq_roadmap_1/posts/nsq_roadmap.md and let's continue to discuss.

stephensearles commented 9 years ago

Nice, I gave that a quick read through. Very cool.

So among all the things to talk about here, one piece stands out as something I've been thinking about quite a bit: raft. Having read the paper, spent a lot of time with visualizations, and read the code, I think we might be able to get away with not having true leaders for nsqd.

So the reason raft has a leader is so that two requests mutating the same state don't end up in conflict. It allows changes to be atomic across the whole cluster. With nsqd, we don't have state that could end up conflicted. Messages being published can't overwrite each other.

The closest we could come to a conflict is the requeue counter. For that, though, I think we can simply let the nsqd node that owns the message always increment the counter. The other nsqd nodes should not attempt to send the message ... until the owner node goes down. Once that node is detected as down, an election should occur to choose the new owner of that node's messages.

I've thought about that quite a bit, and perhaps I'm missing something, but I think it allows us to forego a requirement limiting publishes to specific nodes.

A related thought: raft has increasing costs as the number of participating nodes increases, especially across the WAN. I have seen that CockroachDB implements an idea (and package) for multi-raft, which allows a node to participate in multiple (potentially overlapping) consensus groups. The idea is still stewing, but we might be able to compose it with the gossip idea to implement a self-optimizing resilience topology. As a whole, the cluster would analyze the network graph by latency and group nodes into raft clusters such that messages are replicated only as much as required to achieve the desired guarantee.

Anyway, thanks for the feedback. I'm still processing the whole of your comments and that doc, but I'm excited to move forward.

stephensearles commented 9 years ago

> Basically, for messages that end up propagated to a remote ansqd, and are written to disk after the in-memory queue overflows, there isn't currently any way of clearing out those messages without them being read sequentially from disk, as if they were being consumed. They can't be simply "finished" like the ones that are in memory.

The "backup" messages propagated would need to be stored apart from regular messages being published through an ansqd node. For the FIN to be handled properly, they'd need to be stored in a map of some kind (in memory or on disk) so they can be looked up and cleared by message ID.
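For illustration, a minimal in-memory version of that lookup structure might look like the sketch below (assumed types, not the actual ansqd code):

```go
package resilience

import "sync"

type MessageID [16]byte

// pendingStore holds "backup" messages replicated from a peer, keyed by
// message ID so a FIN can clear a specific message directly.
type pendingStore struct {
	mu      sync.Mutex
	pending map[MessageID][]byte // ID -> raw message body
}

func newPendingStore() *pendingStore {
	return &pendingStore{pending: make(map[MessageID][]byte)}
}

// Add records a message replicated from a peer.
func (s *pendingStore) Add(id MessageID, body []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.pending[id] = body
}

// Finish drops a message once the owning node reports it as FIN'd.
func (s *pendingStore) Finish(id MessageID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.pending, id)
}
```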

mreiferson commented 9 years ago

> The "backup" messages propagated would need to be stored apart from regular messages being published through an ansqd node. For the FIN to be handled properly, they'd need to be stored in a map of some kind (in memory or on disk) so they can be looked up and cleared by message ID.

Right, exactly, I was simply pointing out that there is more design work to do.

Put more simply, I'm not fully convinced that we can "bolt on" replication given the current design and implementation of nsqd. I don't think that's a problem though - the only real requirement is that whatever path we choose going forward doesn't eliminate the use cases nsqd currently excels at (a distributed topology of independent nodes that do not replicate or interact with each other).

I think this should be easily achievable via configuration.

mreiferson commented 9 years ago

> So the reason raft has a leader is so that two requests mutating the same state don't end up in conflict. It allows changes to be atomic across the whole cluster. With nsqd, we don't have state that could end up conflicted. Messages being published can't overwrite each other.

Raft is just one way of guaranteeing the consistency of replicated data. In this case it seems like that would be an incredibly useful property, not so much around telemetry (although that's a nice side-effect), but rather around providing a guarantee that if you've published a message and received an OK - the data has been replicated to the appropriate number of nodes, in a consistent manner, to tolerate N failures. Furthermore, that the myriad edge cases when nodes return to the cluster, experience network partitions, etc. have all been validated with a more formal proof of the algorithm.

> I've thought about that quite a bit, and perhaps I'm missing something, but I think it allows us to forego a requirement limiting publishes to specific nodes.

This is a crucial detail - it affects backwards compatibility, and it's something that shouldn't overburden producers.

stephensearles commented 9 years ago

Another thought I've had recently: I think we might consider baking into the design a cancelable consistency requirement. In particular, there may be cases where it's much quicker to simply deliver the message than to achieve the desired redundancy for undelivered messages. That isn't to say we should wait to replicate, but that we shouldn't wait to deliver, and upon successful delivery, that we OK a message without waiting for replication. This may be something we do flexibly as an optimization.
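A minimal sketch of that "OK on whichever finishes first" behavior, with hypothetical channel and function names:

```go
package resilience

import (
	"errors"
	"time"
)

// awaitOK returns once a message is either delivered (FIN'd by a consumer)
// or replicated to the desired number of peers, whichever happens first,
// so the producer can be OK'd without always waiting for replication.
func awaitOK(delivered, replicated <-chan struct{}, timeout time.Duration) error {
	select {
	case <-delivered:
		return nil // delivered: redundancy for this message no longer matters
	case <-replicated:
		return nil // replica target reached before delivery
	case <-time.After(timeout):
		return errors.New("message neither delivered nor replicated in time")
	}
}
```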

stephensearles commented 9 years ago

So I've gone ahead and posted the work-in-progress implementation of the proposal I sent. The major work done so far is the delegate, storing messages grouped by expiration and by ID, and the start of host recovery. In particular, this work is missing a fallback to on-disk storage, and the raft implementation is incomplete.

Two observations I have about this work so far:

stephensearles commented 9 years ago

> Furthermore, that the myriad edge cases when nodes return to the cluster, experience network partitions, etc. have all been validated with a more formal proof of the algorithm.

This is the big advantage of raft, but we don't need a replicated log to be as consistent as raft guarantees. I think we can trade off those things we don't need for the throughput and simplicity we do want. That said, my work so far has been to use an out-of-the-box raft implementation to get an initial proof of concept for the larger architecture before optimizing how and what I achieve with regard to consistency.

Related to that thought: I'd like to discuss narrowing down some short or short-ish term goals in light of your document. (This is motivated by a work project, after all.) Resilience is my primary goal, but I think the gossip piece is closely related enough that we should strongly consider tackling them together. Compaction and WAL seem like optimizations that improve, but are technically redundant to, strong resilience, and unless performance really suffers, I think we can consider those as separate goals. What do you think?

So what this could look like broken down into slightly smaller goals:

Some of this I've done in the code I posted, some of this is vague, but I'm putting this here as a starting place.

In terms of solving problems:

What do you think? What am I missing here?

mreiferson commented 9 years ago

> Another thought I've had recently: I think we might consider baking into the design a cancelable consistency requirement. In particular, there may be cases where it's much quicker to simply deliver the message than to achieve the desired redundancy for undelivered messages. That isn't to say we should wait to replicate, but that we shouldn't wait to deliver, and upon successful delivery, that we OK a message without waiting for replication. This may be something we do flexibly as an optimization.

That's a really neat idea - I dig it :+1: :100: :sparkles:

mreiferson commented 9 years ago

I've got to review your WIP implementation in detail, I haven't had the time to do that yet... but:

> We need to strike a balance between allowing configuration of nsqd and writing new binaries to bolster features. Either, taken to the extreme, sounds bad. That said, using nsqd as a communication layer for higher-level resilience has seemed reasonable so far.

I've got obvious concerns about taking the approach of introducing new *nsqd* binaries into the ecosystem - just for feature X - and how that might confuse end users and add undesirable complexity.

While I'm super happy that the nsq package is flexible enough to support things like this (granted you're in the midst of making changes :smile:), I'm still not sure how it should all end up.

I'm stewing on it...

mreiferson commented 9 years ago

> Related to that thought: I'd like to discuss narrowing down some short or short-ish term goals in light of your document. (This is motivated by a work project, after all.) Resilience is my primary goal, but I think the gossip piece is closely related enough that we should strongly consider tackling them together. Compaction and WAL seem like optimizations that improve, but are technically redundant to, strong resilience, and unless performance really suffers, I think we can consider those as separate goals. What do you think?

The WAL (and its compaction) provides a way for nodes to recover from failure, so no, it isn't strictly related to replication, but it is an orthogonal and complementary feature.
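For reference, the recovery role of a WAL boils down to appending length-prefixed, checksummed records and syncing them; a minimal sketch (not the format from the roadmap doc):

```go
package wal

import (
	"encoding/binary"
	"hash/crc32"
	"os"
)

// Append writes one length-prefixed, CRC-checked record and syncs it to
// disk, so the message can be replayed after a crash or restart.
func Append(f *os.File, record []byte) error {
	var hdr [8]byte
	binary.BigEndian.PutUint32(hdr[0:4], uint32(len(record)))
	binary.BigEndian.PutUint32(hdr[4:8], crc32.ChecksumIEEE(record))
	if _, err := f.Write(hdr[:]); err != nil {
		return err
	}
	if _, err := f.Write(record); err != nil {
		return err
	}
	return f.Sync() // durability point: the record is recoverable after this
}
```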

Your list seems like a reasonable series of changes, the devil is in the details of course.

> Bootstrapping gossip: I think my favorite path for that would be to allow requiring a minimum consistency level. If bootstrapping hasn't occurred with enough nodes, no message should be OKd. This seems similar to the Consul solution.

Bootstrapping gossip is a little different than bootstrapping this proposed replication strategy - think about the current use case, for example... We'd like it to be as simple as possible to run a bunch of nsqd that just gossip with each other, as an alternative option to running nsqlookupd. That's without replication. Even in that context we need to consider what those bootstrapping steps are.

It's fine for now for it to be "each node needs to specify a root node for gossip" similar to how the current state is "each node needs to specify a hard list of nsqlookupd"...

> Pending message storage: what I've been working with is, so far, in-memory only, but it stores messages both by ID in a map and by expiration in a map.

My biggest concern is that it entirely defeats the purpose of the hybrid in-memory/on-disk storage mechanism. Relatedly, this is why considering the WAL may be more important than you think...

stephensearles commented 9 years ago

So in light of the gossip work, I ended up writing a re-implementation of raft that uses serf as its mode of communication and source of node-liveness information. Perhaps it's not accurate to call it raft, but that's definitely what it's based on. Other than the obvious advantage of a resilient communication layer, we can adapt to use the same serf instance for the lookup and resilience tasks. While deduplicating work, that will also deduplicate the need for bootstrapping.

The documentation is here: http://godoc.org/github.com/shipwire/ansqd/internal/polity

For the WAL question, yes, I agree, there is likely value in making sure there's a persistent storage element to it. The current implementation for message storage is to group them by an "expiration time," removing messages as they get confirmed as handled by the cluster. The expiration time can be looked up periodically for any outstanding messages that need to be recovered. The implementation of that is here: https://github.com/shipwire/ansqd/blob/master/buckets.go
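Roughly, the bucketing could be sketched like this (a simplified illustration with assumed types, not the buckets.go code itself):

```go
package resilience

import (
	"sync"
	"time"
)

type MessageID [16]byte

// expiryBuckets groups pending messages by coarse expiration time so a
// periodic sweep can find everything that is overdue and needs recovery.
type expiryBuckets struct {
	mu      sync.Mutex
	buckets map[int64]map[MessageID][]byte // unix-second expiry -> ID -> body
}

func newExpiryBuckets() *expiryBuckets {
	return &expiryBuckets{buckets: make(map[int64]map[MessageID][]byte)}
}

// Add files a message under the bucket for its expiration time.
func (b *expiryBuckets) Add(id MessageID, body []byte, expires time.Time) {
	b.mu.Lock()
	defer b.mu.Unlock()
	k := expires.Unix()
	if b.buckets[k] == nil {
		b.buckets[k] = make(map[MessageID][]byte)
	}
	b.buckets[k][id] = body
}

// Sweep returns (and drops) every message whose bucket has expired; the
// caller re-delivers or re-replicates those messages as needed.
func (b *expiryBuckets) Sweep(now time.Time) [][]byte {
	b.mu.Lock()
	defer b.mu.Unlock()
	var overdue [][]byte
	for k, msgs := range b.buckets {
		if k <= now.Unix() {
			for _, body := range msgs {
				overdue = append(overdue, body)
			}
			delete(b.buckets, k)
		}
	}
	return overdue
}
```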

mreiferson commented 9 years ago

> So in light of the gossip work, I ended up writing a re-implementation of raft that uses serf as its mode of communication and source of node-liveness information. Perhaps it's not accurate to call it raft, but that's definitely what it's based on. Other than the obvious advantage of a resilient communication layer, we can adapt to use the same serf instance for the lookup and resilience tasks. While deduplicating work, that will also deduplicate the need for bootstrapping.

> The documentation is here: http://godoc.org/github.com/shipwire/ansqd/internal/polity

I want to make sure I'm on the same page with the direction you're proposing. Polity would be responsible, on top of our ongoing gossip work, for "self-organizing" nodes in the cluster into various roles. We would then use these "roles" to form replication groups (of sizes 3-5, etc.)?

From there, we can layer on a replication strategy between those role-based groups, strongly consistent or otherwise?

mreiferson commented 9 years ago

> The documentation is here: http://godoc.org/github.com/shipwire/ansqd/internal/polity

I don't have a better place to ask polity specific questions, so here goes...

I haven't read the raft paper in some time, but I thought its use of terms and commit index further protected it against split brain situations - how does polity handle this?

stephensearles commented 9 years ago

So polity is kind of a simplified raft. It has no replication log, so there's no commit index, and it has no single leader role, so terms don't make sense either.

You can think of polity roles as named, globally distributed locks. When one node proposes its candidacy for a role, and it is elected, no other node can take on that role until the first node is recalled. The three main operations, electing, recalling, and querying, all require a majority of all known nodes to succeed.
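For concreteness, the shape of those three operations might look like this (the interface and method names are assumptions for illustration; see the godoc link for the real API):

```go
package resilience

// Polity is a hypothetical view of polity's role operations. A role acts
// like a named, cluster-wide lock: electing, recalling, and querying all
// require a majority of known nodes to succeed.
type Polity interface {
	RunElection(role string) (won bool, err error) // propose this node for a role
	Recall(role string) error                      // release a held role
	Query(role string) (holder string, err error)  // look up the current holder
}
```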

In a split brain situation, a few things could happen. If the serf/memberlist instance still recognizes the presence of the missing nodes from the other side of the brain, only the side with a majority could win elections. Once the missing nodes are timed out, you could start to win elections on both sides and end up with two different role-holders. Once the partition is resolved, queries will return the node that had the majority of votes.

Describing that has led me to a possible improvement: when joining two clusters (because of a partition), proactively resolve role conflicts by looking at which node has the most votes, and if there's a tie, break it by choosing the one that won at the earliest Lamport time.
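That resolution rule could be sketched as follows (types and field names are hypothetical):

```go
package resilience

// claim records how a node won a role in one side of a partition.
type claim struct {
	node        string
	votes       int
	lamportTime uint64 // logical time at which the role was won
}

// resolveRoleConflict picks the surviving holder when two partitions
// rejoin with different holders for the same role: most votes wins,
// and ties break toward the earliest Lamport time.
func resolveRoleConflict(a, b claim) claim {
	if a.votes != b.votes {
		if a.votes > b.votes {
			return a
		}
		return b
	}
	if a.lamportTime <= b.lamportTime {
		return a
	}
	return b
}
```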

As far as how that works with replication: the current idea is to not use polity to replicate at all, but rather to use special topics to replicate messages (1) to peers. The FIN from a peer is a clear enough signal that it has received a message, and replication can be considered successful when a majority of peers have FIN'd it. When all is well, once the originating nsqd finishes with the message, it will notify peers about that success. The other nodes can stop worrying about that message. Now say the original node goes down. Once a timeout has been reached, some number of peer nodes (>1) will initiate an election through polity, asking for a role that is specific to recovering the node that went down. Once a peer wins that election, it is then responsible for delivering all the downed node's messages.

There isn't currently a notion of subdividing nodes into replication groups in the code I'm working on, but that seems like a reasonably good thing to do.

(1) By "message" here, I'm referring to a message as it sits on a particular channel's queue. So each nsqd can treat each "message" as a single required delivery.
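Condensed into a sketch, with every name below (the role convention, the helper functions) being an assumption for illustration rather than the actual ansqd code:

```go
package resilience

// replicated reports whether a majority of peers have FIN'd their copy of
// a message, which is the point at which replication counts as successful.
func replicated(peerFINs, totalPeers int) bool {
	return peerFINs > totalPeers/2
}

// onOwnerTimeout runs on a peer once the originating node has been
// unreachable past the timeout: whichever peer wins the per-node recovery
// role takes over delivery of that node's outstanding messages.
func onOwnerTimeout(
	runElection func(role string) (won bool, err error), // e.g. backed by polity
	recall func(role string) error,
	redeliver func(node string) error,
	deadNode string,
) error {
	role := "recover:" + deadNode // hypothetical role-naming convention
	won, err := runElection(role)
	if err != nil || !won {
		return err // another peer won, or no majority was reachable
	}
	defer recall(role)
	return redeliver(deadNode)
}
```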

stephensearles commented 9 years ago

Also, FWIW, depending on the settings you're using with memberlist, the various defaults for removing unreachable nodes range from about 3 seconds (Local), to about 5 seconds (LAN), to about 25 seconds (WAN).
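Those figures fall out of memberlist's preset configs, which can be inspected directly (exact numbers depend on the library version):

```go
package main

import (
	"fmt"

	"github.com/hashicorp/memberlist"
)

// Print the failure-detection knobs behind the Local/LAN/WAN presets; the
// effective time to remove an unreachable node is derived from the probe
// interval, probe timeout, and suspicion multiplier.
func main() {
	presets := map[string]*memberlist.Config{
		"Local": memberlist.DefaultLocalConfig(),
		"LAN":   memberlist.DefaultLANConfig(),
		"WAN":   memberlist.DefaultWANConfig(),
	}
	for name, cfg := range presets {
		fmt.Printf("%-5s probe=%v timeout=%v suspicion_mult=%d\n",
			name, cfg.ProbeInterval, cfg.ProbeTimeout, cfg.SuspicionMult)
	}
}
```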

mreiferson commented 9 years ago

> There isn't currently a notion of subdividing nodes into replication groups in the code I'm working on, but that seems like a reasonably good thing to do.

I think this is going to be important for larger clusters, particularly to limit the number of replicated topics per-node (scalability).

> As far as how that works with replication: the current idea is to not use polity to replicate at all, but rather to use special topics to replicate messages (1) to peers. The FIN from a peer is a clear enough signal that it has received a message, and replication can be considered successful when a majority of peers have FIN'd it. When all is well, once the originating nsqd finishes with the message, it will notify peers about that success. The other nodes can stop worrying about that message. Now say the original node goes down. Once a timeout has been reached, some number of peer nodes (>1) will initiate an election through polity, asking for a role that is specific to recovering the node that went down. Once a peer wins that election, it is then responsible for delivering all the downed node's messages.

What do you mean by "special topics"? What is the impact of this approach from an NSQ client's perspective?

stephensearles commented 9 years ago

Well, currently, that idea just uses a reserved topic name. I guess that might be a breaking change if there's nothing like that currently. Perhaps there's a way to do that communication differently, like with a different TCP command.
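For illustration only, a reserved-name check could be as simple as a prefix match (the prefix below is made up; nsqd has no such convention today):

```go
package resilience

import "strings"

// reservedReplicationPrefix is a hypothetical convention: topics with this
// prefix would carry peer-replication traffic rather than client publishes.
const reservedReplicationPrefix = "__replication."

func isReplicationTopic(topic string) bool {
	return strings.HasPrefix(topic, reservedReplicationPrefix)
}
```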

shinzui commented 8 years ago

Is this something that's being worked on?

ploxiln commented 8 years ago

Yes, in #625

ashtonian commented 3 years ago

Looks like this was getting there with https://github.com/nsqio/nsq/pull/625. Any updates?

mreiferson commented 3 years ago

That branch needs quite the rebase :)

anarcher commented 3 weeks ago

Could we improve resilience using object storage (S3), like WarpStream?
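As a rough illustration of that direction (not an NSQ feature; the bucket name and key layout are assumptions), offloading a sealed message segment to S3 might look like:

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// uploadSegment writes one sealed segment of a topic's message log to
// object storage so any node can later fetch and replay it.
func uploadSegment(ctx context.Context, client *s3.Client, topic string, seq uint64, data []byte) error {
	key := fmt.Sprintf("nsq/%s/segment-%012d", topic, seq) // hypothetical key layout
	_, err := client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String("example-nsq-segments"), // hypothetical bucket
		Key:    aws.String(key),
		Body:   bytes.NewReader(data),
	})
	return err
}

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	if err := uploadSegment(ctx, s3.NewFromConfig(cfg), "events", 1, []byte("...")); err != nil {
		log.Fatal(err)
	}
}
```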