Closed: siom79 closed this issue 1 year ago
Hi @siom79 ! Thanks for the detailed bug description! I will create a bug ticket in another tool to fix this soon! Cheers, Christopher
We've discussed the issue internally and are not yet convinced that this is a bug:
The TimeoutException should be logged in the MessageListener, but the message will not be committed, which is correct from our point of view. What you want to have in case of errors is a retry, which the next poll will do for you: in the next poll you will get the same message again and can try to commit it again.
For this to work correctly it is important to implement your service in an idempotent way. Practically, that means your service must be able to handle receiving the same message more than once.
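To make the idempotency point a bit more concrete, here is a minimal sketch (a made-up handler, not something from sda-dropwizard-commons): redelivering a record that was already handled must not change the outcome.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentOrderHandler {

  // In a real service this would be a persistent store (database table,
  // cache, ...), not an in-memory set.
  private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

  public void handle(String messageId, String payload) {
    // A record that was already processed in an earlier poll is skipped,
    // so redelivery after a failed commit has no additional effect.
    if (!processedIds.add(messageId)) {
      return;
    }
    // ... actual business logic for the payload ...
  }
}
```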
Would you agree with my explanation? What behaviour is it that you want to change exactly?
Cheers,
Christopher
I was expecting a retry for a TimeoutException. If the timeout is only caused by network congestion, you will get the next chunk of messages by calling poll() and not the same one.
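Roughly, what I had expected is something like this (only a sketch with made-up names, not existing code):

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.errors.TimeoutException;

// Sketch only: retry the synchronous commit a bounded number of times
// before letting the exception bubble up.
class CommitRetrySketch {

  void commitWithRetry(Consumer<?, ?> consumer, int maxAttempts) {
    for (int attempt = 1; ; attempt++) {
      try {
        consumer.commitSync();
        return;
      } catch (TimeoutException e) {
        // retriable, e.g. transient network congestion towards the broker
        if (attempt >= maxAttempts) {
          throw e;
        }
      }
    }
  }
}
```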
It is not a bug, but a surprising implementation that you proceed with processing without even retrying after a timeout. I was just not expecting that, especially as the name SyncCommitMLS suggests that the strategy heavily insists on correct commit behavior.
Compare the documentation of the poll method:
On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last consumed offset can be manually set through seek(TopicPartition, long) or automatically set as the last committed offset for the subscribed list of partitions
As the SyncCommitMLS only commits after all records were processed, you will get the same chunk in the next poll when the commit did not succeed (due to the timeout).
Even if we retried once after the TimeoutException, I wonder what will happen if the next try also fails. I guess at some point we need to enter the poll loop again. I don't see the point (yet) why extra handling of the TimeoutException has a benefit.
Especially as the name SyncCommitMLS suggests that the strategy heavily insists on correct commit behavior.
Well, it does provide you with at-least-once semantics. But what our code cannot fix are some of the so-called fallacies of distributed computing: the network can always be slow, the broker can always be down. The poll loop will just retry in those cases, and you have to consider that in your service: there might be rare cases when consumers see the same record more than once. That's why idempotency is so important.
You won't get the same chunk if the commit fails. The emphasis lies on the word consumed: "last consumed offset". They do not say "last committed offset". You can also see it in their example in the javadoc here, in the section "Manual Offset Control": they poll in the loop but only process and commit the records once the buffer reaches their minimum size. So poll() returns the new records starting from the last "consumed" position, not the last committed position.
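For reference, that javadoc example boils down to something like this (abbreviated and wrapped into a class here; insertIntoDb() is just a placeholder):

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Abbreviated version of the "Manual Offset Control" pattern from the
// KafkaConsumer javadoc.
class ManualOffsetControlSketch {

  void run(Consumer<String, String> consumer) {
    final int minBatchSize = 200;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
      for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
      }
      if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        consumer.commitSync();
        buffer.clear();
      }
    }
  }

  void insertIntoDb(List<ConsumerRecord<String, String>> buffer) {
    // placeholder for the actual persistence
  }
}
```

Even while nothing has been committed yet, each poll() advances the consumed position and returns the following records.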
It is clear that the receiving service needs to be able to handle the same message twice. That is not the point here.
The point is that without retrying after a timeout, you unnecessarily increase the "lag" between the committed offset and the messages that have already been processed. Just think about a service that is reading from a Kafka topic, transforming messages and then writing the transformed messages to another Kafka topic (which is what I am doing right now). You just don't want to write another chunk of messages to the output topic before the last chunk has been committed correctly.
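To make my use case concrete, a simplified consume-transform-produce loop (not library code; "output-topic" and transform() are placeholders):

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// If commitSync() times out and processing just continues, the next poll()
// already delivers the following chunk and the gap between committed and
// produced records keeps growing.
class TransformPipelineSketch {

  void run(Consumer<String, String> consumer, Producer<String, String> producer) {
    while (true) {
      for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
        producer.send(
            new ProducerRecord<>("output-topic", record.key(), transform(record.value())));
      }
      producer.flush();
      consumer.commitSync();
    }
  }

  String transform(String value) {
    return value; // placeholder for the actual transformation
  }
}
```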
What about letting the user decide what to do? You could call the ErrorHandler.handleError() method inside the RuntimeException block of the MessageListener (via the strategy, i.e. a new method handleError() on MessageListenerStrategy).
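Just to sketch what I mean (all names here are assumptions, this is not the current code of sda-dropwizard-commons):

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Hypothetical illustration of the proposal; neither handleError(...) nor
// this exact loop exist in the library.
class MessageListenerSketch {

  interface Strategy<K, V> {
    void processRecords(ConsumerRecords<K, V> records, Consumer<K, V> consumer);

    // proposed hook: let the user decide how to react, e.g. to a
    // TimeoutException thrown by commitSync()
    default void handleError(RuntimeException e, Consumer<K, V> consumer) {
      // a default implementation could keep today's behaviour (log and go on)
    }
  }

  <K, V> void pollOnce(
      Strategy<K, V> strategy, Consumer<K, V> consumer, ConsumerRecords<K, V> records) {
    try {
      strategy.processRecords(records, consumer);
    } catch (RuntimeException e) {
      // today only the poll interval is increased here; the idea is to
      // additionally delegate the decision to the user's ErrorHandler
      strategy.handleError(e, consumer);
    }
  }
}
```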
The emphasis lies on the word consumed: "last consumed offset". They do not say "last committed offset".
Ok, now I see your point! Let me discuss that issue with the team!
Have you tried our RetryProcessingErrorMLS instead? It already implements a seek in case of exceptions, and it also offers an ErrorHandler to decide what to do. Also compare the documentation.
The RetryProcessingErrorMLS strategy only lets me handle errors during MessageHandler.handle(). It does not allow me to retry in case the commit fails, as the commit is outside the try/catch block (see here).
Starting with the implementation / discussion: https://github.com/SDA-SE/sda-dropwizard-commons/pull/2849
:tada: This issue has been resolved in version 5.7.0 :tada:
The release is available on GitHub release
Your semantic-release bot :package::rocket:
The SyncCommitMLS handles the non-retriable exception CommitFailedException (see here) but not the retriable exception TimeoutException. This causes the TimeoutException to pass up to the MessageListener into the catch block for the RuntimeException (see here). In this block only the poll interval gets increased; the loop with the poll() call does not stop. If the next poll() then successfully returns new messages, the last chunk has not been committed. The MessageHandler gets the new chunk to process (not knowing that its last chunk has not been committed).
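For illustration, the described flow boils down to something like this (a heavily simplified sketch, not the actual library code):

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Only CommitFailedException is caught around the commit, so a retriable
// org.apache.kafka.common.errors.TimeoutException escapes to the outer
// catch block, which only increases the poll interval and then polls the
// next chunk although the previous one was never committed.
class SyncCommitFlowSketch {

  void run(Consumer<String, String> consumer) {
    Duration pollInterval = Duration.ofMillis(100);
    while (true) {
      try {
        ConsumerRecords<String, String> records = consumer.poll(pollInterval);
        // ... hand the records to the MessageHandler ...
        try {
          consumer.commitSync();
        } catch (CommitFailedException e) {
          // non-retriable case that the strategy already handles
        }
      } catch (RuntimeException e) {
        // a TimeoutException ends up here: the poll interval is increased,
        // the loop continues and the next poll() returns new records
        pollInterval = pollInterval.plusMillis(100);
      }
    }
  }
}
```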