rabbitmq / rabbitmq-stream-go-client

A client library for RabbitMQ streams
MIT License

Fix the reconnection logic for parallel writes with HA Producer. #335

Closed: hiimjako closed this 3 months ago

hiimjako commented 3 months ago

If Send or BatchSend is called in parallel from concurrent goroutines during a reconnection, only one of those calls is unblocked and sends its message, while the others stay blocked. This happens because reconnectionSignal is a channel, so the notification reaches a single waiter instead of being broadcast to all of them. To fix this, the reconnectionSignal channel has been replaced with a sync.Cond.

Gsantomaggio commented 3 months ago

Great find! Thanks @hiimjako

May I ask you to change the example by adding a number of concurrent producers, like:

   const concurrentProducers = xxx

Then you should test the example with these parameters by restarting the node.

I use:

# Restart every running container in turn, waiting 90s after each restart.
for container in $(docker ps -q); do
   echo "Restarting ..$container"
   docker restart "$container"
   echo "Waiting 90s ..$container"
   sleep 40
   echo "Waiting 50s ..$container"
   sleep 20
   echo "Waiting 30s ..$container"
   sleep 10
   echo "Waiting 20s ..$container"
   sleep 10
   echo "Waiting 10s ..$container"
   sleep 10
done

I leave it running for some time. The resulting message counts should be consistent. Thank you.

hiimjako commented 3 months ago

> May I ask to change the example by adding a number for concurrent producers?

Done! On the main branch the example reports roughly half of the messages, as expected given the bug, while on this branch it reports all 20M messages.

Gsantomaggio commented 3 months ago

Thank you