Open willyaranda opened 10 years ago
Right, the WORKER socket assumes you're doing just one thing at a time, and scaling up by having more workers.
This discussion may be apposite: https://github.com/squaremo/rabbit.js/issues/48
The solution discussed there is to construct a future for each task, and pass that along with the payload to the next stage. Then you can iterate through the promises and ack things in order.
I would love to have a extended ack method that can accept the message payload (or a uniqueId) for acking that single message.
Both of these are problematic: the first assumes no two payloads will ever be the same (uh-oh); the second requires some means of giving a buffer a unique ID, which is then required when acking it. I suppose it could be a property set on the buffer, but that seems kind of gross -- also, whatever you're passing it on to would have to know to tell you once it's done -- so you may as well give it a future or callback or whatever, rather than an ID.
@squaremo I think messages received from your lib to Rabbit.JS internals has a deliveryTag
(that is what you are using to ACK messages, based on https://github.com/squaremo/amqp.node/blob/master/lib/channel_model.js#L220-224 )
I think that you can send, along the message, the deliveryTag
that message has. As I see [1] the readable.push
method, I think this change will break the current API, as it allows only a Buffer
or a String
, and not an object
, or two parameters.
[1] http://nodejs.org/api/stream.html#stream_readable_push_chunk_encoding
I think that you can send, along the message, the deliveryTag that message has. As I see [1] the readable.push method, I think this change will break the current API, as it allows only a Buffer or a String
One can always set a property on the buffer or string -- it's JavaScript after all. But I don't like this solution, because the receiver has to know a very specific behaviour, which is to get the delivery tag from the buffer it's responding to, and somehow attach that to its answer. I'd rather there was a callback.
The library could help you out: maybe something like
var worker = rabbit.socker('WORKER');
worker.connect('jobs');
worker.on('data', function(job) {
doSomethingAsync(job, worker.lastMessageAcker());
});
When using a PUSH/WORKER scenario, if you receive a message on the
Worker
part, you canack
the worker socket to tell RabbitMQ that a message has been processed and can receive the next batch.The pitfall is that if you configure the
worker
with aprefetch
bigger than 1, you will alwaysack
the first message received (that is saved always on the first position of theunacked
array)But, in a async environment, you may finish processing the second message received before the first one, but the
ack
will make the first one to beack
ed, not the message actually finished.I would love to have a extended
ack
method that can accept the message payload (or a uniqueId) forack
ing that single message.@squaremo , what do you think?