squaremo / rabbit.js

Messaging in Node.JS made easy using RabbitMQ

hybrid sockets for topic routing and acknowledged messages? #57

Open mxriverlynn opened 10 years ago

mxriverlynn commented 10 years ago

I find myself wanting a hybrid between push/pub and worker/sub, so that I can do topic workers (topic-based routing with acknowledged messages, optionally persistent/durable).

Any chance the push socket can be updated to use an exchange, with a routing-type option, instead of pushing directly to a queue? The worker would also need to allow topic subscriptions. Or would you prefer to see new socket types for these?
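For concreteness, something like this is what I have in mind - purely a hypothetical sketch, with made-up option names that rabbit.js doesn't support today:

// Hypothetical API sketch -- none of these options exist in rabbit.js today.
// Assumes `ctx` is a connected rabbit.js context and `evt` a parsed Stripe event.
// A push-like socket that publishes through a topic exchange...
var push = ctx.socket('PUSH', { routing: 'topic' });
push.connect('jobs');                                  // an exchange rather than a queue
push.publish('stripe.invoice.created', JSON.stringify(evt), 'utf8');

// ...and a worker-like socket that binds a durable queue with a pattern,
// still requiring an explicit acknowledgement per message.
var worker = ctx.socket('WORKER', { routing: 'topic', persistent: true });
worker.connect('jobs', 'stripe.invoice.*');
worker.on('data', function(msg) {
  handle(msg);   // hypothetical handler
  worker.ack();
});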

I might be able to take a shot at modifying or building these in the next week or two... I've been looking for a way to contribute more than just issue tickets :)

mxriverlynn commented 10 years ago

Found myself hitting a brick wall today, needing this. I'm going to go ahead and implement it and attach the changes as a PR for this ticket.

squaremo commented 10 years ago

Part of the contract of push or req is that the message goes to a single consumer. Routing (i.e., sending to an exchange rather than a queue) breaks the contract by allowing a message to go to more than one consumer, so one might get several replies back for a request, for example.

Do you want to round-robin messages among connections, like push and request do? Or just have pub/sub but with replies? There is some precedent for that pattern in nanomsg: http://nanomsg.org/v0.3/nn_survey.7.html

What's the high-level requirement -- maybe there's another way to accomplish it.

mxriverlynn commented 10 years ago

I have multiple scenarios across multiple projects that see a need for this, all of which come down to either doing more work on the receiving end, or having a ton of queues named for specific message types.

In one project, I am sending job requests - things that need to be run. I had originally set up the job queues with their own names, like job:job-a and job:job-b. I was using push / worker for this, because I need jobs that will wait in the queue until something is available to handle them, and I want to .ack() the message.

In another project, I have a Stripe integration for payment processing. Stripe sends me a webhook for events related to customers, subscriptions, etc. I don't want to process the webhook in the HTTP thread, so I'm sending the messages across my queues. I need a different handler for each of the events I handle, and I also need to make sure these events do get handled, so they need to be persistent messages. I was originally going to do the same as in the previous project: named queues for each message type, using push / worker. I would end up with 5 or 10 named queues for this to start with... stripe:invoice.updated, stripe:customer.created, etc.
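For reference, a minimal sketch of how that looks with today's PUSH / WORKER sockets and a named queue per event type (queue and handler names are just examples):

var context = require('rabbit.js').createContext('amqp://localhost');

context.on('ready', function() {
  // HTTP side: hand the webhook event off to a queue named after its type
  var push = context.socket('PUSH');
  push.connect('stripe:invoice.updated', function() {
    // stand-in for the parsed webhook body
    push.write(JSON.stringify({ type: 'invoice.updated', data: {} }), 'utf8');
  });

  // Worker side: consume from that queue and acknowledge each message
  var worker = context.socket('WORKER');
  worker.setEncoding('utf8');
  worker.connect('stripe:invoice.updated', function() {
    worker.on('data', function(msg) {
      handleInvoiceUpdated(JSON.parse(msg));   // example handler
      worker.ack();
    });
  });
});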

I just keep coming back to the need for named queues, with persistence, to send messages based on routing.

Am I thinking about this wrong? Does it matter that I end up with 20 or 30 queues, named after the message type? That seems a bit backward to me... and every time I try to go down this path just one more time, I find myself thinking that the problem I'm facing would be better solved with exchanges and routing.

mxriverlynn commented 10 years ago

In the case of my Stripe integration, I will end up doing one of three things, I think:

  1. using pub/sub and not having a guarantee of the message being handled (no thanks)
  2. using a single queue, and having a switch statement on the other end to determine which code to run for the message type
  3. using a worker queue with routing for the type of event from stripe, sending each event to the correct handler

Option 2 seems the most viable at this point, but it also seems like it's reproducing in code what should be done at the queue level (option 3); a rough sketch of option 2 is below.
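For concreteness, option 2 would look roughly like this (assuming a ready rabbit.js context; handler names are just examples):

// Assumes `context` is a rabbit.js context that has emitted 'ready'.
var worker = context.socket('WORKER');
worker.setEncoding('utf8');
worker.connect('stripe:events', function() {
  worker.on('data', function(raw) {
    var event = JSON.parse(raw);
    switch (event.type) {
      case 'invoice.updated':
        handleInvoiceUpdated(event);            // example handlers
        break;
      case 'customer.created':
        handleCustomerCreated(event);
        break;
      default:
        // unknown event type -- log it rather than lose it
        console.warn('unhandled stripe event:', event.type);
    }
    worker.ack();
  });
});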

squaremo commented 10 years ago

I think I see: you have a bunch of different kinds of job, and you want to be able to partition them into a smaller number of queues, for workers to pick up.

You could partition things statically, by choosing your partitions ahead of time and naming the queues for them. In effect the producers and consumers then have to know the routing -- e.g., "I have a stripe:invoice.created, so it goes in the 'stripe:invoice' queue, then the consumer can look at it in more detail". But I can see why your mind returns to exchanges and routing -- it seems like this ought to be dynamic.

A bit of ancient history: In JMS there's the notion of a 'selector', which is like a queue query. You can consume from a queue while supplying a selector, to filter the messages that you're interested in.

AMQP, which derives in large part from JMS, doesn't have selectors. They are supposed to be subsumed by exchanges and routing (which also supposedly cover JMS Topics), but sadly, the designers of AMQP missed a trick: selectors do something like routing while also guaranteeing that 1. messages won't be dropped on the floor if no-one's there to listen, and 2. only one consumer will see each message. There's just no way to do those in AMQP.

Now back to the present. You can probably see where problems will arise: there's no way to guarantee that messages won't be dropped on the floor, if the producers don't know how they will get routed, since each message will go through an exchange and there may not be a binding in place to route it to a queue.

There are ways to mitigate this in AMQP -- using the mandatory ("return if unroutable") flag, among others -- but they break the simplicity of the rabbit.js model, and I'd like to avoid that.
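For illustration, this is what the mandatory-flag mitigation looks like if you drop down to the underlying amqplib library directly - not something rabbit.js exposes, and the exchange and routing-key names here are made up:

// Sketch with amqplib (the library rabbit.js builds on); not part of the rabbit.js API.
var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  if (err) throw err;
  conn.createChannel(function(err, ch) {
    if (err) throw err;
    // Messages published as mandatory that no queue is bound to receive
    // come back to the publisher via the channel's 'return' event.
    ch.on('return', function(msg) {
      console.warn('unroutable message:', msg.fields.routingKey);
    });
    ch.assertExchange('jobs', 'topic', { durable: true }, function(err) {
      if (err) throw err;
      ch.publish('jobs', 'stripe.invoice.created',
                 new Buffer(JSON.stringify({ id: 'evt_123' })),
                 { mandatory: true });
    });
  });
});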

There's no inherent problem with having tens of queues, by the way -- the messages would get buffered somewhere, and partitioning them into queues gives the right semantics (each message to one worker). Maybe there's some way the API can be more convenient though? For workers it's not too bad, you just #connect to each queue you care about. For producers though, well you'd have to have a socket per type of job, and pick the right one. On the other hand, something like #publish, where you mention the job type when you send the message, wouldn't guarantee that there's a queue at the other end (or at least, it would be expensive to do so). Perhaps:

// Assuming `ctx` is a rabbit.js context that has emitted 'ready', e.g.
// var ctx = require('rabbit.js').createContext('amqp://localhost');
var jobs = ctx.socket('TASK');
// Connect to all the task queues we will want to send to
jobs.connect('stripe:customer.created');
jobs.connect('stripe:customer.updated');
// ...

// Send to a specific task queue
jobs.send('stripe:customer.created', JSON.stringify(job), 'utf8');

Note there's absolutely no wildcard trickiness possible; it's all still sending straight to queues.
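The consuming side would stay as it is today - a WORKER socket connected to each task queue you care about, acknowledging every message. A sketch, again with made-up names:

// Consumer side, unchanged from today's WORKER socket: one socket per task
// queue, acknowledging each message (illustrative queue and handler names).
['stripe:customer.created', 'stripe:customer.updated'].forEach(function(queue) {
  var worker = ctx.socket('WORKER');
  worker.setEncoding('utf8');
  worker.connect(queue, function() {
    worker.on('data', function(msg) {
      handleTask(queue, JSON.parse(msg));   // hypothetical dispatcher
      worker.ack();
    });
  });
});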

mxriverlynn commented 10 years ago

Thanks for the detailed response - the comparison to JMS is especially helpful (I spent two years in that world previously).

In the case of Stripe, it's pretty easy for me to know up-front which queues I need. I only have a few specific webhooks to handle.

But with the job system... there are hundreds of jobs and dozens of job types that will be built, changed, maintained and updated over the years. It's not something that can be set up front and hard-coded. I could have a configuration file that I read as needed, but that would be an additional step added to the job configuration process - which is already a complex system and prone to human error. It's technically possible, but it is very much a usability problem given all the maintenance and work that happens on the jobs.

... what about dead letter queues? I haven't done anything with these in RabbitMQ yet, but I wonder whether an exchange can be configured to catch missed messages in a dead letter queue?

squaremo commented 10 years ago

there are hundreds of jobs and dozens of job types that will be built, changed, maintained and updated over the years. It's not something that can be set up front and have hard coded.

Right, I understand. It would be nice to have the best of both worlds -- that is, to be able to verify that you won't miss messages, and to be able to have a rolling evolution of the system. But that's probably outside the scope of rabbit.js, if not the system you're building.

In any case, alternate exchanges and dead lettering will help you keep track of exceptions at runtime -- messages that don't get processed either because no-one's listening, or because whoever got it was unable to process it. In recent versions of RabbitMQ you can set these with a "policy", i.e., apply them to exchanges and queues as matched by a regexp you give. (In less recent versions they are an argument to exchange.declare or queue.declare.)

Setting an "alternate exchange" will give you a catch-all route for messages that would otherwise get dropped. Dead lettering and per-queue expiration will give you the unprocessed messages. I think these are out of scope for rabbit.js, but you can certainly use rabbit.js to get at the messages, by, e.g., routing dead letters to a queue and having a worker socket in its own process to log them all, blink a red LED, email an operator or whatever.
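For illustration, roughly what those declarations look like using amqplib directly (all exchange and queue names are made up; on recent RabbitMQ you could get the same effect with a policy via rabbitmqctl set_policy):

// Sketch with amqplib, assuming `ch` is an open channel as in the earlier
// example; callbacks omitted for brevity. An alternate exchange catches
// messages the 'jobs' exchange cannot route; a dead-letter exchange catches
// messages a queue rejects or expires.
ch.assertExchange('jobs.unrouted', 'fanout', { durable: true });
ch.assertExchange('jobs.dead-letter', 'fanout', { durable: true });
ch.assertExchange('jobs', 'topic', {
  durable: true,
  alternateExchange: 'jobs.unrouted'
});

ch.assertQueue('stripe:invoice.created', {
  durable: true,
  deadLetterExchange: 'jobs.dead-letter'
});

// Collect everything that went wrong in one queue; a rabbit.js WORKER socket
// in its own process can then log it, alert an operator, etc.
ch.assertQueue('jobs.failed', { durable: true });
ch.bindQueue('jobs.failed', 'jobs.unrouted', '');
ch.bindQueue('jobs.failed', 'jobs.dead-letter', '');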

So with all that in mind, what do you think of the TASK socket?

squaremo commented 10 years ago

So with all that in mind, what do you think of the TASK socket?

I think this would survive dozens of job types (after all, every sub socket has its own queue, as does every request socket), and online repartitioning of the workers, and so on. The question is, do you need wildcard matching?

I think you're suggesting that you will, so you can send to "job.foo.frobnicate" and have a worker that's dealing with "job.foo.*", rather than mention frobnication specifically. I would point out that you have to change code somewhere to send new kinds of job, and that you can start extra workers to pick up the new kinds of job while you change the old workers to handle it. In the wildcard case, the old worker would just start getting jobs it didn't know how to handle. But perhaps I'm off on a tangent.

squaremo commented 10 years ago

I've implemented the TASK socket in master. (If it turns out I'm off track, we can think of it as an hilarious April Fools prank.)

squaremo commented 10 years ago

@derickbailey I'm interested to hear where this ended up for you -- did you get something working in a fork, try the TASK socket, or do something completely different...?