Open johnaohara opened 2 months ago
/cc @cescoffier (reactive-messaging), @ozangunalp (reactive-messaging)
@jponge too
Thanks @johnaohara for reporting. I see you have already tracked down where it stops making progress.
I've tried to reproduce it locally on 2 different machines with no success; I'll have to try with a new laptop and see if I'm lucky.
I don't believe we changed anything. @ozangunalp any idea?
@johnaohara Thanks for the detailed explanation.
There weren't any changes to the emitter, except the Mutiny update :)
In AMQP you need to have a listener on the address for produced messages to be delivered; this may happen if, during a restart, the producer sends messages before the consumer starts listening. But you are saying that emitted messages are enqueued in the buffer and never handed to the underlying client, so there must be something else.
We have a similar problem where our application's emitted messages are enqueued into the buffer of the `BufferItemMultiEmitter` class queue but are not emitted to the downstream `AmqpCreditBasedSender`. This problem seems to happen rarely, after there have been connection issues to the message broker, Amazon MQ (an ActiveMQ variant).
After a lot of debugging, I believe the issue is that the `BufferItemMultiEmitter.requested` atomic long is decremented to zero while emitting messages from the queue to the downstream, but the `AmqpCreditBasedSender.requested` atomic long is not in sync and holds a bigger value.
`AmqpCreditBasedSender.onNext(Message<?> message)` only requests new messages once the `requested` field value reaches zero.
```java
.subscribe().with(
        tuple -> {
            if (tuple != null) { // Serialization issue
                subscriber.onNext(tuple.getItem2());
                if (requested.decrementAndGet() == 0) { // no more credit, request more
                    onNoMoreCredit(tuple.getItem1());
                }
            }
        },
        subscriber::onError);
```
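The counter desync described above can be sketched with a small self-contained model (all class, field, and method names here are hypothetical, invented for illustration; this is not the actual SmallRye implementation): two `requested` counters are decremented independently, so a delivery whose ack is lost leaves the sender-side counter above zero and the "request more credit" path is never taken.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the two out-of-sync `requested` counters.
public class CreditDesyncDemo {
    final AtomicLong emitterRequested = new AtomicLong(); // emitter-side counter
    final AtomicLong senderRequested = new AtomicLong();  // sender-side counter

    void grantCredits(long credits) {
        emitterRequested.set(credits);
        senderRequested.set(credits);
    }

    // Returns true when the sender would ask for more credit, mirroring
    // the `decrementAndGet() == 0` check in the quoted snippet above.
    boolean deliver(boolean ackArrives) {
        emitterRequested.decrementAndGet();      // emitter always counts the emission
        if (ackArrives) {
            return senderRequested.decrementAndGet() == 0;
        }
        return false;                            // lost ack: sender counter untouched
    }

    public static void main(String[] args) {
        CreditDesyncDemo d = new CreditDesyncDemo();
        d.grantCredits(2);
        d.deliver(true);   // first message acked normally
        d.deliver(false);  // second message's ack is lost during reconnection
        // Emitter believes all credit is used, sender still believes it has 1,
        // so the "no more credit" branch never fires and the stream stalls.
        System.out.println("emitter=" + d.emitterRequested.get()
                + " sender=" + d.senderRequested.get()); // prints emitter=0 sender=1
    }
}
```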
Why `AmqpCreditBasedSender.requested` gets out of sync: it seems to happen when there is a connection issue after the `AmqpSenderImpl.doSend` method has invoked `sender.send(updated.unwrap(), ack)`. The `ack` handler is never invoked for the message that triggered the `doSend`.
One way I can reproduce the issue is to suspend the application in the `ProtonSenderImpl` method:
```java
@Override
public ProtonDelivery send(byte[] tag, Message message, Handler<ProtonDelivery> onUpdated) {
    if (anonymousSender && message.getAddress() == null) {
        throw new IllegalArgumentException("Message must have an address when using anonymous sender.");
    }
    // TODO: prevent odd combination of onRecieved callback + SenderSettleMode.SETTLED, or just allow it?
    Delivery delivery = sender().delivery(tag); // start a new delivery..
    ProtonWritableBufferImpl buffer = new ProtonWritableBufferImpl();
    MessageImpl msg = (MessageImpl) message;
    msg.encode(buffer);
    ReadableBuffer encoded = new ProtonReadableBufferImpl(buffer.getBuffer());
    sender().sendNoCopy(encoded);
```
then stop ActiveMQ, let the application run, and start ActiveMQ again.
I am coming back to this now.
In our case `BufferItemMultiEmitter.requested` is 0 on the first request; I do not even get far enough down the stack to call any methods on `AmqpCreditBasedSender`.
There is nothing complicated in the messaging setup atm. I basically have the following configuration in `application.properties`:

```properties
# schema-sync incoming
mp.messaging.incoming.run-upload-in.connector=smallrye-amqp
mp.messaging.incoming.run-upload-in.address=run-upload
mp.messaging.incoming.run-upload-in.durable=true
mp.messaging.incoming.run-upload-in.container-id=horreum-broker
mp.messaging.incoming.run-upload-in.link-name=run-upload
# schema-sync outgoing
mp.messaging.outgoing.run-upload-out.connector=smallrye-amqp
mp.messaging.outgoing.run-upload-out.address=run-upload
mp.messaging.outgoing.run-upload-out.durable=true
mp.messaging.outgoing.run-upload-out.container-id=horreum-broker
mp.messaging.outgoing.run-upload-out.link-name=run-upload
```
and an `org.eclipse.microprofile.reactive.messaging.Emitter`:

```java
@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10000)
@Channel("run-upload-out")
Emitter<Integer> runUploadEmitter;
```
I am calling the emitter with:

```java
runUploadEmitter.send(uploadID);
```
I don't think this is recent behaviour; our CI has been failing intermittently for a while with errors related to this. My machine was working again, but is consistently failing atm.
I will spend some more time digging into what is happening
Thanks
I've been spending some time trying to reproduce this.
I was thinking this might happen if the send acknowledgement never returns from the AMQP broker, due to the reconnection, but I couldn't reproduce it. I think overall having `container-id` set helps with that.
I can reproduce some reconnection scenarios; there is indeed one of them which throws an `IllegalStateException` with the message `send not allowed after the sender is closed.` because of the retry mechanism we apply in reactive messaging. It retries the send operation without re-establishing the connection, and thus fails.
I think #40592 is also a similar problem.
BTW, I observe that because of the credit-based flow control, when a reconnection happens the order of outgoing messages is no longer respected, with or without our retry mechanism.
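The retry flaw described above (retrying the send without first re-establishing the connection) can be illustrated with a minimal self-contained model (all names hypothetical; this is not the SmallRye retry code): retrying against the same closed sender fails every time, while reconnecting before each retry succeeds.

```java
// Hypothetical model of the retry flaw described above.
public class RetryModel {

    static class Sender {
        boolean closed;
        Sender(boolean closed) { this.closed = closed; }
        void send(String msg) {
            if (closed) {
                throw new IllegalStateException("send not allowed after the sender is closed.");
            }
        }
    }

    // Naive retry: reuses the same (closed) sender, so every attempt fails.
    static boolean retrySameSender(Sender s, String msg, int attempts) {
        for (int i = 0; i < attempts; i++) {
            try { s.send(msg); return true; } catch (IllegalStateException e) { /* retry */ }
        }
        return false;
    }

    // Reconnecting retry: obtains a fresh sender before each retry.
    static boolean retryWithReconnect(Sender s, String msg, int attempts) {
        for (int i = 0; i < attempts; i++) {
            try { s.send(msg); return true; }
            catch (IllegalStateException e) { s = new Sender(false); /* re-establish */ }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(retrySameSender(new Sender(true), "m", 3));    // prints false
        System.out.println(retryWithReconnect(new Sender(true), "m", 3)); // prints true
    }
}
```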
@johnaohara having `requested` at 0 would be expected if there aren't any requests from the downstream (`AmqpCreditBasedSender`). If we understand and fix what is happening on reconnection, I think the flow control in Mutiny will work correctly.
Hi @ozangunalp thank you for looking into this.
It is worth noting that this problem does not only occur for me on restart; if I start our Quarkus-based app in dev mode (with the AMQ broker instantiated by dev services), the first invocation fails as well.
@johnaohara I can't reproduce the problem in dev mode, using the code.quarkus.io amqp 1.0 messaging example (which sends messages at startup using an emitter).
@ozangunalp it does not happen in all environments, and is not always reproducible. For example, my machine has been "working" (i.e. tests in our project pass, etc.) for the past couple of weeks, but occasionally fails without any changes to the messaging code paths or upgrades. Our CI occasionally fails with messaging-related tests where messages are not delivered, but if I re-run the tests they pass.
It looks like a race condition at startup, when the initial value of `BaseMultiEmitter.requested` is set. That property is an `AtomicLong`, so it is protected by memory barriers, but I don't know about the other startup logic. I am going to spend some time digging today.
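The point about `AtomicLong` can be checked quickly with a self-contained demo (class and method names are mine, for illustration only): concurrent increments on an `AtomicLong` never lose updates, so the `requested` field itself cannot be corrupted by concurrent access; any startup race would have to live in the surrounding logic.

```java
import java.util.concurrent.atomic.AtomicLong;

// Demonstrates that AtomicLong increments are atomic and visible across
// threads: no lost updates, unlike a plain (non-volatile) long field.
public class AtomicVisibilityDemo {

    // Increment a shared AtomicLong from `threads` threads, `perThread` times each.
    static long concurrentCount(int threads, int perThread) throws InterruptedException {
        AtomicLong requested = new AtomicLong();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    requested.incrementAndGet();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return requested.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Always threads * perThread, regardless of interleaving.
        System.out.println(concurrentCount(2, 100_000)); // prints 200000
    }
}
```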
@ozangunalp I have noticed that we are currently running on an older Quarkus release (3.8.4), and smallrye-reactive-messaging-amqp has been upgraded from 4.18.0 to 4.21.0 between Quarkus 3.8.4 and 3.10.1.
The implementation of `io.smallrye.mutiny.operators.multi.builders.BufferItemMultiEmitter.drain()` has been refactored between those releases.
I will try upgrading to Quarkus 3.10.0 and see if we still have the problem.
I've tested the dev mode both on 3.8.4 and 3.10.1 on a Raspberry Pi (for a change). I couldn't reproduce it.
There were some instances when forcing a restart in dev mode, and not using the `container-id`, where the newly created app would create a new queue for the address, so some messages went missing. But when I force the `container-id`, I no longer have that issue.
@johnaohara During your tests, were you able to check on the Artemis UI whether messages are queued and not delivered to the consumer? Or were messages stuck on the emitter buffer?
@ozangunalp yes, I checked the Artemis UI and there are no messages in the queue.
I can see the messages are all backed up in memory in `BufferItemMultiEmitter.queue`.
Trying to recreate this consistently has been tricky. I have a very simple application that exhibits the behaviour on the machine that tends to fail: https://github.com/johnaohara/quarkus-issue-40118
This application works as expected on my laptop.
I have not been able to recreate the issue where the emitter fails on the first startup of the application, only on restart.
desktop:

```
$ uname -a
Linux fedora 6.8.8-200.fc39.x86_64 #1 SMP PREEMPT_DYNAMIC Sat Apr 27 17:42:13 UTC 2024 x86_64 GNU/Linux
$ java --version
openjdk 21.0.1 2023-10-17 LTS
OpenJDK Runtime Environment Temurin-21.0.1+12 (build 21.0.1+12-LTS)
OpenJDK 64-Bit Server VM Temurin-21.0.1+12 (build 21.0.1+12-LTS, mixed mode, sharing)
$ docker --version
Docker version 24.0.7, build afdd53b
```
To reproduce:

1. Build and start the application in dev mode:

```shell
$ mvn clean package -DskipTests -DskipITs
$ mvn quarkus:dev
```

2. Send an HTTP request to the REST API:

```shell
$ curl -X POST http://localhost:8080/api
```
You should see some messages written to the console that have been sent through the broker:
![image](https://github.com/quarkusio/quarkus/assets/959822/b3a9f79d-23b0-4b8e-a1ad-698cdac8bf43)
3. Type `s` into the dev mode console to restart the application.
4. Send another HTTP request to the REST API; on the machine that has problems, this fails
![image](https://github.com/quarkusio/quarkus/assets/959822/7a27a9fb-a2fa-4029-bb72-c053c1f133ef)
with the error message:

```
2024-05-17 09:59:06,104 ERROR [io.sma.rea.mes.amqp] (executor-thread-1) SRMSG16225: Failure reported for channel run-upload-out, closing client: io.smallrye.mutiny.subscription.BackPressureFailure: Could not emit value due to lack of requests
	at io.smallrye.mutiny.operators.multi.builders.EmitterBasedMulti$ErrorOnOverflowMultiEmitter.onOverflow(EmitterBasedMulti.java:134)
	at io.smallrye.mutiny.operators.multi.builders.EmitterBasedMulti$NoOverflowBaseMultiEmitter.emit(EmitterBasedMulti.java:105)
	at io.smallrye.mutiny.operators.multi.builders.SerializedMultiEmitter.onItem(SerializedMultiEmitter.java:50)
	at io.smallrye.mutiny.operators.multi.builders.SerializedMultiEmitter.emit(SerializedMultiEmitter.java:140)
	at io.smallrye.reactive.messaging.providers.extension.AbstractEmitter.emit(AbstractEmitter.java:176)
	at io.smallrye.reactive.messaging.providers.extension.EmitterImpl.send(EmitterImpl.java:31)
	at org.acme.MyMessagingApplication.sendMsg(MyMessagingApplication.java:20)
	at org.acme.MyMessagingApplication_ClientProxy.sendMsg(Unknown Source)
	at org.acme.ApiService.testMessaging(ApiService.java:18)
	at org.acme.ApiService$quarkusrestinvoker$testMessaging_c559ec23349c7474752cecba71550f3d34c5016d.invoke(Unknown Source)
	at org.jboss.resteasy.reactive.server.handlers.InvocationHandler.handle(InvocationHandler.java:29)
	at io.quarkus.resteasy.reactive.server.runtime.QuarkusResteasyReactiveRequestContext.invokeHandler(QuarkusResteasyReactiveRequestContext.java:141)
	at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:147)
	at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:599)
	at org.jboss.threads.EnhancedQueueExecutor$Task.doRunWith(EnhancedQueueExecutor.java:2516)
	at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2495)
	at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1521)
	at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:11)
	at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:11)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1583)

java.lang.Exception: Missing onFailure/onError handler in the subscriber
	at io.smallrye.mutiny.subscription.Subscribers.lambda$static$0(Subscribers.java:18)
	at io.smallrye.mutiny.subscription.Subscribers$CallbackBasedSubscriber.onFailure(Subscribers.java:93)
	at io.smallrye.mutiny.operators.multi.MultiOnFailureInvoke$MultiOnFailureInvokeProcessor.onFailure(MultiOnFailureInvoke.java:50)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onError(MultiSubscriber.java:73)
	at io.smallrye.reactive.messaging.amqp.AmqpCreditBasedSender.onError(AmqpCreditBasedSender.java:238)
	at io.smallrye.reactive.messaging.providers.helpers.MultiUtils$1.onFailure(MultiUtils.java:83)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onError(MultiSubscriber.java:73)
	at io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactoryInterceptor$2.onError(DevModeSupportConnectorFactoryInterceptor.java:115)
	at io.smallrye.mutiny.subscription.MultiSubscriberAdapter.onFailure(MultiSubscriberAdapter.java:32)
	at io.smallrye.mutiny.operators.multi.builders.BaseMultiEmitter.failed(BaseMultiEmitter.java:89)
	at io.smallrye.mutiny.operators.multi.builders.BaseMultiEmitter.fail(BaseMultiEmitter.java:78)
	at io.smallrye.mutiny.operators.multi.builders.EmitterBasedMulti$ErrorOnOverflowMultiEmitter.onOverflow(EmitterBasedMulti.java:134)
	at io.smallrye.mutiny.operators.multi.builders.EmitterBasedMulti$NoOverflowBaseMultiEmitter.emit(EmitterBasedMulti.java:105)
	at io.smallrye.mutiny.operators.multi.builders.SerializedMultiEmitter.onItem(SerializedMultiEmitter.java:50)
	at io.smallrye.mutiny.operators.multi.builders.SerializedMultiEmitter.emit(SerializedMultiEmitter.java:140)
	at io.smallrye.reactive.messaging.providers.extension.AbstractEmitter.emit(AbstractEmitter.java:176)
	at io.smallrye.reactive.messaging.providers.extension.EmitterImpl.send(EmitterImpl.java:31)
	at org.acme.MyMessagingApplication.sendMsg(MyMessagingApplication.java:20)
	at org.acme.MyMessagingApplication_ClientProxy.sendMsg(Unknown Source)
	at org.acme.ApiService.testMessaging(ApiService.java:18)
	at org.acme.ApiService$quarkusrestinvoker$testMessaging_c559ec23349c7474752cecba71550f3d34c5016d.invoke(Unknown Source)
	at org.jboss.resteasy.reactive.server.handlers.InvocationHandler.handle(InvocationHandler.java:29)
	at io.quarkus.resteasy.reactive.server.runtime.QuarkusResteasyReactiveRequestContext.invokeHandler(QuarkusResteasyReactiveRequestContext.java:141)
	at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:147)
	at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:599)
	at org.jboss.threads.EnhancedQueueExecutor$Task.doRunWith(EnhancedQueueExecutor.java:2516)
	at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2495)
	at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1521)
	at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:11)
	at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:11)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: io.smallrye.mutiny.subscription.BackPressureFailure: Could not emit value due to lack of requests
```
I have managed to trigger the condition on my "working" machine (although only once).
There appears to be a race condition on `org.apache.qpid.proton.engine.impl.LinkImpl._credit`, where the property is accessed via `getCredit()` and `addCredit(int credit)`.
I can set a debug breakpoint on `setCredit` to occasionally trigger the condition in the working env.
If I enable debug messages, on the working machine I see the following log:
```
2024-05-17 11:01:41,062 INFO [io.qua.sma.dep.processor] (build-7) Configuring the channel 'run-upload-in' to be managed by the connector 'smallrye-amqp'
2024-05-17 11:01:41,072 INFO [io.qua.sma.dep.processor] (build-7) Configuring the channel 'run-upload-out' to be managed by the connector 'smallrye-amqp'
2024-05-17 11:01:46,292 INFO [io.qua.sma.rea.amq.dep.AmqpDevServicesProcessor] (build-21) Dev Services for AMQP started. Other Quarkus applications in dev mode will find the broker automatically. For Quarkus applications in production mode, you can connect to this by starting your application with -Damqp.host=localhost -Damqp.port=38345 -Damqp.user=admin -Damqp.password=admin
__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2024-05-17 11:01:46,887 INFO [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:38345 for channel run-upload-out
2024-05-17 11:01:46,911 INFO [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-05-17 11:01:46,923 INFO [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-05-17 11:01:46,995 INFO [io.quarkus] (Quarkus Main Thread) amq-client-msg-buffer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 3.10.1) started in 6.614s. Listening on: http://localhost:8080
2024-05-17 11:01:46,995 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2024-05-17 11:01:46,996 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, messaging, messaging-amqp, rest, smallrye-context-propagation, vertx]
2024-05-17 11:01:47,094 INFO [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16213: Connection with AMQP broker established
2024-05-17 11:01:47,102 INFO [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16213: Connection with AMQP broker established
2024-05-17 11:01:47,134 DEBUG [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16222: Retrieved credits for channel `run-upload-out`: 1000
2024-05-17 11:01:47,137 INFO [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16203: AMQP Receiver listening address run-upload
2024-05-17 11:01:48,820 INFO [io.qua.dep.dev.RuntimeUpdatesProcessor] (Aesh InputStream Reader) Restarting as requested by the user.
2024-05-17 11:01:48,835 INFO [io.quarkus] (Quarkus Main Thread) amq-client-msg-buffer stopped in 0.013s
2024-05-17 11:01:48,945 INFO [io.qua.sma.dep.processor] (build-35) Configuring the channel 'run-upload-in' to be managed by the connector 'smallrye-amqp'
2024-05-17 11:01:48,945 INFO [io.qua.sma.dep.processor] (build-35) Configuring the channel 'run-upload-out' to be managed by the connector 'smallrye-amqp'
__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2024-05-17 11:01:49,265 INFO [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:38345 for channel run-upload-in
2024-05-17 11:01:49,267 INFO [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:38345 for channel run-upload-out
2024-05-17 11:01:49,269 INFO [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-05-17 11:01:49,270 INFO [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-05-17 11:01:49,271 INFO [io.quarkus] (Quarkus Main Thread) amq-client-msg-buffer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 3.10.1) started in 0.436s. Listening on: http://localhost:8080
2024-05-17 11:01:49,272 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2024-05-17 11:01:49,272 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, messaging, messaging-amqp, rest, smallrye-context-propagation, vertx]
2024-05-17 11:01:49,273 INFO [io.qua.dep.dev.RuntimeUpdatesProcessor] (Aesh InputStream Reader) Live reload total time: 0.457s
2024-05-17 11:01:49,289 INFO [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-3) SRMSG16213: Connection with AMQP broker established
2024-05-17 11:01:49,291 INFO [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-3) SRMSG16213: Connection with AMQP broker established
2024-05-17 11:01:49,296 INFO [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-3) SRMSG16203: AMQP Receiver listening address run-upload
2024-05-17 11:01:49,298 DEBUG [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-3) SRMSG16222: Retrieved credits for channel `run-upload-out`: 1000
```
on the failing machine:
```
2024-05-17 11:00:42,126 INFO [io.qua.sma.dep.processor] (build-14) Configuring the channel 'run-upload-in' to be managed by the connector 'smallrye-amqp'
2024-05-17 11:00:42,126 INFO [io.qua.sma.dep.processor] (build-14) Configuring the channel 'run-upload-out' to be managed by the connector 'smallrye-amqp'
__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2024-05-17 11:00:42,369 INFO [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:32974 for channel run-upload-in
2024-05-17 11:00:42,370 INFO [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:32974 for channel run-upload-out
2024-05-17 11:00:42,372 INFO [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-05-17 11:00:42,373 INFO [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-05-17 11:00:42,374 INFO [io.quarkus] (Quarkus Main Thread) amq-client-msg-buffer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 3.10.1) started in 0.347s. Listening on: http://localhost:8080
2024-05-17 11:00:42,374 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2024-05-17 11:00:42,374 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, messaging, messaging-amqp, rest, smallrye-context-propagation, vertx]
2024-05-17 11:00:42,375 INFO [io.qua.dep.dev.RuntimeUpdatesProcessor] (Aesh InputStream Reader) Live reload total time: 0.362s
2024-05-17 11:00:42,381 INFO [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-3) SRMSG16213: Connection with AMQP broker established
2024-05-17 11:00:42,382 INFO [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-3) SRMSG16213: Connection with AMQP broker established
2024-05-17 11:00:42,383 DEBUG [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-3) SRMSG16222: Retrieved credits for channel `run-upload-out`: 1000
2024-05-17 11:00:42,385 INFO [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-3) SRMSG16203: AMQP Receiver listening address run-upload
2024-05-17 11:00:45,034 INFO [io.qua.dep.dev.RuntimeUpdatesProcessor] (Aesh InputStream Reader) Restarting as requested by the user.
2024-05-17 11:00:45,037 INFO [io.quarkus] (Quarkus Main Thread) amq-client-msg-buffer stopped in 0.002s
2024-05-17 11:00:45,136 INFO [io.qua.sma.dep.processor] (build-2) Configuring the channel 'run-upload-in' to be managed by the connector 'smallrye-amqp'
2024-05-17 11:00:45,136 INFO [io.qua.sma.dep.processor] (build-2) Configuring the channel 'run-upload-out' to be managed by the connector 'smallrye-amqp'
__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2024-05-17 11:00:45,357 INFO [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:32974 for channel run-upload-in
2024-05-17 11:00:45,358 INFO [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:32974 for channel run-upload-out
2024-05-17 11:00:45,360 INFO [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-05-17 11:00:45,361 INFO [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-05-17 11:00:45,361 INFO [io.quarkus] (Quarkus Main Thread) amq-client-msg-buffer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 3.10.1) started in 0.323s. Listening on: http://localhost:8080
2024-05-17 11:00:45,362 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2024-05-17 11:00:45,362 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, messaging, messaging-amqp, rest, smallrye-context-propagation, vertx]
2024-05-17 11:00:45,362 INFO [io.qua.dep.dev.RuntimeUpdatesProcessor] (Aesh InputStream Reader) Live reload total time: 0.329s
2024-05-17 11:00:45,368 INFO [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-4) SRMSG16213: Connection with AMQP broker established
2024-05-17 11:00:45,369 INFO [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-4) SRMSG16213: Connection with AMQP broker established
2024-05-17 11:00:45,370 DEBUG [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-4) SRMSG16223: No more credit for channel run-upload-out, requesting more credits
2024-05-17 11:00:45,371 INFO [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-4) SRMSG16203: AMQP Receiver listening address run-upload
```
> There appears to be a race condition on `org.apache.qpid.proton.engine.impl.LinkImpl._credit`, where the property is accessed via `getCredit()` and `addCredit(int credit)`

In theory the proton stuff should always run from within their Netty event loop threads. @gemmellr am I right?
> There appears to be a race condition on `org.apache.qpid.proton.engine.impl.LinkImpl._credit`, where the property is accessed via `getCredit()` and `addCredit(int credit)`
>
> In theory the proton stuff should always run from within their Netty event loop threads. @gemmellr am I right?
They do. I am now a little bit further down the stack; it looks like on a restart we are missing a `LINK_FLOW` proton event during the handshake on the machine that is not working.
> There appears to be a race condition on `org.apache.qpid.proton.engine.impl.LinkImpl._credit`, where the property is accessed via `getCredit()` and `addCredit(int credit)`
>
> In theory the proton stuff should always run from within their Netty event loop threads. @gemmellr am I right?
That is correct, it's expressly single-threaded, so it can't 'race' unless being misused.
If you are not seeing a flow event, the most likely reason is that a flow frame was not sent. I'd be looking at whether you are falling foul of flow control by the broker blocking production (by not flowing credit, so nothing can be sent), e.g. due to its current memory and/or disk limit configuration (e.g. the broker defaults to a 90% max-disk-usage limit if not otherwise configured).
You can more closely see what is actually sent using the environment variable `PN_TRACE_FRM=true` (on the client and/or broker side, since both use proton-j underneath) to produce a protocol trace to stdout.
@gemmellr thank you very much for the info.
wrt the "race": in Quarkus dev mode, when the application is restarted, the I/O processing moves from one event loop thread to another. At startup all buffer processing runs on `vert.x-eventloop-thread-01`; when it is restarted, the `vert.x-eventloop-thread-02` thread processes the buffers. That is why I was concerned it might be a race condition. I have since ruled this out as a possible cause by verifying that after a restart the thread remains consistent.
I have captured the network packets, and I am seeing the flow packet being returned from the broker, with the expected number of credits. However, this is not being propagated to `ProtonTransport.handleSocketBuffer()` (in vert.x).
I am still digging through the code path to understand why this packet is not handled correctly.
So this issue is caused by flow control :(
The disk that was mounted into the broker container was 95% full; the first client connection works, but subsequent connections are not allocated any credits to send requests.
@ozangunalp I am wondering if we can capture this state and warn users when this is the case? Atm it all appears to work, but there are no credits to send messages. This condition appears to be tested for already in `BufferItemMultiEmitter.drain()`, so could we add a user warning when it detects that there are no credits?
Thank you very much @gemmellr for pointing me in the right direction!
@johnaohara So it was the disk mounted to the container that's full?
Actually, we have that message (at debug level; maybe it can be changed): `No more credit for channel run-upload-out, requesting more credits`.
It tries getting credits every 2 seconds by default.
yeah, looking back, it was shown as a debug message:

```
2024-05-17 11:00:45,370 DEBUG [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-4) SRMSG16223: No more credit for channel run-upload-out, requesting more credits
```
Maybe this could be a warning? I don't know in what other situations this could be an expected state.
I think if this message had been a warning I would have started looking into it.
> @johnaohara So it was the disk mounted to the container that's full?
Yes, that was the root cause; it was at 95% of capacity.
> I think if this message had been a warning I would have started looking into that message
I agree, let's change that to warn.
I wouldn't necessarily do that on every check; you may end up repeatedly emitting the warning any time new credit isn't being granted quite as fast as you could send, which may be entirely expected behaviour, at which point you'll instead start getting questions about why it is warning in the course of doing what it's meant to be doing (as it actually is now...). I'd either establish a larger time window over which it is warned, or keep it at info.
> I wouldn't necessarily do that on every check; you may end up repeatedly emitting the warning any time new credit isn't being granted quite as fast as you could send, which may be entirely expected behaviour, at which point you'll instead start getting questions about why it is warning in the course of doing what it's meant to be doing (as it actually is now...). I'd either establish a larger time window over which it is warned, or keep it at info.
I think that is a fair point, esp. when running in prod mode. When running in dev/test mode, might we want a different logging level?
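The "warn only if it persists" idea discussed above could be sketched as follows (hypothetical class; not the SmallRye implementation; the clock is injectable purely to make the behaviour testable): a WARN is only suggested once the no-credit condition has lasted longer than a configurable window, so brief credit exhaustion during normal throughput stays quiet.

```java
import java.util.function.LongSupplier;

// Sketch: escalate to WARN only when the no-credit condition has
// persisted for `warnAfterMillis`, instead of on every periodic check.
public class NoCreditWarning {
    private final LongSupplier clock;    // current time in millis, injectable for tests
    private final long warnAfterMillis;  // how long no-credit must persist before warning
    private long noCreditSince = -1;     // -1 means credit is currently available

    public NoCreditWarning(LongSupplier clock, long warnAfterMillis) {
        this.clock = clock;
        this.warnAfterMillis = warnAfterMillis;
    }

    /** Called on each periodic credit check; returns true when a WARN should be logged. */
    public boolean onCreditCheck(long credits) {
        if (credits > 0) {
            noCreditSince = -1;          // condition cleared, reset the timer
            return false;
        }
        long now = clock.getAsLong();
        if (noCreditSince < 0) {
            noCreditSince = now;         // start of a no-credit episode
        }
        return now - noCreditSince >= warnAfterMillis;
    }
}
```

For example, with a 10-second window the periodic checks stay at debug level through short credit droughts and only suggest a warning when the broker has withheld credit continuously for 10 seconds.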
Describe the bug
We have an application that uses an AMQ broker for async message processing and were experiencing test failures in our test suite where messages were not being passed to the AMQ broker via the `quarkus-smallrye-reactive-messaging-amqp` client. Our test suite uses an AMQ broker that is automatically provisioned by dev services.
What I noticed was that when we called `org.eclipse.microprofile.reactive.messaging.Emitter.send()` with a msg payload, the messages were being enqueued in a buffer but not delivered to the underlying AMQ client. Therefore the messages were not delivered to the broker.
In order to reproduce this issue, I created a sample application from code.quarkus.io, selecting just the `Messaging - AMQP Connector [quarkus-messaging-amqp]` extension. I found that if I start that application in dev mode the messages are processed as expected, but if I restart dev mode 3-4 times (by pressing `s` in the dev console) the messages are no longer delivered to the broker and are buffered in a queue, in the same way our test suite behaves.
There appears to be a race/bug where the `requested` counter in `io.smallrye.mutiny.operators.multi.builders.BufferItemMultiEmitter` is set to `0` during a restart and in test mode, which prevents the call to `drain()` from emitting the messages.
This does not appear to happen on all machines. I see this issue on Fedora 39 on x86_64, but our CI environment (GitHub) and a Mac M2 do not demonstrate this behaviour.
Expected behavior
The messages in the sample app should be output every time Quarkus is restarted in dev mode:
Actual behavior
After 1-2 restarts, the messages are no longer dispatched to the AMQ broker:
How to Reproduce?
Create a sample application from code.quarkus.io with the `Messaging - AMQP Connector [quarkus-messaging-amqp]` extension, start it in dev mode, and restart it a few times by pressing `s` in the terminal.
Output of `uname -a` or `ver`
Linux fedora 6.8.4-200.fc39.x86_64 #1 SMP PREEMPT_DYNAMIC Thu Apr 4 20:45:21 UTC 2024 x86_64 GNU/Linux
Output of `java -version`
openjdk version "21.0.1" 2023-10-17 LTS
OpenJDK Runtime Environment Temurin-21.0.1+12 (build 21.0.1+12-LTS)
OpenJDK 64-Bit Server VM Temurin-21.0.1+12 (build 21.0.1+12-LTS, mixed mode, sharing)
Quarkus version or git rev
No response
Build tool (i.e. output of `mvnw --version` or `gradlew --version`)
3.9.3
Additional information
No response