Open mj0nez opened 1 week ago
On further thought I think the following changes would be necessary to make this work:
This would move the responsibility for triggering the DLQ logic from the processor to the strategies, but keep it consistent for batch and "simple" processing.
To put individual messages from a batch into the DLQ (step 2 in the example above), the strategy would also need to pass the handle to the function. That would be another breaking change; alternatively, we could simply add a new 'RunTaskOnBatch' strategy that covers this case.
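A rough sketch of what such a 'RunTaskOnBatch' strategy could look like, written without any library imports. Everything here is hypothetical: the class shape, the `handle_invalid_message` constructor argument, and the two-argument function signature are assumptions used for illustration, not the existing API.

```python
from typing import Callable, Generic, List, Sequence, TypeVar

TIn = TypeVar("TIn")
TOut = TypeVar("TOut")


class RunTaskOnBatch(Generic[TIn, TOut]):
    """Hypothetical strategy: runs a function on a whole batch and hands it
    a callback for sending individual messages to the DLQ."""

    def __init__(
        self,
        function: Callable[[Sequence[TIn], Callable[[TIn], None]], List[TOut]],
        handle_invalid_message: Callable[[TIn], None],
    ) -> None:
        self._function = function
        # Assumed to be provided by the processor (stand-in for its DLQ handler).
        self._handle_invalid_message = handle_invalid_message

    def submit(self, batch: Sequence[TIn]) -> List[TOut]:
        # The function decides per message whether to keep it or dead-letter it,
        # so a single bad message no longer dumps the whole batch.
        return self._function(batch, self._handle_invalid_message)


def parse_ints(batch: Sequence[str], send_to_dlq: Callable[[str], None]) -> List[int]:
    parsed = []
    for raw in batch:
        try:
            parsed.append(int(raw))
        except ValueError:
            send_to_dlq(raw)  # only the faulty message goes to the DLQ
    return parsed


dlq: List[str] = []  # stand-in for the real DLQ producer
step = RunTaskOnBatch(parse_ints, dlq.append)
result = step.submit(["1", "x", "3"])
```

With this shape the existing strategies keep their signatures, and only functions that opt into the new strategy receive the extra argument.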
Currently, handling of invalid messages with the available DLQ only works if the strategy has a direct processing chain that does not include batching. Consider the following:
None of the individual strategies after the batch step (2 & 4) can raise an InvalidMessage exception to trigger the DLQ, because any exception would dump the current batch of messages. Furthermore, if we need the faulty message in the DLQ, the strategy needs some kind of routing producer that routes valid messages to one topic and invalid messages to the DLQ topic.
To avoid this, I would like the StreamProcessor to expose its handle_invalid_message to the strategy factory, like it does with commit. This would introduce considerable changes to the API, but it would also allow strategies to pass invalid messages directly to the DLQ handler, using the configured policy and buffered messages, while avoiding complex routing logic and duplicated DLQ handling.
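To make the idea concrete, here is a minimal, self-contained sketch of a processor that hands its invalid-message handler to the factory next to commit. All classes are toy stand-ins written for this example (including the simplified InvalidMessage and the `ToyProcessor`); the two-argument factory signature is the proposed change, not something the library offers today.

```python
from typing import Callable, Dict, List, Mapping


class InvalidMessage(Exception):
    """Simplified stand-in for the library exception; carries only an offset."""

    def __init__(self, offset: int) -> None:
        super().__init__(f"invalid message at offset {offset}")
        self.offset = offset


Commit = Callable[[Mapping[str, int]], None]
HandleInvalidMessage = Callable[[InvalidMessage], None]


class ValidatingStrategy:
    """Toy strategy that reports bad messages through the handler instead of raising."""

    def __init__(self, commit: Commit, handle_invalid_message: HandleInvalidMessage) -> None:
        self._commit = commit
        self._handle_invalid_message = handle_invalid_message
        self.accepted: List[str] = []

    def submit(self, offset: int, payload: str) -> None:
        if not payload:
            # No exception propagates, so nothing upstream (e.g. a batch) is dropped.
            self._handle_invalid_message(InvalidMessage(offset))
        else:
            self.accepted.append(payload)
        self._commit({"topic-0": offset + 1})


class ToyProcessor:
    """Owns the DLQ handling and exposes it to the factory, like it does with commit."""

    def __init__(self, factory: Callable[[Commit, HandleInvalidMessage], ValidatingStrategy]) -> None:
        self.dlq_offsets: List[int] = []
        self.committed: Dict[str, int] = {}
        self.strategy = factory(self._commit, self._handle_invalid_message)

    def _commit(self, offsets: Mapping[str, int]) -> None:
        self.committed.update(offsets)

    def _handle_invalid_message(self, exc: InvalidMessage) -> None:
        # The real processor would apply the configured DLQ policy here.
        self.dlq_offsets.append(exc.offset)


processor = ToyProcessor(ValidatingStrategy)
for offset, payload in enumerate(["a", "", "c"]):
    processor.strategy.submit(offset, payload)
```

The strategy never needs its own routing producer here: the decision about what happens to an invalid message stays in one place, inside the processor.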