nsqio / nsq

A realtime distributed messaging platform
MIT License

nsq: support region/zone aware msg consumption [RFC] #1300

Open jehiah opened 3 years ago

jehiah commented 3 years ago

NSQ does not make it easy to manage a cluster that spans a geographic boundary, where it's ideal to prefer same-datacenter consumption to avoid extra network bandwidth and latency. This proposal provides a way to influence message consumption while preserving realtime behavior and promoting distributed, fault-tolerant configurations.

Problem Overview

Consider nsqd instances on hosts h1 and h2, in different datacenters (d1 and d2), with messages on topic t, and two consumers on hosts c1 and c2, also in d1 and d2 respectively. It's desirable to have c1 primarily consume messages from h1 and c2 primarily consume messages from h2, while still providing some reliability so that if c1 or c2 is interrupted, the other will consume all messages without delay.

Current Implementation

nsqd uses a Go channel, memChannel, for each channel, and it is consumed by a "message pump" goroutine for each client. When a new message arrives it's put onto memChannel, and an available message pump (effectively chosen at random) consumes that message and writes it to the client. If memChannel is full, the message is written to disk and separately read back from disk to be put onto memChannel.
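
The write path described above boils down to a non-blocking send with a disk fallback. The sketch below models it with a simplified, hypothetical Channel type where a slice stands in for the disk-backed queue; it illustrates the pattern only and is not nsqd's actual code.

```go
package main

import "fmt"

// Message is a simplified stand-in for nsqd's message type.
type Message struct {
	ID   int
	Body []byte
}

// Channel models one nsqd channel as described above: an in-memory Go channel
// plus a slice standing in for the disk-backed queue (hypothetical simplification).
type Channel struct {
	memChannel chan *Message
	disk       []*Message
}

// put attempts a non-blocking write to memChannel and falls back to "disk"
// when the buffer is full.
func (c *Channel) put(m *Message) {
	select {
	case c.memChannel <- m:
	default:
		c.disk = append(c.disk, m)
	}
}

func main() {
	c := &Channel{memChannel: make(chan *Message, 2)}
	for i := 0; i < 3; i++ {
		c.put(&Message{ID: i})
	}
	fmt.Println(len(c.memChannel), len(c.disk)) // 2 1
}
```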

Proposed Implementation

nsqd will have three Go channels for messages internal to each "channel": an unbuffered zoneLocal channel, an unbuffered regionLocal channel, and a global channel (buffer size controlled by CLI arguments). A new message will first be offered to the zoneLocal channel, then to regionLocal, and finally to the global channel; if all three non-blocking writes are skipped, the message will be written to disk, matching existing behavior.
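
A minimal sketch of the tiered write, assuming the three chans described above plus a slice standing in for the diskqueue. The field names zoneLocal, regionLocal and global follow the proposal; the type, constructor and return values are hypothetical and only there to make the fall-through order visible.

```go
package main

import "fmt"

type Message struct{ ID int }

// tieredChannel sketches the proposed per-channel layout (hypothetical type).
type tieredChannel struct {
	zoneLocal   chan *Message // unbuffered
	regionLocal chan *Message // unbuffered
	global      chan *Message // buffered; size would come from CLI flags
	disk        []*Message    // stand-in for the diskqueue backend
}

func newTieredChannel(globalSize int) *tieredChannel {
	return &tieredChannel{
		zoneLocal:   make(chan *Message),
		regionLocal: make(chan *Message),
		global:      make(chan *Message, globalSize),
	}
}

// put offers m to each tier in order using non-blocking sends and
// reports which tier accepted it.
func (c *tieredChannel) put(m *Message) string {
	select {
	case c.zoneLocal <- m: // only succeeds if a same-zone pump is waiting
		return "zone"
	default:
	}
	select {
	case c.regionLocal <- m: // only succeeds if a same-region pump is waiting
		return "region"
	default:
	}
	select {
	case c.global <- m:
		return "global"
	default:
	}
	c.disk = append(c.disk, m) // every tier skipped: existing disk behavior
	return "disk"
}

func main() {
	c := newTieredChannel(1)
	// No consumers are waiting, so both unbuffered sends are skipped.
	fmt.Println(c.put(&Message{ID: 1})) // global
	fmt.Println(c.put(&Message{ID: 2})) // disk
}
```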

The IDENTIFY message will accept new parameters, "topology-region" and "topology-zone"; when these match the values of the new nsqd flags "--topology-region" and "--topology-zone", the client's message pump will consume from the corresponding local channels in addition to the global channel. If either the client or the server value is unset, the client will consume only from the global channel.

Expected Behavior

With a low number of clients (and generally a higher message arrival rate), some new messages are expected to arrive while the message pump is busy, so some messages that might otherwise be handled by the zoneLocal channel will be handled by the regionLocal or global channel instead. This may be particularly frequent when messages are read from disk to recover from a backlog, since those messages are available immediately.

This may make picking good MaxInFlight values more difficult because clients will normally only receive messages from a zoneLocal portion of their max-in-flight.


This may lower message throughput in cases where messages are not sent to a zoneLocal client.

cc: @mreiferson @ploxiln; see also #1254, which tracks "cloud native" improvements

ploxiln commented 3 years ago

nsqd will have three Go channels for messages internal to each "channel" - an unbuffered zoneLocal channel, and a secondary unbuffered regionLocal channel and a third global channel (buffer controlled by CLI arguments). A new message will first attempt to be written to the zoneLocal channel, then the regionLocal and finally the global channel; if all nonblocking channel writes are skipped the message will be written to disk matching existing behavior.

This is cool, I like it. One performance cost I'm uncertain of is when many messages attempt a zero-timeout write to the unbuffered chans and fail, e.g. when there are currently no same-region consumers. Also, if there is any queue depth, this very rarely sends messages to same-region consumers, because every consumer ends up reading from the memchan right away and rarely sticks around for the instant nsqd tries to write to the unbuffered chan.
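
The behavior behind this concern can be shown directly: a zero-timeout send on an unbuffered chan succeeds only at the moment a receiver is already parked on it. The trySend helper below is hypothetical; the second half retries with runtime.Gosched only to give the consumer goroutine a chance to block, so the number of attempts it prints varies from run to run.

```go
package main

import (
	"fmt"
	"runtime"
)

// trySend performs a zero-timeout send: on an unbuffered chan it succeeds
// only if a receiver is already blocked on ch at that instant.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	zoneLocal := make(chan int) // unbuffered, as in the proposal

	// No same-zone consumer: every attempt fails and would fall through.
	fails := 0
	for i := 0; i < 1000; i++ {
		if !trySend(zoneLocal, i) {
			fails++
		}
	}
	fmt.Println("failed sends with no consumer:", fails) // 1000

	// A consumer parked on the receive can accept the send.
	done := make(chan int)
	go func() { done <- <-zoneLocal }()
	attempts := 1
	for !trySend(zoneLocal, 42) {
		attempts++
		runtime.Gosched() // let the consumer goroutine reach its receive
	}
	fmt.Println("delivered", <-done, "after", attempts, "attempt(s)")
}
```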

One idea that addresses those concerns, though it makes this messier: we could add a fast-skip based on a per-channel counter of same-region consumers, and we could even try to buffer these same-region chans, emptying that buffer into the memchan when the last same-region consumer disconnects. Maybe that's not worth the complexity at first; maybe just handling the usually-zero-depth situation is valuable enough.
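
A sketch of the fast-skip half of that idea, assuming an atomic counter that clients increment on subscribe and decrement on disconnect (atomic.Int64 needs Go 1.19+). The localTier type and its methods are hypothetical, and the buffered variant that drains into the memchan is not shown.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type Message struct{ ID int }

// localTier pairs an unbuffered locality chan with a count of subscribed
// same-locality consumers (hypothetical type).
type localTier struct {
	ch        chan *Message
	consumers atomic.Int64 // incremented on subscribe, decremented on disconnect
	skips     atomic.Int64 // how many puts took the fast path
}

func (t *localTier) subscribe()   { t.consumers.Add(1) }
func (t *localTier) unsubscribe() { t.consumers.Add(-1) }

// tryPut reports whether m was handed to a waiting local consumer.
func (t *localTier) tryPut(m *Message) bool {
	if t.consumers.Load() == 0 {
		t.skips.Add(1) // fast skip: no local consumers, don't bother with select
		return false
	}
	select {
	case t.ch <- m: // succeeds only if a consumer is blocked receiving right now
		return true
	default:
		return false
	}
}

func main() {
	t := &localTier{ch: make(chan *Message)}
	for i := 0; i < 5; i++ {
		t.tryPut(&Message{ID: i})
	}
	fmt.Println("fast skips:", t.skips.Load()) // fast skips: 5

	t.subscribe()
	t.tryPut(&Message{ID: 5})                  // counter is non-zero, so a real send is attempted
	fmt.Println("fast skips:", t.skips.Load()) // fast skips: 5
	t.unsubscribe()
}
```

A non-zero counter does not guarantee a consumer is parked at that instant, so this only removes the wasted select when nobody local is connected; it does not address the queue-depth case.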

If the memChannel is not available (full) the message will be written to disk and separately read from disk to put back onto memChannel.

This is not quite accurate; protocolV2.messagePump() reads from both the memchan and the backend (diskqueue) chan directly.

Also worth noting: select blocks do not offer prioritization if multiple chans are ready at once; they choose pseudo-randomly.
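
If receive-side priority were ever needed (for example if the locality chans were buffered, as floated above), a common Go workaround is cascading non-blocking selects followed by one blocking select. The receivePrioritized function is a hypothetical sketch; note the final blocking select still picks randomly among chans that become ready at the same moment.

```go
package main

import "fmt"

// receivePrioritized prefers zone, then region, then global when more than one
// is already ready, and blocks on all three only when none is.
func receivePrioritized(zone, region, global <-chan int) (int, string) {
	select {
	case v := <-zone:
		return v, "zone"
	default:
	}
	select {
	case v := <-region:
		return v, "region"
	default:
	}
	select { // nothing ready yet: wait on all, with no ordering guarantee
	case v := <-zone:
		return v, "zone"
	case v := <-region:
		return v, "region"
	case v := <-global:
		return v, "global"
	}
}

func main() {
	zone, region, global := make(chan int, 1), make(chan int, 1), make(chan int, 1)
	zone <- 1
	region <- 2
	global <- 3
	for i := 0; i < 3; i++ {
		v, src := receivePrioritized(zone, region, global)
		fmt.Println(src, v) // zone 1, then region 2, then global 3
	}
}
```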

jehiah commented 3 years ago

This is cool, I like it.

Thanks for the feedback.

This is not quite accurate, the protocolV2.messagePump() reads from both the memchan and the backend (diskqueue) chan directly.

Correct, thanks for clarifying. (I wrote this from memory after a light reading of the relevant code yesterday, but I've been noodling on approaches to this for a little while.) Essentially, the messagePump would now sometimes have more channels to select on simultaneously: at most 5.
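
One way a pump could "sometimes" select on the extra chans is to leave non-matching sources nil: a receive from a nil chan blocks forever, so that select case can never fire. The pumpSources type is a hypothetical sketch of that Go idiom, not nsqd's messagePump.

```go
package main

import "fmt"

type Message struct{ ID int }

// pumpSources holds the message sources a pump might select on; sources that
// don't apply to this client are left nil, which disables their select case.
type pumpSources struct {
	zoneLocal   chan *Message
	regionLocal chan *Message
	global      chan *Message
	backend     chan *Message // stand-in for the diskqueue read chan
}

// next blocks until one source yields a message and reports which one it was.
func (s pumpSources) next() (*Message, string) {
	select {
	case m := <-s.zoneLocal:
		return m, "zoneLocal"
	case m := <-s.regionLocal:
		return m, "regionLocal"
	case m := <-s.global:
		return m, "global"
	case m := <-s.backend:
		return m, "backend"
	}
}

func main() {
	global := make(chan *Message, 1)
	global <- &Message{ID: 7}
	// A client with no matching topology: zoneLocal and regionLocal stay nil.
	s := pumpSources{global: global, backend: make(chan *Message)}
	m, src := s.next()
	fmt.Println(src, m.ID) // global 7
}
```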

ploxiln commented 3 years ago

essentially the messagePump would now sometimes have more channels to select on simultaneously

Yup, this is pretty nifty. There are 6 "misc state-change" chans too, so it'll increase from 8 to 11 chans in that select 😁

mreiferson commented 3 years ago

Excited to see all this! I should have some time in the next few days to review.

mreiferson commented 3 years ago

This reminds me a bit of the issues discussed in https://segment.com/blog/the-10m-engineering-problem/

These are indeed problems worth solving, but why does this need to be built into nsqd? I find it odd for it to have knowledge of concepts like "zones" and "regions" within the core internal message transport. We already have a way for a client to hint to nsqd whether to send it messages or not... RDY. What if we improved the client rather than nsqd?

In the situation you describe, the desired behavior would be for c1 to have non-zero RDY on h1 and 0 on h2. If c1 observes a "significant" backlog on h2, it would then divide its capacity across both.

The client would need to know:

  1. its locality relative to each nsqd it is connected to
  2. queue depth for subscribed topic(s)

(1) could probably be handled out-of-band, but might be something we could improve with metadata shared between nsqd, nsqlookupd, and clients. (2) could also arguably be handled by querying /stats, but might be more efficient if we pushed a periodic "response" from nsqd to clients, containing various stats and metadata.
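
The client-side alternative can be sketched as a pure function over what the client would need to know. This is a hypothetical illustration, not go-nsq's API: nsqdConn, distributeRDY and backlogThreshold are invented names, and locality and depth are assumed to have been obtained by one of the means above.

```go
package main

import "fmt"

// nsqdConn is one connection as a client might see it (hypothetical fields).
type nsqdConn struct {
	Addr  string
	Local bool  // same zone/region as this client
	Depth int64 // queue depth for the subscribed topic/channel
}

// distributeRDY gives all of maxInFlight to local connections unless some
// remote connection's depth exceeds backlogThreshold, in which case capacity
// is split evenly across all connections (remainder to the first ones).
func distributeRDY(conns []nsqdConn, maxInFlight int, backlogThreshold int64) map[string]int {
	remoteBacklog := false
	for _, c := range conns {
		if !c.Local && c.Depth > backlogThreshold {
			remoteBacklog = true
		}
	}
	var targets []nsqdConn
	for _, c := range conns {
		if c.Local || remoteBacklog {
			targets = append(targets, c)
		}
	}
	if len(targets) == 0 { // no local nsqd: fall back to every connection
		targets = conns
	}
	rdy := make(map[string]int, len(conns))
	for _, c := range conns {
		rdy[c.Addr] = 0
	}
	if len(targets) == 0 {
		return rdy
	}
	share, rem := maxInFlight/len(targets), maxInFlight%len(targets)
	for i, c := range targets {
		rdy[c.Addr] = share
		if i < rem {
			rdy[c.Addr]++
		}
	}
	return rdy
}

func main() {
	conns := []nsqdConn{
		{Addr: "h1", Local: true, Depth: 0},
		{Addr: "h2", Local: false, Depth: 10},
	}
	fmt.Println(distributeRDY(conns, 100, 1000)) // map[h1:100 h2:0]
	conns[1].Depth = 5000                        // significant backlog on the remote nsqd
	fmt.Println(distributeRDY(conns, 100, 1000)) // map[h1:50 h2:50]
}
```

The latency trade-off discussed below shows up here as how stale Depth is allowed to be before the split reacts.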

jehiah commented 3 years ago

That Segment post is relevant; thanks for linking it here.

These are indeed problems worth solving, but why does this need to be built into nsqd?

Because messages are push-based, only nsqd has the ability to implement this in a way that fulfills the realtime promise of nsq. A client "could" make zone-aware choices, but it would have to be reactive about depth (or number of consumers?), sacrificing low latency for that choice. I think having clients poll metrics (or stream them, if we built a way to) also becomes inefficient.

I find it odd for it to have knowledge of concepts like "zones" and "regions" within the core internal message transport.

I've thought about this for a long time; for a while I had in mind that a client would send a "priority" or "weight" value with its connection, allowing nsqd to weight clients. The complexity of implementing that never felt right, and it requires some context about where the server is.

What sways me toward the simplicity of making zones/regions explicit is that they are (a) conceptually simple, (b) common terminology, and (c) adopted by other tools. As an example, I've watched this terminology become the standard labeling k8s uses to describe region/zone.

You could view region/zone as part of a more generic way to "label" nsqd nodes, but I don't think there is a compelling need for that.

We already have a way for a client to hint to nsqd whether to send it messages or not

I think client-side RDY logic is already too complicated, and I have some thoughts on a revision, but I think this has value server-side because it keeps clients simpler. This wouldn't change what clients request, just the order in which nsqd prioritizes clients. I think those are orthogonal concerns that might not mix easily on the client side.

mreiferson commented 3 years ago

Because messages are push based, only nsqd has the ability to implement this in a way that fulfills the realtime promise of nsq. A client "could" make zone aware choices, but it would have to be reactive about depth (or number of consumers?) sacrificing low latency for that choice. I think having clients poll metrics (or if we built a way to stream them) also becomes inefficient.

Sure, but it depends on one's definition of realtime. For some use cases it might be perfectly reasonable to tolerate latency in this scenario. Also, the latency would presumably be something that would be configurable (w/ some associated cost). Perhaps we should start by establishing some goals/constraints?

I've thought about this for a long time <...> As an example i've watched how this terminology is now standard labeling for k8s to describe region/zone

The terminology makes perfect sense, I don't have issue with the general concepts. But we've historically tried really hard to protect the core when possible, so I'm looking to exhaust other possibilities before baking this in. The question is: should "zone" and "region" be a primitive?

I think client side RDY logic is already too complicated and have some thoughts on a revision

The implementation is indeed (very) complicated, particularly in go-nsq, but the concept is not (IMO).

What this proposal feels like is another form of flow control. For example, if this feature existed, an adopter would still need to factor this into the RDY configuration of their multi-region consumers, right?

jehiah commented 3 years ago

But we've historically tried really hard to protect the core when possible, so I'm looking to exhaust other possibilities before baking this in.

Same here: I think there is, and should be, a higher bar for changes to nsqd. Stating the obvious, but both parts of this change (topology tagging and message steering) meet that bar for me.

Sure, but it depends on one's definition of realtime. For some use cases it might be perfectly reasonable to tolerate latency in this scenario.

Certainly some uses are not latency sensitive, but a core nsqd promise is realtime and I think this change provides value under any definition of realtime.

Perhaps we should start by establishing some goals/constraints?

My goal here is for nsqd to be easy to run in a cloud environment. Zones and regions are part of building an efficient and fault-tolerant setup, but that trade-off has so far come at the expense of latency (cross-datacenter communication) and cost. The goal here is to preserve the fault-tolerant nature of a cross-zone or cross-region configuration while avoiding the latency and cost impacts. Preferring same-zone and same-region message handling, without sacrificing any aspect of nsqd's distributed, independent nature, does that. Is there part of that I can cover better in the PR description, or is your question aimed at something else?

What this proposal feels like is another form of flow control. For example, if this feature existed, an adopter would still need to factor this into the RDY configuration of their multi-region consumers, right?

If you view a client's MaxInFlight as a setting for the maximum throughput the client can support, then no, this change should not affect the setting of that value.

It's also worth noting that as proposed a client can always opt out of any behavior change by not providing topology hints.

While this does affect message flow, it does so as a form of flow steering without affecting flow rate.

jehiah commented 8 months ago

👋 @ploxiln @mreiferson FYI: on the Bitly side we are interested in giving this some new attention, which @zoemccormick and @Ulminator will be leading.

Our current thought is to update nsqio/go-nsq#311 with a version we have been using successfully internally, then to work to get #1301 and nsqio/go-nsq#312 updated and running successfully internally at Bitly before following up to get those merged. We will also look to take on exposing topology information in nsqadmin.

mreiferson commented 8 months ago

sounds good, need to refresh my memory on this one :)