Georgette opened this issue 8 years ago
@Georgette So this is something I had thought might explode. @AdrianRossouw brought it up before the work started. Basically, I think we need some form of rate regulation on the queue. We can do this in two ways (a sketch of the first option follows the list):

1. On the queue directly, e.g. we add the ability to force the queue to tick at x messages per second.
2. We add our own batching logic to our service, which is how it was done originally.
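A minimal sketch of option 1, assuming a hypothetical pull-based queue interface (`queue.pop(cb)` is an assumed function, not this plugin's API), that drains at a fixed rate:

```js
// Hypothetical sketch of option 1: force the queue to tick at a fixed
// rate. `queue.pop(cb)` is an assumed interface, not the plugin's API.
function tickAt (queue, perSecond, handle) {
  setInterval(function () {
    queue.pop(function (err, item) {
      if (err || item == null) return // nothing to do this tick
      handle(item)
    })
  }, 1000 / perSecond)
}

// e.g. drain at most 10 messages per second:
// tickAt(queue, 10, function (item) { console.log('got', item) })
```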
Personally, I feel this is a function of the queue; how useful is a queue if I can't control its speed? There may be more to it than that. @pelger What are your feelings here?
Wouldn't it be at least reasonable to have the queue accept an optional acknowledgement response so it knows when to offload the next item?
If you need a fast in-process queue, use http://npm.im/fastq :).
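For reference, fastq gates concurrency exactly that way: the worker only receives the next task after it calls its callback. A minimal usage sketch:

```js
// fastq: an in-process queue with bounded concurrency.
const fastq = require('fastq')

// Process at most one task at a time (the second argument is concurrency).
const queue = fastq(worker, 1)

function worker (task, cb) {
  // ... do the actual work here ...
  console.log('processing', task)
  cb(null, 'done')
}

// push() enqueues a task; the per-task callback fires when it completes.
queue.push({ id: 42 }, function (err, result) {
  if (err) throw err
  console.log('task finished:', result)
})
```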
The one thing that this plugin solves is failure recovery. If we add concurrency control within this module and something fails, the queue is stored in Redis, so we can just restart at the position where we left off. If we try to control flow somewhere else, we end up pulling records out of Redis faster than we can process them, with no way to tell the queue to stop sending data; we end up buffering, and we lose everything in the buffer on a failure.
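For context, the standard Redis pattern for this kind of failure recovery is a reliable queue built on BRPOPLPUSH: each item is atomically moved to a processing list and only removed after it has been handled, so a crash leaves in-flight items recoverable. A minimal sketch with the node_redis client (the key names are hypothetical, and this is not necessarily what the plugin does internally):

```js
const redis = require('redis')
const client = redis.createClient()

// Hypothetical key names, for illustration only.
const QUEUE = 'work:pending'
const PROCESSING = 'work:processing'

function consumeNext () {
  // Atomically pop from the pending list and push onto the processing
  // list; block for up to 5 seconds waiting for work.
  client.brpoplpush(QUEUE, PROCESSING, 5, function (err, item) {
    if (err) throw err
    if (item === null) return consumeNext() // timed out, poll again

    handle(item, function (err) {
      if (!err) {
        // Only remove the item once fully processed; if we crash before
        // this point, the item survives in the processing list.
        client.lrem(PROCESSING, 1, item, function () { consumeNext() })
      } else {
        consumeNext() // leave the item in place for recovery
      }
    })
  })
}

function handle (item, cb) {
  console.log('processing', item)
  cb(null)
}

consumeNext()
```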
@Georgette So is the queue not respecting the callback? I would assume it would only offload the next item once we call the callback for the current one. Is it just dumping them at you?
I think it is just dumping, and I don't see anything in the code telling it to wait.
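To make "respecting the callback" concrete: a push-style subscription just forwards messages as fast as they arrive, while a gated consumer holds one in-flight slot and asks for the next message only when the handler's callback fires. A hypothetical sketch (`fetchNext` and `handle` are assumed interfaces, not the plugin's actual internals):

```js
// Hypothetical sketch: the handler's callback acts as the
// acknowledgement. `fetchNext(cb)` is an assumed function that yields
// the next queued message (or null); it is not the plugin's real API.
function makeGatedConsumer (fetchNext, handle) {
  function next () {
    fetchNext(function (err, msg) {
      if (err || msg == null) return setTimeout(next, 100) // back off, retry
      handle(msg, function () {
        next() // only now do we ask for the next message
      })
    })
  }
  return next // call the returned function once to start consuming
}
```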
If I'm entirely off base on this, I'm really sorry. I was hoping to get some feedback on its inner workings :)
I agree with @Georgette that this needs to be implemented at the redis-queue level. In our case, even if the call is fire-and-forget and many observers are set for that action, the redis-transport/balancer should be aware of when the action is finished (i.e. when all registered executors for that pattern have finished). In that case we can have a queue there for which we can control the level of concurrency (and possibly the timeouts between executions).
However, I think we need a queue (and its own configuration) for each defined pin:
```js
require('seneca')()
  .use('redis-queue-transport')
  .add('foo:one', function (args, done) { done(null, { bar: args.bar }) })
  .add('foo:two', function (args, done) { done(null, { bar: args.bar }) })
  .listen({ type: 'redis-queue', pin: 'foo:one,bar:*' /* , configuration for the corresponding queue goes here */ })
  .listen({ type: 'redis-queue', pin: 'foo:two,bar:*' /* , configuration for the corresponding queue goes here */ })
```
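What that per-pin configuration might contain is open; as a purely hypothetical sketch (none of these option names exist in the plugin, they only illustrate the idea):

```js
// Purely hypothetical per-pin queue options; these keys are NOT part of
// the plugin's API today, they just illustrate the idea.
require('seneca')()
  .use('redis-queue-transport')
  .add('foo:one', function (args, done) { done(null, { bar: args.bar }) })
  .listen({
    type: 'redis-queue',
    pin: 'foo:one,bar:*',
    queue: {
      concurrency: 4,    // at most 4 in-flight actions for this pin
      interval: 100,     // minimum ms between dequeues (rate limit)
      ackTimeout: 30000  // requeue if an executor does not finish in time
    }
  })
```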
Wondering how this plugin does concurrency control?
I'm using PR #19 within nodezoo for the npm registry download, and I'm in a bit of a debug phase.
I was sifting through the code within the PR, and this code block looks like it is consuming from the queue as fast as it can. If it is, is this intended?
I'd expect that, since this is a queue, there would be a way to control the data flow.
Apologies if I'm off base.