Open lm231290 opened 1 month ago
If the whole batch fails, the retry behavior would be pretty much the same as if the first record in the batch failed. Is my assumption correct?
So if the header is added for BatchListenerFailedException, I think it can also be added for any other (whole-batch) error, just as it would be for BatchListenerFailedException.index = 0.
I'm not sure what you mean by the BatchListenerFailedException analogy, but I'd be happy to review a contribution to better understand how this request could be addressed.
Thanks
The logic around this header is not as simple as it sounds. See FailedRecordTracker for details.
Seems like my analogy with BatchListenerFailedException is incorrect.
So my suggestion would be to call KafkaMessageListenerContainer.internalHeaders or KafkaMessageListenerContainer.checkEarlyIntercept from KafkaMessageListenerContainer.doInvokeBatchListener and KafkaMessageListenerContainer.invokeBatchListenerInTx.
Currently KafkaMessageListenerContainer.checkEarlyIntercept is called only from KafkaMessageListenerContainer.invokeRecordListenerInTx and KafkaMessageListenerContainer.doInvokeWithRecords.
That's correct. But the problem with the rest of the API, where that call is missing, is that it deals with the whole batch. Therefore it is not clear how to inject the KafkaHeaders.DELIVERY_ATTEMPT header and handle it accordingly.
If you have anything in mind, feel free to open a pull request and we will gladly review it.
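To make the whole-batch question more concrete, here is a minimal sketch of what per-batch attempt bookkeeping could look like. This is not Spring Kafka code: BatchAttemptTracker, nextAttempt and clear are hypothetical names invented for illustration. It assumes a retried batch is redelivered starting at the same first offset of the same partition, which may not hold when a batch spans several partitions or its composition changes between polls.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, NOT part of Spring Kafka. It counts deliveries of a
// batch, identifying the batch by the position of its first record.
final class BatchAttemptTracker {

    private final Map<String, Integer> attempts = new HashMap<>();

    // Assumption: a retried batch starts at the same topic/partition/offset.
    private static String key(String topic, int partition, long firstOffset) {
        return topic + "-" + partition + "@" + firstOffset;
    }

    /** Records one more delivery and returns the 1-based attempt number. */
    int nextAttempt(String topic, int partition, long firstOffset) {
        return attempts.merge(key(topic, partition, firstOffset), 1, Integer::sum);
    }

    /** Forgets the batch once it was processed successfully or recovered. */
    void clear(String topic, int partition, long firstOffset) {
        attempts.remove(key(topic, partition, firstOffset));
    }
}

class BatchAttemptDemo {
    public static void main(String[] args) {
        BatchAttemptTracker tracker = new BatchAttemptTracker();
        System.out.println(tracker.nextAttempt("orders", 0, 42L)); // 1: first delivery
        System.out.println(tracker.nextAttempt("orders", 0, 42L)); // 2: redelivery after a failure
        tracker.clear("orders", 0, 42L);                           // batch finally succeeded
        System.out.println(tracker.nextAttempt("orders", 0, 42L)); // 1: counter starts over
    }
}
```

Keying on the first record mirrors the "index = 0" idea from earlier in the thread. A real implementation would still have to decide which records of the batch receive the header and how this interacts with the existing per-record tracking.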
I am not that deep into the code. Are there any contributors who could take care of it? Maybe the issue should be promoted to a feature request?
Sure! Anyone can take an issue for contribution. Personally, I don't see a quick and easy solution, and with limited time on my side, I'm not willing to dive deep into a problem which was not advertised before anyway.
@lm231290 We will keep this in the backlog for the time being. If there is an urgent need for anyone in the community, they can design a solution and contribute a PR. If not, we will take a look at this as time permits.
Version 3.3.2
When using blocking retries for a batch listener, the attempt number does not arrive in the DELIVERY_ATTEMPT header, even after setting setDeliveryAttemptHeader(true).
The method KafkaMessageListenerContainer.ListenerConsumer.internalHeaders is called only in ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> recordArg), which is used for a regular listener, and is not called in ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg), which is used for a batch listener.