fridujo / rabbitmq-mock

Mock for RabbitMQ Java amqp-client
Apache License 2.0

`MockQueue` uses unsynchronized `LinkedHashMap`s which can cause `ConcurrentModificationException` or halts message consumption #316

Open hjohn opened 2 years ago

hjohn commented 2 years ago

The MockQueue class has many unsynchronized LinkedHashMaps. Any access that may occur on different threads, even read access such as calling size(), must be synchronized; otherwise the JVM is free to cache these values for the current thread. For example, a message that a queue receives almost simultaneously with a consumer being added may never be delivered (and delivery can stop altogether, even if more messages are sent). It can also cause ConcurrentModificationExceptions; here's an example that I've seen:

java.util.ConcurrentModificationException: null
at java.base/java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:756)
at java.base/java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:778)
at com.github.fridujo.rabbitmq.mock.MockQueue.doWithUnackedUntil(MockQueue.java:323)
at com.github.fridujo.rabbitmq.mock.MockQueue.basicAck(MockQueue.java:200)
at com.github.fridujo.rabbitmq.mock.MockNode.lambda$basicAck$0(MockNode.java:120)
at java.base/java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4780)
at com.github.fridujo.rabbitmq.mock.MockNode.basicAck(MockNode.java:120)
at com.github.fridujo.rabbitmq.mock.MockChannel.basicAck(MockChannel.java:407)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1162)
at jdk.proxy2/jdk.proxy2.$Proxy181.basicAck(Unknown Source)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.commitIfNecessary(BlockingQueueConsumer.java:876)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1048)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:940)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:84)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1317)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1223)
at java.base/java.lang.Thread.run(Thread.java:833)
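
To illustrate the failure mode in the trace above, here is a minimal sketch in plain Java. It is not the MockQueue code and not the fix discussed in this thread: the class `GuardedUnackedMessages`, its methods, and the delivery-tag map are hypothetical names made up for the example. The helper `unguardedIterationFailsFast` triggers the same fail-fast iterator check deterministically on a single thread (in the trace the modification came from another thread), while the guarded methods show one possible remedy, which is to take the same lock around every read and write of the map.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a queue's "unacked messages" bookkeeping.
class GuardedUnackedMessages {
    private final Object lock = new Object();
    private final Map<Long, String> unacked = new LinkedHashMap<>();

    // Every access, including reads, goes through the same lock so that
    // changes made by one thread are visible to the others.
    void add(long deliveryTag, String body) {
        synchronized (lock) {
            unacked.put(deliveryTag, body);
        }
    }

    // Removes all entries with a tag <= maxDeliveryTag; returns how many were removed.
    int ackUntil(long maxDeliveryTag) {
        synchronized (lock) {
            int removed = 0;
            Iterator<Long> it = unacked.keySet().iterator();
            while (it.hasNext()) {
                if (it.next() <= maxDeliveryTag) {
                    it.remove(); // removal through the iterator is allowed
                    removed++;
                }
            }
            return removed;
        }
    }

    int size() {
        synchronized (lock) {
            return unacked.size();
        }
    }

    // Structurally modifies a LinkedHashMap while iterating over it.
    // The iterator detects the change on its next step and throws.
    static boolean unguardedIterationFailsFast() {
        Map<Long, String> map = new LinkedHashMap<>();
        map.put(1L, "a");
        map.put(2L, "b");
        map.put(3L, "c");
        try {
            for (Long tag : map.keySet()) {
                map.put(tag + 100, "late arrival");
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GuardedUnackedMessages store = new GuardedUnackedMessages();
        // One thread publishes while another acknowledges concurrently.
        Thread producer = new Thread(() -> {
            for (long tag = 1; tag <= 1_000; tag++) {
                store.add(tag, "msg-" + tag);
            }
        });
        Thread acker = new Thread(() -> {
            for (long tag = 1; tag <= 1_000; tag++) {
                store.ackUntil(tag);
            }
        });
        producer.start();
        acker.start();
        producer.join();
        acker.join();
        store.ackUntil(Long.MAX_VALUE); // acknowledge whatever is left
        System.out.println("remaining after final ack: " + store.size());
        System.out.println("unguarded iteration failed fast: " + unguardedIterationFailsFast());
    }
}
```

A single lock object is only one option. Wrapping each map with `Collections.synchronizedMap` would also work, provided iteration is still done inside a `synchronized` block on the wrapper, as its Javadoc requires.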

I've fixed these problems locally, but I already have two pull requests open on this project that have unfortunately not had any response, and I'm not sure it is worth my time to make a third PR to get this fixed.

The project seems somewhat abandoned -- are you looking for help? I wouldn't mind integrating the fix above and my other two PRs, and cutting a new release.

If it is abandoned, would you have any objection to my forking this project and releasing it to Maven Central under my own groupId?

ledoyen commented 2 years ago

Hello @hjohn, sorry for not responding to your previous requests.

The project is not really abandoned, it is just that I have no time currently to take care of it.

I do need help, as I think this project can still (hello testcontainers) help some people / projects.

I just reviewed your two previous PRs. Could you please make the changes? I will be happy to merge them.

After that, it will be my pleasure to add you to the team if you want to help keep this project alive!

hjohn commented 2 years ago

Added a fix for this in #318