centiservice / mats3

Mats3: Message-based Asynchronous Transactional Staged Stateless Services
https://mats3.io/

Batch processing of incoming messages. #16

Open stolsvik opened 5 years ago

stolsvik commented 5 years ago

The commit to MQ and DB is not free. Thus, in a heavily loaded environment, processing messages one by one carries a pretty hefty cost when there already are more messages ready to be processed on the receiver.

It should be possible to process messages in batch. What one probably wants then is to receive multiple messages, do some DB operation on each (e.g. a batch SQL insert), and then commit both DB and MQ at the end of the batch.

At least for batch SQL operations, you'd want to keep the whole setup of DB objects available across the batch (i.e. the PreparedStatement). So the obvious idea of just invoking the entire lambda multiple times before DB and MQ are committed will not be adequate for all situations (albeit maybe nice to have as a "simple batch" variant).
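For illustration, the "simple batch" variant could look roughly like this - a minimal, pure-Java sketch where the ordinary stage lambda is invoked once per message and the single DB+MQ commit is deferred to after the loop. `processBatch` and its shape are hypothetical, not Mats3 API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical "simple batch" variant: invoke the ordinary stage lambda once
// per incoming message, but commit DB and MQ only once, after the whole batch.
class SimpleBatchSketch {
    static <I, R> List<R> processBatch(List<I> batch, Function<I, R> stageLambda) {
        List<R> outgoing = new ArrayList<>();
        for (I msg : batch) {
            outgoing.add(stageLambda.apply(msg)); // one lambda invocation per message
        }
        // <- the single commit of both DB and MQ would happen here
        return outgoing;
    }
}
```

Note how this captures the commit savings, but not the batch-SQL case: each lambda invocation is isolated, so there is no shared PreparedStatement across the messages.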

There are problems with error handling, as you will now roll back an entire batch of messages (up to the error-inducing message), and then eventually DLQ them all at the same time, even though maybe 99 of the 100-message batch were OK.

~What I currently envision is a semi-manual solution where we have a context.nextMessage(), which returns the next deserialized DTO along with State, if available, and where the other message-specific getters on context "change along". Any "request" invocations must also change context, and any invoked setters must be cleared.~

~Context.nextMessage() would return Optional, and when the current batch was exhausted (configurable batch size) or there were no more messages, it would return empty, and the lambda should exit, letting Mats commit DB and MQ.~
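For reference, the abandoned pull-style loop above could have been driven by something like this (nextMessage() is imagined API that never existed in Mats3; see the 2022-10-02 edit below for why it was dropped):

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

// Toy model of the struck-through "semi-manual" idea: the stage lambda pulls
// messages one at a time until the batch is exhausted.
class NextMessageSketch<I> {
    private final Iterator<I> batch;

    NextMessageSketch(List<I> currentBatch) {
        this.batch = currentBatch.iterator();
    }

    Optional<I> nextMessage() {
        // empty => batch done (or no more messages): lambda exits, Mats commits DB+MQ
        return batch.hasNext() ? Optional.of(batch.next()) : Optional.empty();
    }
}
```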

(edit 2022-10-02) Well, that would not really work out the way we want: We want to be able to get a bunch of messages, then do e.g. a SELECT .. IN-like construct to also batch against the database, and then, when the DB result is in, make a bunch of "out messages", each corresponding to its respective incoming message. This is necessary to continue each of the Mats Flows with the correct outgoing message.

A solution that would cater to this: the incoming message argument for a batching endpoint would be a MatsBatchList&lt;I&gt;, and when producing replies, you'd do a context.setBatchMessageContext(I whichIncomingMessage), which uses object identity. All relevant context methods (both incoming and outgoing) would then "switch over" to that message's context. That is, context.getFromStageId(), context.addBytes(...) and context.reply(..) would all change to the context of that message, i.e. the contextual Mats Flow that the message represents. Notice the type of the incoming value - this would be all it took to tell Mats that you want batching. You would not be able to utilize any of ProcessContext's methods before having set the incoming message.

This gives an interesting problem with DetachedProcessContext when the context can change like this. Maybe we need a copy-like method to get a DetachedProcessContext, or rather, setBatchMessageContext could return a ProcessContext that is message-contextual (and thus immutable wrt. the DetachedProcessContext methods).
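The "switch context by object identity of the incoming message" idea could be backed by an IdentityHashMap, roughly like this. MatsBatchList, setBatchMessageContext and reply here are imagined API, and this toy only tracks replies per incoming message:

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Toy model of per-incoming-message context selection by object identity.
// Not actual Mats3 API - just illustrating the bookkeeping.
class BatchContextSketch<I> {
    private final Map<I, List<Object>> repliesPerIncoming = new IdentityHashMap<>();
    private List<Object> current; // the currently selected message's "context"

    void setBatchMessageContext(I incomingMessage) {
        // IdentityHashMap: two equal-valued DTOs are still two distinct Mats Flows
        current = repliesPerIncoming.computeIfAbsent(incomingMessage, k -> new ArrayList<>());
    }

    void reply(Object replyDto) {
        if (current == null) {
            throw new IllegalStateException("Must setBatchMessageContext(..) first");
        }
        current.add(replyDto);
    }

    List<Object> repliesFor(I incomingMessage) {
        return repliesPerIncoming.getOrDefault(incomingMessage, List.of());
    }
}
```

The IllegalStateException in reply(..) corresponds to the rule that none of ProcessContext's methods may be used before an incoming message has been selected.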

Error handling:

Non-solution: If any message in the batch caused any exception, including MatsRefuseMessageException, the stage would roll back the current processing (as normal), but then set the endpoint into single-message processing for the next 'batch-size' number of messages. This would ensure that a poison message would be processed - and DLQed - on its own. When 'batch-size' messages had been single-processed, it would revert to batching again.

However, the poison message could very well be presented to a different processor - on a different JVM - which would then also have to realize that one of the messages in its batch was poison, and go into single mode. If you have more processors than retries (!), you'd still end up DLQing the entire batch.

Band-aid to non-solution:

At least let the stage processors share the number-of-single-processings-before-batching counter, so that the 16 concurrent stage processors on one node/JVM don't all have to discover this independently.
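The shared per-node counter could be as simple as an AtomicInteger that any processor arms on poison detection, and that every processor consults before choosing batch vs. single mode. A sketch; none of this exists in Mats3:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Shared "single-process N messages before reverting to batching" gate,
// shared by all stage processors on one node/JVM. Illustrative only.
class SingleModeGate {
    private final AtomicInteger remainingSingles = new AtomicInteger(0);
    private final int batchSize;

    SingleModeGate(int batchSize) {
        this.batchSize = batchSize;
    }

    void poisonDetected() {
        // any processor hitting a poison message flips the whole node to single mode
        remainingSingles.set(batchSize);
    }

    boolean shouldBatch() {
        // each single-processed message decrements; at zero, we batch again
        int r = remainingSingles.get();
        while (r > 0) {
            if (remainingSingles.compareAndSet(r, r - 1)) {
                return false; // still in single mode for this message
            }
            r = remainingSingles.get();
        }
        return true;
    }
}
```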

Broadcast this to the other JVMs running the same stage so that they also go into single mode. But with this we have an asynchronous situation, which I do not see a way out of. So probably rule this out?

Solution?:

For all stages that are batching, make a sidecar stage processor with its own queue that is NOT batching, with e.g. concurrency=1. We'd keep all processed messages in memory. If we got into this situation, we'd roll back the DB, and commit the MQ, but post all the messages in the current batch up to and including the poison one to this no-batch queue (any "attachments" must be remembered!). On the no-batch queue, each good message would be processed as normal, while the poison one would be processed and eventually DLQed - as normal. (Note how this really would screw up the ordering of message delivery - which Mats has expressly never cared about - but this would be a new notch on the scale.)
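A toy model of the repost flow, with plain collections standing in for the MQ. handleBatch, the no-batch queue, and the poison predicate are all illustrative; the actual DB rollback and MQ commit are elided:

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

// Toy model of the sidecar no-batch queue idea: on a poison message, the batch's
// DB work is rolled back, and all messages up to and including the poison one
// are re-posted to a single-processing queue.
class SidecarSketch {
    static <M> List<M> handleBatch(List<M> batch, Predicate<M> isPoison, Queue<M> noBatchQueue) {
        for (int i = 0; i < batch.size(); i++) {
            if (isPoison.test(batch.get(i))) {
                // DB work for the batch would be rolled back here (not shown);
                // re-post everything up to and including the poison message:
                noBatchQueue.addAll(batch.subList(0, i + 1));
                // the rest of the batch has not been touched; handle it later
                return batch.subList(i + 1, batch.size());
            }
        }
        // no poison: the whole batch commits as one
        return List.of();
    }
}
```

Note that the three re-posted messages land on the no-batch queue behind whatever is already there, which is exactly the delivery-ordering scramble mentioned above.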