@visionmedia - trying to interpret your logs here... where do you see the RDY 50 in that log? Also, you say 86031 in flight but the logs say 2, then 1, then 0? How should I be reading this?
Basically in this case we accept 50 concurrent messages (inflight), which are then buffered for a long time, and we decrement inflight once a message is in the buffer. The total represents the total in-flight number from an NSQ perspective, so NSQ's in-flight will match total, and max is just the client-side max-in-flight.

NSQ gets a max-in-flight of 50 and we just keep accepting more and more until it reaches 300k in this case, but we need to lock the concurrency at 50, and for whatever reason NSQ seems to keep dishing out way more than we request. Hope that sort of makes sense; this is the client-side RDY concurrency limiter I was mentioning a while back. The code makes more sense than me haha:
/**
 * Module dependencies.
 */

var debug = require('debug')('ready');
var assert = require('assert');

/**
 * Expose `Ready`.
 */

module.exports = Ready;

/**
 * Initialize Ready with:
 *
 * - `concurrency` max at a given time
 * - `max` max in total
 * - `queue` the reader
 *
 * @param {Object} opts
 * @api public
 */

function Ready(opts) {
  assert(opts, 'settings required');
  assert(opts.max, '.max required');
  assert(opts.queue, '.queue required');
  this.concurrency = opts.concurrency || opts.queue.maxInFlight;
  this.queue = opts.queue;
  this.max = opts.max;
  this.inflight = 0;
  this.total = 0;
}

/**
 * Add a message.
 */

Ready.prototype.add = function(){
  this.inflight++;
  this.total++;
};

/**
 * Remove a message.
 */

Ready.prototype.sub = function(){
  this.inflight--;
  this.maintain();
};

/**
 * Mark a message as complete.
 */

Ready.prototype.done = function(n){
  n = n || 1;
  this.total -= n;
  debug('done %s RDY %s', n, this.total);
  this.queue.ready(this.total);
};

/**
 * Maintain the RDY count.
 */

Ready.prototype.maintain = function(){
  debug('inflight=%s total=%s max=%s', this.inflight, this.total, this.max);
  if (this.total >= this.max) return debug('max');
  if (this.inflight) return;
  this.ready();
};

/**
 * Send the RDY count.
 */

Ready.prototype.ready = function(){
  var rem = this.concurrency - this.inflight;
  var rdy = Math.min(this.max, this.total + rem);
  debug('RDY %s', rdy);
  this.queue.ready(rdy);
};
this.queue.ready() distributes to the N connections just by division at the moment, but I need to fix if (this.inflight) return for when a producer or two have no messages, that obviously gets stuck :D
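(To make the call pattern above concrete, here is a minimal usage sketch. The reader object and its hooks are hypothetical; only the queue.maxInFlight / queue.ready(n) contract and the add/sub/done flow come from the code and description above.)

var Ready = require('./ready'); // assumes the module above is saved as ready.js

// stand-in for an nsq reader: just the surface Ready relies on
var fakeQueue = {
  maxInFlight: 50,
  ready: function(n){ console.log('would send RDY %d', n); }
};

var ready = new Ready({ queue: fakeQueue, max: 300000, concurrency: 50 });
var buffer = [];

// hypothetical message hook: increment on entry, decrement once buffered
function onMessage(msg){
  ready.add();   // inflight++ and total++
  buffer.push(msg);
  ready.sub();   // inflight--, maintain() may re-send RDY
}

// hypothetical flush hook: batch written out, so drop the batch from total
function onFlush(count){
  ready.done(count); // total -= count, RDY sent back down toward total
}

for (var i = 0; i < 10; i++) onMessage({ id: i });
onFlush(buffer.length);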
I'm pretty sure what's happening is that since your RDY count keeps going up (kinda), when you FIN a bunch of messages you get a burst in return. For example:

RDY 500
250 messages
RDY 750
FIN abcd for 250 messages
750 messages

If you've set a really high RDY on a given nsqd, while you have some % of that in-flight, but then FIN those, you can then receive up to that RDY.
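(A toy model of the accounting being described, not nsqd's actual code: the server can push up to RDY minus current in-flight, so FIN'ing a batch frees that much headroom at once.)

// numbers taken from the example above
var rdy = 0;
var inFlight = 0;

function setRdy(n){ rdy = n; }
function receive(n){ inFlight += n; }
function fin(n){ inFlight -= n; }
function deliverable(){ return Math.max(rdy - inFlight, 0); }

setRdy(500);
receive(250);                 // 250 messages arrive
setRdy(750);
console.log(deliverable());   // 500: room left under the new RDY
fin(250);                     // FIN the 250 that were in-flight
console.log(deliverable());   // 750: the full RDY can now arrive as a burst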
But, just so I'm on the same page:
max = 300,000
concurrency = 50
Can you elaborate a bit more on the whole 50 thing? Are you processing in batches of 50? If so, why do you want 300,000 in flight? Is it because you want 300,000 / 50 = 6,000 batches to be processed concurrently?
wait how is it that RDY 750 followed by a FIN 250 would give you 750? shouldn't that just give you another 250?
yup, max == 300k and concurrency == 50; just kind of sloppy prop names haha, shadowing nsq terms is probably not the best idea. We basically just consume all the messages we can and bucket them into specific S3 files that get flushed either at a given count or interval
this lame example works as expected, binning into three random buckets and flushing every 100. I think maybe timeouts are causing the issue in production. If you had max 300k, total 100k, and the last RDY was 100k, and then 20k time out, it's going to send those 20k back and skew the numbers, right?
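(Re the binning example just mentioned: a rough sketch of that bin-and-flush pattern, with made-up bucket names and a flush threshold of 100, flushing on an interval as well so partial buckets don't sit forever. This is not the actual worker code.)

// Toy bin-and-flush worker; in the real setup a flush would write an S3 file.
var buckets = { a: [], b: [], c: [] };
var names = Object.keys(buckets);

function flush(name){
  var batch = buckets[name];
  buckets[name] = [];
  console.log('flush bucket %s: %d messages', name, batch.length);
}

function onMessage(msg){
  var name = names[Math.floor(Math.random() * names.length)];
  buckets[name].push(msg);
  if (buckets[name].length >= 100) flush(name); // flush at a given count
}

// ...or at a given interval
setInterval(function(){
  names.forEach(function(name){
    if (buckets[name].length) flush(name);
  });
}, 5000).unref();

// demo feed
for (var i = 0; i < 1000; i++) onMessage({ id: i });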
tracking timeouts in the client at this volume might be a little tricky; we'd have to loop over all 5+ million to keep checking, which seems really error-prone to do at the client level. It would be really nice to maybe have:
MAX 5000000
RDY 50
RDY 50
RDY 50
instead of the current:
RDY 50
RDY 100
RDY 150
or some variant
That might be your issue. As soon as NSQD times out messages you have, it'll immediately send more to fill your in-flight back up to your last requested RDY. Do you have nsqd sending stats to graphite? Even if you don't, it should be easy to see whether nsqd is timing out your clients' handling via stats/nsqadmin.
@jehiah yeah I think that might be the problem, it's hard to tell (we're sending them through statsd to stackdriver). I've been restarting the processes so much the last few days that a bunch are timing out, but those might not be from the workers that are running
eek, yeah, checking 5M messages' timestamps for a timeout state seems to take about ~1.5s with node, so that's not too great
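(Roughly the kind of scan being measured here, as a toy benchmark rather than the real worker: 5M in-memory message objects checked against a timeout. Actual timing depends heavily on how the messages are stored.)

// Build 5M fake buffered messages, then scan their timestamps for timeouts.
var N = 5 * 1000 * 1000;
var timeout = 30 * 1000; // 30s, matching the msg-timeout flag mentioned above

var messages = new Array(N);
for (var i = 0; i < N; i++) {
  messages[i] = { id: i, timestamp: Date.now() - (i % 60000) };
}

var start = Date.now();
var now = Date.now();
var timedOut = 0;
for (var j = 0; j < N; j++) {
  if (now - messages[j].timestamp > timeout) timedOut++;
}
console.log('scanned %d messages in %dms, %d timed out', N, Date.now() - start, timedOut);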
Yea, could be timeouts, and yes that would be a pain to track client side.
I've really been wanting to make a change such that you only set RDY once if you don't need to adjust it. I'm not sure if that would really help you here, though.
(thinking)
FWIW I've done a bit of testing with this change in place (only need to send RDY once). And it "just works", all tests pass, etc. It would even be backwards compatible with existing client libraries (and would provide an opportunity to simplify these code paths going forward).
The other odd thing is that this still seems to happen even with a very long timeout of, say, 2 hours, unless that's not being applied correctly. I should double-check that, but I have --max-msg-timeout=5h so it should be fine
Hmm yeah, I guess that wouldn't help us unless you could control the concurrency; getting hammered with hundreds of thousands of messages is too much for node haha
are you also changing --msg-timeout=60s, which is the effective timeout? (max-msg-timeout sets limits when you are TOUCH'ing a message to extend the timeout)
the flag is set to 30s but in-app it's msgTimeout: ms('2h'),
I assume he's referring to the per-client msg timeout set via IDENTIFY
wait how is it that RDY 750 followed by a FIN 250 would give you 750? shouldn't that just give you another 250?
@visionmedia because if you've told nsqd that you're "ready for 750" and you have none in-flight (after FIN of the 250 you previously had in-flight), you can receive up to that amount
in-app it's msgTimeout: ms('2h'),
Oh, I forgot you can set a per-connection default msg timeout via IDENTIFY. Looks like that's missing from the docs here.
@jehiah care to update :grin: ?
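(For reference, that per-connection timeout is negotiated in the IDENTIFY body. A minimal sketch of the payload: client_id is a placeholder, msg_timeout is in milliseconds per the protocol spec, and the exact field set should be checked against the current docs.)

// Sketch of an IDENTIFY body requesting a per-connection msg timeout.
var identifyBody = {
  client_id: 'worker-1',            // placeholder
  msg_timeout: 2 * 60 * 60 * 1000   // 2h in ms, like msgTimeout: ms('2h') above
};
// a client library JSON-encodes this and sends it as the IDENTIFY payload
console.log(JSON.stringify(identifyBody));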
@visionmedia - looking over the code, is there a specific reason why you need to gate the ingestion (by slowly increasing it rather than just setting RDY to max, aka 5000)?
our real maxes are around 5 million; I had lowered it to 300k to make debugging easier. But if we don't throttle and we FIN 200k after a batch or on a restart, we just get smashed by messages, and node's event loop doesn't seem to tolerate it very well even if we apply a throttled queue at the app level. A very high initial RDY basically bypasses the RDY backpressure.
We basically just consume all the messages we can and bucket them into specific s3 files that get flushed either at a given count or interval
@visionmedia - sorry this took so long to wrap back around to this issue, I've had a busy week!
Might I suggest an alternate architecture? I think these extremely high in-flight situations present a few problems, some of which we've outlined over time in related github issues/comments.

Have you considered running 2 to 3 nsq_to_file processes on "aggregation hosts", rolling the files at whatever time interval you'd like to batch for (1min, 5min, 30min, 1hr, etc), and then processing (filtering, aggregating, S3 upload) those rolled files "offline" (via cron, perhaps)? The multiple processes/hosts give you the redundancy you need, and you can more effectively process extremely large batches (hours' worth).

The real win here is that it's a hybrid, best-of-both-worlds approach. It's been an incredibly effective strategy at bitly for situations that required really large batches...
no worries! Like I mentioned in the other issue, I've restructured things so we don't need so many in-flight messages anymore. We were buffering in-memory and flushing, but I realize now that was kind of silly regardless of how NSQ handles things. I've refactored it all in Go and I'm just buffering to disk now between localized workers
ahh cool, sounds like a very similar approach. I suspect it will do the trick :smile:
Make sure you're using the newer go-nsq API!
I think I am haha I'll double check :D
still looking into it, probably my fault, but does this seem odd to you? This is for my manual client-side RDY management to allow for greater throughput: I basically just increment on entry, decrement before being buffered, and then decrement the "total" when buffered messages are batch flushed.
Now the weird behaviour I'm seeing is:
So we have 86031 messages in-flight, RDY for 50 more, and then we receive several thousand from nsq? Maybe I'm missing something about how RDY works, but that seems weird, and it happens regularly