The DelayQueue is polled for the next task to execute, based on each task's getDelay.
This repeats until the task's timeout is reached (timeout).
However, the implementation has flaws:
First, the getDelay calculation: we want the task to be executed at the waitExpiration instant; however, Duration.between excludes the end instant, so the task might be executed 1 millisecond earlier than expected.
Second, the logic that calculates the next execution time: on the last retry, the value is always set to requestExpiration.
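To make the first flaw concrete, here is a minimal sketch of how a millisecond-granularity delay can reach zero while the clock is still short of waitExpiration. The delayMillis helper and the timestamps are hypothetical stand-ins; the real getDelay is not quoted in this description, so treat this as an illustration of the arithmetic only.

```java
import java.time.Duration;
import java.time.Instant;

class EarlyDelayDemo {
    // Hypothetical stand-in for a getDelay that reports whole milliseconds.
    static long delayMillis(Instant now, Instant waitExpiration) {
        return Duration.between(now, waitExpiration).toMillis();
    }

    public static void main(String[] args) {
        Instant waitExpiration = Instant.parse("2024-01-01T00:00:01Z");
        // 0.4 ms before the instant at which the task should run.
        Instant now = waitExpiration.minusNanos(400_000);

        System.out.println(delayMillis(now, waitExpiration)); // prints 0
        System.out.println(now.isBefore(waitExpiration));     // prints true
    }
}
```

A DelayQueue treats an element as expired once its getDelay returns a value less than or equal to zero, so a task reporting 0 here would be handed out even though waitExpiration has not been reached yet.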
Combining the two flaws above, we can now explain why the test is flaky (to be fair, the test is correct; the issue is in the implementation):
On the last retry of the task, waitExpiration is set to requestExpiration; due to the first flaw, the task would be executed 1 millisecond early.
This task would do the polling and the other steps. As there is nothing to read, it would finish those steps very quickly (in less than 1 millisecond) and, due to the second flaw, it would then be scheduled again for the exact millisecond in which the previous run executed. The cycle continues until that exact millisecond passes.
That is why we see many retries at the last retry.
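The cycle can be reproduced with a simplified model that uses an artificial clock instead of a real DelayQueue. Everything here is an assumption made for illustration: the helper names, the 50 ms backoff, and the 0.1 ms cost of an empty poll are invented, and the clamping rule is written from the description of the second flaw rather than copied from KafkaConsumerManager.

```java
import java.time.Duration;
import java.time.Instant;

class LastRetrySpinDemo {
    // Flaw 1 (assumed shape): the delay is reported in whole milliseconds.
    static long delayMillis(Instant now, Instant waitExpiration) {
        return Duration.between(now, waitExpiration).toMillis();
    }

    // Flaw 2 (assumed shape): the next wake-up is clamped to requestExpiration.
    static Instant nextWaitExpiration(Instant now, Duration backoff, Instant requestExpiration) {
        Instant candidate = now.plus(backoff);
        return candidate.isAfter(requestExpiration) ? requestExpiration : candidate;
    }

    // Counts task runs until the simulated clock reaches requestExpiration.
    // Each run finds nothing to read and costs runCost of simulated time.
    static int countRuns(Instant start, Instant requestExpiration,
                         Duration backoff, Duration runCost) {
        Instant now = start;
        int runs = 0;
        while (now.isBefore(requestExpiration)) {
            Instant waitExpiration = nextWaitExpiration(now, backoff, requestExpiration);
            long delay = delayMillis(now, waitExpiration);
            if (delay > 0) {
                now = now.plusMillis(delay); // the queue holds the task this long
            }
            // A delay of 0 means the task is handed out again immediately.
            runs++;
            now = now.plus(runCost);
        }
        return runs;
    }

    public static void main(String[] args) {
        Instant requestExpiration = Instant.parse("2024-01-01T00:00:01Z");
        Instant start = requestExpiration.minusMillis(120);
        int runs = countRuns(start, requestExpiration,
                Duration.ofMillis(50), Duration.ofNanos(100_000));
        System.out.println("runs = " + runs);
    }
}
```

With these made-up numbers the model reports 10 runs, although the 50 ms backoff only calls for 3 in a 120 ms window. The extra 7 all happen inside the final millisecond before requestExpiration, which matches the burst of retries described above.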
This PR fixes the above flaws by ensuring that:
the task is executed at the right time (first flaw)
and the last-retry condition is checked (second flaw)
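One possible shape for these two changes is sketched below. This is not the code from the PR, which is not quoted here: the helper names are hypothetical, rounding the delay up is just one way to avoid waking early, and the last-retry flag is one way to stop re-queuing once the wake-up has been clamped to requestExpiration.

```java
import java.time.Duration;
import java.time.Instant;

class BackoffFixSketch {
    // Rounds the remaining time up to whole milliseconds, so the result
    // is 0 only when now has actually reached waitExpiration.
    static long delayMillisRoundedUp(Instant now, Instant waitExpiration) {
        Duration remaining = Duration.between(now, waitExpiration);
        if (remaining.isNegative() || remaining.isZero()) {
            return 0;
        }
        long millis = remaining.toMillis();
        return remaining.minusMillis(millis).isZero() ? millis : millis + 1;
    }

    // True when the next backoff would land at or after requestExpiration,
    // meaning the upcoming run is the last one.
    static boolean isLastRetry(Instant now, Duration backoff, Instant requestExpiration) {
        return !now.plus(backoff).isBefore(requestExpiration);
    }

    // Same simulated clock as a real scheduler would see, but with both guards.
    static int countRuns(Instant start, Instant requestExpiration,
                         Duration backoff, Duration runCost) {
        Instant now = start;
        int runs = 0;
        boolean lastRetry = false;
        while (!lastRetry) {
            lastRetry = isLastRetry(now, backoff, requestExpiration);
            Instant waitExpiration = lastRetry ? requestExpiration : now.plus(backoff);
            now = now.plusMillis(delayMillisRoundedUp(now, waitExpiration));
            runs++;                  // the task runs once per wake-up
            now = now.plus(runCost); // an empty poll finishes quickly
        }
        return runs;                 // no re-queue after the last retry
    }

    public static void main(String[] args) {
        Instant requestExpiration = Instant.parse("2024-01-01T00:00:01Z");
        int runs = countRuns(requestExpiration.minusMillis(120), requestExpiration,
                Duration.ofMillis(50), Duration.ofNanos(100_000));
        System.out.println("runs = " + runs);
    }
}
```

Under the same invented inputs (120 ms window, 50 ms backoff, 0.1 ms per empty poll) this model runs the task 3 times, and the final run starts no earlier than requestExpiration.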
In summary, this PR fixes how the KafkaConsumerManager class handles consumer backoff for readRecords, where the backoff RunnableReadTask is executed through the DelayQueue polling described above.