The key modification was ensuring that the same future passed to receiveQueueBuffer.receiveMessage is the one that gets completed, regardless of whether a timeout occurs.
Previously, a new future (timeoutFuture) was scheduled to complete after the timeout period, but this future was independent of the one added to the futures queue. Now, instead of creating a new timeout future, we directly complete the original future (receiveMessageFuture) if it hasn't already been completed before the timeout occurs.
Root Cause
The issue occurred because a new CompletableFuture (timeout future) was being created and completed during timeouts, rather than completing the original future added to the ReceiveBuffer. This left the original future uncompleted, causing the buffer to return empty responses.
Fix
The fix ensures that the same receiveMessageFuturepassed to the buffer is completed, even when a timeout occurs. The timeout logic now checks if the original future is complete and directly completes it, avoiding the creation of a separate timeout future. This ensures that the buffer behaves correctly and returns the expected messages.
Modifications
The key modification was ensuring that the same future passed to receiveQueueBuffer.receiveMessage is the one that gets completed, regardless of whether a timeout occurs.
Previously, a new future (timeoutFuture) was scheduled to complete after the timeout period, but this future was independent of the one added to the futures queue. Now, instead of creating a new timeout future, we directly complete the original future (receiveMessageFuture) if it hasn't already been completed before the timeout occurs.
Also we need to honor the user passed waitReceiveTime as the timeoOut time , same as done in V1 , thus removed bypassing .
Testing
Integ test
Junits added
License
[x] I confirm that this pull request can be released under the Apache 2 license
Motivation and Context
The key modification was ensuring that the same future passed to
receiveQueueBuffer.receiveMessage
is the one that gets completed, regardless of whether a timeout occurs.Previously, a new future (timeoutFuture) was scheduled to complete after the timeout period, but this future was independent of the one added to the futures queue. Now, instead of creating a new timeout future, we directly complete the original future (receiveMessageFuture) if it hasn't already been completed before the timeout occurs.
Root Cause
The issue occurred because a new CompletableFuture (timeout future) was being created and completed during timeouts, rather than completing the original future added to the ReceiveBuffer. This left the original future uncompleted, causing the buffer to return empty responses.
Fix
The fix ensures that the same
receiveMessageFuture
passed to the buffer is completed, even when a timeout occurs. The timeout logic now checks if the original future is complete and directly completes it, avoiding the creation of a separate timeout future. This ensures that the buffer behaves correctly and returns the expected messages.Modifications
The key modification was ensuring that the same future passed to
receiveQueueBuffer.receiveMessage
is the one that gets completed, regardless of whether a timeout occurs.Previously, a new future (timeoutFuture) was scheduled to complete after the timeout period, but this future was independent of the one added to the futures queue. Now, instead of creating a new timeout future, we directly complete the original future (receiveMessageFuture) if it hasn't already been completed before the timeout occurs.
Testing
License