rebus-org / Rebus.Oracle

:bus: Oracle transport for Rebus
https://mookid.dk/category/rebus

Feature: Partitioned Queue #26

Closed dstrijbos closed 4 years ago

dstrijbos commented 4 years ago

Hi there!

First off, thanks for your work on Rebus! For a project of ours, we needed an Oracle-based queue, and Rebus seemed to fit the bill! However, we wanted to add a partitioning mechanism to our queue to prevent different consumers from handling messages related to the same topic.

I uploaded a working solution and wanted to discuss it before opening a PR: https://github.com/rebus-org/Rebus.Oracle/compare/master...dstrijbos:feature/partitioned-transport

I should mention this is my first time working with Oracle, so I may have missed some obvious flaws in my approach.

Overview

I added an OraclePartitionedTransport class, as well as a separate table and procedure definition that include a PartitionId and a Status column. By adding the appropriate header, a Producer can attach a PartitionId to a message. Once a Consumer dequeues a message, its Status is set to 'Processing'. If that message had a PartitionId, no further messages with that PartitionId will be handled until the first message is 'Completed'.
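As a rough illustration of the rule described above (not the code from the linked branch), the gating can be modelled in memory. The `Message` and `PartitionedQueue` names and the initial 'Pending' status are assumptions made for this sketch; the actual implementation lives in an Oracle table and stored procedure.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Message:
    id: int
    body: str
    partition_id: Optional[str] = None
    status: str = "Pending"  # assumed lifecycle: Pending -> Processing -> Completed


class PartitionedQueue:
    """In-memory stand-in for the partitioned transport table (hypothetical)."""

    def __init__(self) -> None:
        self._rows: List[Message] = []  # kept in arrival order
        self._next_id = 1

    def send(self, body: str, partition_id: Optional[str] = None) -> Message:
        msg = Message(self._next_id, body, partition_id)
        self._next_id += 1
        self._rows.append(msg)
        return msg

    def receive(self) -> Optional[Message]:
        # Partitions that currently have a message being processed are blocked.
        busy = {
            m.partition_id
            for m in self._rows
            if m.status == "Processing" and m.partition_id is not None
        }
        for m in self._rows:
            if m.status == "Pending" and m.partition_id not in busy:
                m.status = "Processing"
                return m
        return None  # nothing eligible right now

    def complete(self, msg: Message) -> None:
        msg.status = "Completed"
```

With two messages for `company-a` and one for `company-b`, the second `receive()` call skips the blocked `company-a` message and hands out the `company-b` one; the remaining `company-a` message only becomes eligible after `complete()` is called on the first.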

Locking Hurdles

Initially, I replaced the last line of the stored procedure with an update of the Status field to 'Processing'. However, I noticed that in both the original and my implementation, the final line of the procedure isn't actually executed until the Consumer finishes handling the message. Is this behavior intended? In the original implementation this doesn't seem to cause any problems, but for me it meant the Status was never correct during handling.

I eventually figured out that I could force the status to update with the following lines: update {table.Name} set status = 'Processing' where id = messageId; commit; I then use the final line of the procedure to set the status to 'Completed'. However, this approach feels a bit hacky to me. Do you see any issues with it?

Extensibility questions

I initially tried to make OraclePartitionedTransport a subclass of OracleTransport but ran into some issues, such as variables being private. It also seemed that the core Rebus code called the base class's Send and Receive methods rather than the subclass's, as I had intended. Would you mind adjustments to the OracleTransport class to support this extension?

I now have a separate table definition and a SendCommandPartitioned because of the new GroupId and Status. Would you mind if those two fields were added to the regular Transport table/command? This would make the PR a lot sleeker.

mookid8000 commented 4 years ago

(...) However, we wanted to add a partitioning mechanism to our queue to prevent different consumers from handling messages related to the same topic (...)

But.... queues already do that, including if you're using Oracle as the "queues".

When you start multiple processes that consume messages from the same queue, each message will be processed by one and only one of the consumer instances.

Consider this example: If you have a publisher that publishes to a topic topic-a, and you have a subscriber with the input queue subscriber-1 that subscribes to it, then a copy of each event published to topic-a will be delivered to the subscriber-1 queue.

If you then start multiple instances of the subscriber, all consuming messages off of the subscriber-1 queue, then messages will be consumed in a round-robin fashion from it.

This is called "competing consumers". Almost all queueing systems can do that. It's usually how you distribute load, when your messages represent work to be done.
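The competing-consumers behaviour can be sketched with a plain in-process queue. This is only an analogy: the `run_competing_consumers` helper is hypothetical and uses Python's standard `queue.Queue` rather than Rebus or Oracle. Which consumer gets which message depends on thread scheduling, so the split is not guaranteed to be strictly round-robin here.

```python
import queue
import threading


def run_competing_consumers(messages, consumer_count):
    """Return {consumer_name: [messages handled]} for one shared queue."""
    q = queue.Queue()
    for m in messages:
        q.put(m)

    handled = {f"consumer-{i}": [] for i in range(consumer_count)}

    def consume(name):
        while True:
            try:
                # Taking a message removes it, so no other consumer sees it.
                m = q.get_nowait()
            except queue.Empty:
                return
            handled[name].append(m)

    threads = [threading.Thread(target=consume, args=(name,)) for name in handled]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return handled
```

Collecting the lists from all consumers yields every message exactly once, which is the "one and only one" guarantee described above.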

I assume you're inspired by how Kafka is modeled, but the reason Kafka needs partitions is that events are stored as streams in the topic, and it's up to the consumers to keep track of how far they have read. Therefore, to be able to distribute load among multiple consumers, Kafka introduces the concept of "consumer groups", which are 1-n instances of the same consumer that each get assigned a subset of the partitions from the topic.

With Rebus, and with message queues in general, a subscriber gets its own copy of each event published to it. When the event is received, it is removed from the queue(*). This fact means that multiple instances that take messages out of the queue can simply be started in parallel, thus distributing the events among them.

I hope all of this makes sense. 🙂 Please let me know if there's something I've missed.


(*) Usually via a lease-based protocol, which means that messages can be processed safely and are only actually deleted from the queue once processing has finished.

dstrijbos commented 4 years ago

Thanks for your quick response. It seems I glossed over the explanation in trying to be concise, apologies!

One crucial aspect of this feature is the preservation of message order for a subset of messages. A practical example: My system can receive messages that either read or modify data for a single company. If we first receive an update-request for company-a and then a read-request for company-a, I want to handle the messages for that company in the order they arrived, by locking the handling of messages related to company-a while the first message is being processed. So consumers don't compete over company-a, but can still be scaled up to handle messages for other companies.

Hope that clarifies the situation.

mookid8000 commented 4 years ago

Ah ok, it makes sense then. I am just a little bit afraid that it's going to increase the complexity of the code.

Generally, when you're using queues to distribute messages, you should not make too many assumptions around message ordering. Many things can happen to messages that make them arrive out-of-order, like parallel processing and temporary stays in dead-letter queues, so the general advice is to tolerate out-of-sequence messages as much as possible.

While you could of course make your particular queue (Oracle in this case) guarantee ordering of the messages, you're still exposed to a message failing, going into the dead-letter queue, and then being returned to the queue at a later time.

For this reason, I generally recommend against making (too many) assumptions about the order of messages.
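The dead-letter scenario mentioned above can be shown with a small simulation. The `process_with_dead_letter` function is a hypothetical model, not Rebus behaviour verbatim: it assumes a failing message is moved aside after its first failure and is returned to the end of the queue later.

```python
from collections import deque


def process_with_dead_letter(messages, fails_first_time):
    """Return the order in which messages end up being handled."""
    main = deque(messages)
    dead_letter = []
    handled = []

    # First pass: messages that fail are moved to the dead-letter queue.
    while main:
        m = main.popleft()
        if m in fails_first_time:
            dead_letter.append(m)
            continue
        handled.append(m)

    # Later, the dead-lettered messages are returned to the queue and succeed.
    main.extend(dead_letter)
    while main:
        handled.append(main.popleft())
    return handled
```

If the update-request for company-a fails once while the read-request behind it succeeds, the read is handled before the update, even though the queue itself delivered them in order.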