glazkovalex / Rebus.Kafka

Apache Kafka transport for Rebus
MIT License

Unable to invoke deadletter topic #6

Closed: mkoziel2000 closed this issue 6 months ago

mkoziel2000 commented 3 years ago

I noticed that the SimpleRetryStrategy doesn't work with the Rebus.Kafka plugin. When a message handler throws an exception, Rebus logs it as a warning, but Rebus.Kafka just goes on its merry way and consumes the next message in the Kafka topic as if the previous message had been processed successfully. I can see why replaying a previous message is difficult: it requires moving the offset backwards to reprocess the message before finally giving up and forwarding it to an actual deadletter topic. Is there something I need to configure to turn on the retry and deadletter mechanism, or is this feature not implemented in the plugin yet?

We always have the option to build a retry mechanism, and to forward the message to an error queue, from within handler code, but this is not ideal. It leads to a lot of repetitive code, and it is not safe: if the service is restarted while a message is in the middle of its retry logic, that message may never get completely processed. It would be better to leverage the fallback mechanism already built into Rebus, since it accounts for those scenarios.

One option I can think of is to hold off on calling commit() until the handler has processed the message successfully (this would obviously require turning off the autocommit feature). If an exception is thrown, skip the commit() and call the handler again. Once the retry limit is exceeded, push the message to the deadletter topic defined in the SimpleRetryStrategy, and only then proceed to the commit() logic and the subsequent consume() of the next message.
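The proposed flow (retry the message in place, dead-letter it after the last attempt, and commit its offset only once it is settled) can be sketched as a small simulation. This is not Rebus or Rebus.Kafka code and does not use a real Kafka client. The partition is an in-memory list, and SimulatedPartition, consume_with_retries and max_delivery_attempts are hypothetical names chosen for illustration.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class SimulatedPartition:
    """Hypothetical in-memory stand-in for one Kafka partition (not a real client)."""
    messages: List[str]
    committed_offset: int = 0  # where a restarted consumer would resume
    dead_letters: List[str] = field(default_factory=list)

    def commit(self, next_offset: int) -> None:
        # With auto-commit disabled, the offset only moves when we commit it.
        self.committed_offset = next_offset


def consume_with_retries(partition: SimulatedPartition,
                         handler: Callable[[str], None],
                         max_delivery_attempts: int = 5) -> None:
    """Retry each message in place, dead-letter it once the attempts are
    exhausted, and commit its offset only after it has been settled."""
    for offset in range(partition.committed_offset, len(partition.messages)):
        message = partition.messages[offset]
        for attempt in range(1, max_delivery_attempts + 1):
            try:
                handler(message)
                break  # handled successfully, no more attempts needed
            except Exception:
                if attempt == max_delivery_attempts:
                    # Out of attempts: forward to the (simulated) deadletter topic.
                    partition.dead_letters.append(message)
        # The message is now either handled or dead-lettered, so commit past it.
        partition.commit(offset + 1)


# Usage: one message always fails, the others succeed on the first attempt.
calls: Dict[str, int] = {}


def handler(message: str) -> None:
    calls[message] = calls.get(message, 0) + 1
    if message == "poison":
        raise ValueError("handler failed")


partition = SimulatedPartition(["a", "poison", "b"])
consume_with_retries(partition, handler, max_delivery_attempts=3)
print(partition.dead_letters, partition.committed_offset, calls)
# prints ['poison'] 3 {'a': 1, 'poison': 3, 'b': 1}
```

Because the commit is called only after the inner retry loop finishes, a restart during the retries resumes at the same message instead of skipping it.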

This seems like the most straightforward way to hook into the SimpleRetryStrategy. If SimpleRetryStrategy is not configured, the current implementation seems appropriate.

Is there a setting I'm missing somewhere or is this a legitimate limitation of the plugin?

glazkovalex commented 3 years ago

Thank you for the issue! I will investigate this case carefully and write up solutions later. For now, I will only comment on this point:

block performing a commit() until after the successful processing of a message by the handler (this would obviously require turning off the autocommit feature)

Please check whether the EnableAutoCommit option is disabled in your consumer configuration. For the bus to work properly with the Apache Kafka transport, EnableAutoCommit must be false. This is the default, and it is also how the consumer settings are overridden in this example: https://github.com/glazkovalex/Rebus.Kafka/blob/master/Examples/Scaleout.Producer/Program.cs#L38.

Note that with autoCommit disabled, the current version of the transport receives five messages and moves the offset only after processing them. In other words, if the service is interrupted, it can re-process up to five messages when it restarts. This number may be changed in future versions of the "Rebus.Kafka" transport.
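The replay window can be illustrated with a small simulation. It assumes the offset is committed after every full batch of processed messages, and that the crash happens right after a message is handled but before any commit for it. Neither function comes from Rebus.Kafka. Both are hypothetical names, and commit_period only stands in for the batch size of five.

```python
def replayed_after_crash(commit_period: int, crash_after: int) -> int:
    """Simulate a consumer that commits after every `commit_period` handled
    messages and crashes right after handling message number `crash_after`
    (1-based), before committing. Returns how many messages are redelivered."""
    committed_offset = 0  # offset a restarted consumer resumes from
    for handled in range(1, crash_after + 1):
        if handled == crash_after:
            break  # crash: this message was handled, but nothing was committed
        if handled % commit_period == 0:
            committed_offset = handled  # commit after a full batch
    # Everything handled after the last commit is consumed again on restart.
    return crash_after - committed_offset


def worst_case_replay(commit_period: int, messages: int = 100) -> int:
    """Largest replay over every possible crash point in a stream of `messages`."""
    return max(replayed_after_crash(commit_period, k) for k in range(1, messages + 1))


print(worst_case_replay(5), worst_case_replay(1))
# prints 5 1
```

Under these assumptions, a batch of five gives a worst case of five replayed messages. A commit period of 1 shrinks the window to a single message rather than to zero, because a crash between handling a message and committing it still causes that message to be redelivered.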

mkoziel2000 commented 3 years ago

I've tried enableAutoCommit set to both true and false to see how it affects Rebus operation. Separately from deadletter support, the threshold of 5 seems very arbitrary, and it would be nice to expose it as a parameter that can be tweaked. That way, anyone could set it to match how much message replay they are willing to accept in case of an outage. It would also allow us to do "exactly once" delivery if we set the value to 1. The comment I saw in the code next to this hard-coded value suggests you've been thinking about doing that for a while.

glazkovalex commented 3 years ago

@mkoziel2000, I have not yet had time to fully investigate this case, but I will definitely investigate it further. However, on your recommendation, I closed one ToDo that I had completely forgotten about and moved the CommitPeriod constant into a ConsumerAndBehaviorConfig parameter. I hope this will be helpful.

mkoziel2000 commented 3 years ago

Looks great...definitely will be useful. This, along with support for SimpleRetryStrategy, will allow us to abstract away a good best-practice deadletter implementation so that developers don't need to write logic in every handler to account for this. A deadletter solution would bring this connector into parity with how we work with other transport connectors today (e.g. SQL Server and Azure Service Bus). The SimpleRetryStrategy alongside an adjustable ConsumerAndBehaviorConfig should actually give us a stronger argument for Kafka, because it would allow us to get "exactly once" delivery with a built-in deadletter pattern, which would be awesome. The other platforms have a much weaker story around how they could provide such a promise. Of course, "exactly once" comes at a performance cost, since the offset is written after every message...but if that sacrifice is accepted, the upside of reducing the burden of idempotency checking in the handlers can make things much simpler to code.

glazkovalex commented 6 months ago


Rebus.Kafka version 3.0.1 now supports RetryStrategy: automatic retries and error handling. Message receipt is now confirmed not when a message is received, but only after it has been processed successfully or sent to the error topic.

So, as you wanted, the commit no longer happens when the message is delivered, but only after the message has been processed successfully or resent to the error queue.