rebus-org / Rebus

:bus: Simple and lean service bus implementation for .NET
https://mookid.dk/category/rebus
Other
2.31k stars 362 forks source link

Feature: Support for batching messages together to process in one go? #936

Closed kendallb closed 3 years ago

kendallb commented 3 years ago

I don't know if this is feasible with all message transports, but I know it certainly would be for the MySQL transport that I am working on. I am planning to replace our own ad-hoc message system for running background tasks with Rebus, but one of the things we do in our background processing tool is we have the ability to snag large batches of messages to process in one go. Sometimes for processing tasks (such as reindexing product records for search etc), it's a lot more efficient if we can process a batch of them in one go, than to do them all one after the other.

If we wanted to support that, how would we set it up so that a consumer could opt to get up to X messages to process in one go, rather than just one at a time?

Also related to this, how does Rebus currently make sure messages are all processes in the correct sequence, if there are multiple threads or task parallelism enabled for the consumer? Clearly if messages are things like incrementing or decrementing stock levels in eCommerce (or stock trades in a trading environment), it would be critical that all messages are processed in order. Is that normally done by using a single thread with only one level of parallelism? Or does Rebus already enforce that some other way?

mookid8000 commented 3 years ago

Batching is already supported – let's pretend you have some kind of WorkToDo, then you simply send an instance of this:

public class WorkBatch
{
    public WorkBatch(IEnumerable<WorkToDo> work) => Work = work.ToList();

    public IReadOnlyList<WorkToDo> Work { get; }
}

and then you handle it in a handler like this:

public class WorkBatchHandler : IHandleMessages<WorkBatch>
{
    public async Task Handle(WorkBatch batch)
    {
        // ...
    }
}

😉 But, seriously – Rebus 1 (versions <= 0.84.0) had built-in batching support, meaning that you could bus.Send(aSequenceOfMessages), but combined with polymorphic dispatch in the receiving end it added a silly amount of complexity with little benefit.

Explicitly bundling messages is so easy to do, easy to understand for the next developer, and has so predictable behavior, that that is how Rebus supports batching. 🙂

Also related to this, how does Rebus currently make sure messages are all processes in the correct sequence, if there are multiple threads or task parallelism enabled for the consumer?

Good question! The answer is: It doesn't!! 😱 And this is why, when you're building stuff with message queues, you must always build in some level of tolerance towards reordered messages. Even if you set threads = parallelism = 1, and you know your queue is strictly FIFO, then you can STILL get messages out of order when messages from the dead-letter queue are retried.

So this is also why I would not recommend you implement a typical stock ticker feed with message queues: Even though they're FIFO, queues are the most fun when consumers can process messages in parallel (multiple threads, maybe even multiple processes), and so they're not a good fit for tasks that require strict ordering of things.

Queues are also damn slow if you only take one message out at a time.

You should use Kafka for your stock ticker 🙂 (if you need high throughput), or ordered rows in a database if your needs are modest.

mookid8000 commented 3 years ago

Btw. if you're interested in events with guaranteed ordering, you want the consumer to decide how big batches to consume, and you like the general approach in the APIs of Rebus 😁 then I suggest you take a look at Topos – it's not an "official" Rebus FM project (yet), but it's being used in production right now internally in the Fleet Manager product we deliver.

It's basically like "some kind of Rebus for log-based event brokers", where log-based event brokers at the moment is limited to

  1. Kafka (with special configuration support for Confluent Cloud)
  2. Azure Event Hubs (via its Kafka facade, with special configuration support)
  3. The file system (via Microsoft FASTER

It could easily support more brokers, but Kafka, Azure Event Hubs, and the file system are those that I've needed myself until now.

kendallb commented 3 years ago

Thanks for the comments! I will look into the batching support. Mostly I care about being able to grab a full batch of work at a time and pull it out. I suppose we could implement the batching support differently in that we submit the batches directly into the message queue as batches of work in a single message. Usually this is for search re-indexing on our site, when a ton of products need to get indexed and it's way faster to do them in blocks of 1000 (we can get data for lots of them with a single SQL query).

We do need the ability to submit multiple entries to the message queue when we are setting them up to be indexed in batches, and we currently have batched SQL inserts to do that with our own homegrown queue. But the code that inserts them into the message queue could probably just as easily submit them in chunks of 1000. Currently they all get inserted as single events, and then the consumer pulls them out in groups of 1000 until they are all done.

Anyway part of the reason we need to be able to support ordering of events is that is for a different case where we are rebuilding product category connections in the background. When we submit a batch of those (which could be thousands in one submit), after it's all processed we then process a cache clear event so the caches get flushed once it is all done. So if that gets processed before the indexing gets completed, then things might not show up until the next time the caches expire. Not really sure how to handle doing that if the messages might end up being processed slightly out of order?

Is there a way to peek at the queue to see if any messages are pending in the same queue? Anyway for this particular use case our existing background task queue works well so it's not necessary to convert it to a rebus message queue if it's not a good fit. The existing system works nicely as is.

What I am going to use this for is to build a better, extensible event tracking platform for our web sites. Event tracking on the web is a real pain when you have multiple connections you need to send tracking events to (Google Analytics, marketing tools, search engine intelligence etc) as you need to send of async events from the browser to get captured, but if the browser navigates before those get done, recent browsers will terminate those requests so sometimes you lose the data. That data is pretty important when you are trying to optimize your search results on eCommerce web sites by knowing what customers do with the results so you can bubble the important stuff to the top. So my goal is to remove all the web based tracking completely and have a single async call to our own back end tracking server, and then let the click navigation complete when that is done. The back end service will take the call and submit a message to the Rebus queue with the event, and multiple subscribers will be listening and will then process the events and send them off to their respective services (Google Analytics, search engine etc). I think Rebus will be a good fit for that and the ordering of those events is not important. It just needs to be fast.

kendallb commented 3 years ago

BTW, if you are reading this I sent you an email to your hello@ email address to ask some questions about work I am doing on Rebus outside of Github.

kendallb commented 3 years ago

Ok, getting back to this I don't see that batches are actually supported in the way I was hoping (single messages get pushed to the queue, and you can pull them out in batches and process them).

I can mimic it as you said by pushing into the queue in larger batches, but there is no solution to get more than one message from the queue at a time and processing in a batch. Doable with a SQL transport, but probably not with other message queues? Does MSMQ or RabbitMQ support reading multiple messages at a time?

mookid8000 commented 3 years ago

Yeah well, as you're correctly getting at here, queue generally don't lend themselves well to consuming messages in batches. While queueing systems like MSMQ and RabbitMQ do allow for pulling multiple messages out, they're still retrieved one at a time, and so the "receive-side batching" would be completely up to the consumer to implement.

If you really really want to have batching in the receiving end with Rebus, I suggest you take a look at implementing a Task-based solution backed by a dispatcher thread, which would take care of bundling requests. The API from a Rebus handler could look like this:

public class BatchingMessageHandler : IHandleMessages<WorkToDo>
{
    readonly IWorkBundler _workBundler;

    public BatchingMessageHandler(IWorkBundler workBundler) => _workBundler = workBundler;

    public Task Handle(WorkToDo message) => _workBundler.Process(message);
}

Internally, the work bundler would have a ConcurrentQueue of pieces of work to do, each associated with an appropriate TaskCompletionSource, enabling that a Task be returned for each piece of work. The bundler would then have a worker thread, a timer trigger, or another way of deciding when to process things, which it then be able to do un bundles.

Although... with the aforementioned Topos library, because a log-based broker lends itself so well to working this way, it's as easy as going

using var consumer = Configure
    .Consumer("default-group", c => c.UseKafka("kafkahost01:9092", "kafkahost02:9092"))
    .Serialization(s => s.UseNewtonsoftJson())
    .Topics(t => t.Subscribe("someevents")
    .Positions(p => p.StoreInMongoDb("mongodb://mongohost01/some_database", "Positions"))
    .Options(o => o.SetMaximumBatchSize(10000))    
    .Handle(async (messages, context, token) =>
    {
        // messages will contain 1..10000 items here
    })
    .Start();

to bundle messages in the consumer, in this case dispatching batches of up to 10k messages at a time.

kendallb commented 3 years ago

Yeah a separate batcher thread is not a bad idea. It could wait until it gets up to X messages and then process them as a batch, or start processing if it does not reach the batching limit within a short timeout, say 100ms or something.

As for the Topos library, it's not clear to me yet from looking at it what is different with it and Rebus? Sounds like it is more about guaranteed message ordering and has support for pulling messages out in chunks? I see there is now SQL transport for it yet, is that because it does not lend itself well to being stored in SQL, or some other reason?

We currently use Couchbase as our caching tier and it is a NoSQL data store like MongDB, so I see there is a MongoDB transport so I wonder if it would be easy to port that to run with Couchbase as a place to store the messages? Couchbase is really fast, and for transient stuff like messages it would be a good fit.

Actually makes me wonder if it would not also be a good fit for storing Rebus messages ... might be way more performant than SQL.

kendallb commented 3 years ago

Ok, so having some more work on the MySQL connectors and got the performance as good as I can get it, the core problem with high performance when using the SQL transport is that the transport itself is quite slow at running the queries to find the 'next' message in the queue. If there is only one thread at a time pulling messages from the queue and in the case of competing threads with MySQL, it causes deadlocks all the time so you have to start over.

So it seems to me perhaps there is a way to kill two birds with one stone here. If the SQL transport layer was capable of reading a 'batch' of messages from the queue at once (up to a configurable maximum), those messages could be brought into a batch queue and then farmed out to either a single handler designed to accept a batch of messages, or simply call a handler one at a time with each message (via async of course up to MaxParallelism) in the normal case. That would take care of a huge part of the bottleneck of the transport, as you would be reading more messages at once.

Clearly you cannot hold a transaction open for the duration of the message handling in this case, but I already had to change that for the MySQL transport anyway, as MySQL will deadlock if you try to do it the way it is done in SQL server. So instead we flip a 'processing' flag on the message to block it from being processed by other handlers, and we either clear the flag or delete the entry when the message has been processed by registering the handlers in the transaction semantics. Kinda like how the leased messages work, but without the leasing timing logic.

mookid8000 commented 3 years ago

As for the Topos library, it's not clear to me yet from looking at it what is different with it and Rebus?

It's a different beast alltogether, because it's centered around a log (e.g. Kafka, Azure Event Hubs, etc.) and not a queue (e.g. Azure Service Bus, RabbitMQ, etc.). Check out this blog post for a good and to-the-point comparison of the two types of brokers 👉 https://jack-vanlightly.com/blog/2018/5/20/event-driven-architectures-the-queue-vs-the-log

Sounds like it is more about guaranteed message ordering and has support for pulling messages out in chunks? I see there is now SQL transport for it yet, is that because it does not lend itself well to being stored in SQL, or some other reason?

And no there's no SQL broker implementation for it yet. I could definitely see myself adding that at some point.

We currently use Couchbase as our caching tier and it is a NoSQL data store like MongDB, so I see there is a MongoDB transport so I wonder if it would be easy to port that to run with Couchbase as a place to store the messages? Couchbase is really fast, and for transient stuff like messages it would be a good fit.

There's no MongoDB broker implementation for Topos, it's a "positions store". The thing is, when you're reading from a log, which is a shared resource, clients do not "take messages out", or change the log in any other way – so it needs to keep track of how far it has read, which is why all clients must have a "positions store" where they periodically checkpoint the offset of how far they have come in reading the log. This way, they can resume reading from the correct place when they're restarted, after a crach, etc.

Actually makes me wonder if it would not also be a good fit for storing Rebus messages ... might be way more performant than SQL.

Could be 🙂 don't know anything about CouchBase. It should be possible though to Bingle around for reports from people having tried to used it as a queue.