
MQTT reactive stream transformer fails to process events due to lack of requests #42607

Open · kdurnoga opened this issue 2 months ago

kdurnoga commented 2 months ago

Describe the bug

Apparently, Quarkus 3.13.0 (through one of its transitive dependencies) broke MQTT reactive messaging processors of the form:

@Incoming("source")
@Outgoing("sink")
Multi<String> process(Multi<String> source) {
    return source;
}

One of the stream subscribers installed by Quarkus requests only 16 items from source (a value hardcoded in one of Mutiny's operators: https://github.com/smallrye/smallrye-mutiny/blob/main/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiEmitOnOp.java#L89), so it runs into a backpressure failure whenever the source emits items faster than the next batch is requested. Previously, i.e., in Quarkus 3.12.3, the number of items requested was Long.MAX_VALUE.
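
For illustration, the same failure mode can be reproduced with plain Mutiny, outside Quarkus and MQTT entirely. This is a minimal sketch (all names here are mine, not taken from the reproducer): an emitter that ignores demand, pushed through emitOn(), the operator with the hardcoded 16-item request.

import java.util.concurrent.Executors;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.BackPressureStrategy;

public class BackpressureDemo {
    public static void main(String[] args) {
        Multi.createFrom().<Integer>emitter(emitter -> {
            // Emit eagerly, ignoring downstream demand.
            for (int i = 0; i < 100; i++) {
                emitter.emit(i);
            }
            emitter.complete();
        }, BackPressureStrategy.ERROR)
                // emitOn() requests upstream items in bounded batches (16 by default),
                // so the eager emitter above can overrun the outstanding demand.
                .emitOn(Executors.newSingleThreadExecutor())
                .subscribe().with(
                        item -> System.out.println("got " + item),
                        failure -> System.err.println("failed: " + failure));
    }
}

Depending on how the emitting and draining threads interleave, the failure callback receives a BackPressureFailure, which matches the nondeterminism described under "Actual behavior" below.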

Changing the buffer-size setting does not seem to influence this behaviour. Neither does controlling the demand with Mutiny's paceDemand() operator.
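
Concretely, the two attempts look like the following; the channel name, connector, and numbers are illustrative rather than copied from the reproducer. First, the configuration route:

# application.properties -- raising the buffer size (no effect observed)
mp.messaging.incoming.source.connector=smallrye-mqtt
mp.messaging.incoming.source.buffer-size=128

And the paceDemand() route, using Mutiny's stock FixedDemandPacer:

import java.time.Duration;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.FixedDemandPacer;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

public class PacedProcessor {
    @Incoming("source")
    @Outgoing("sink")
    Multi<String> process(Multi<String> source) {
        // Request 100 items every 100 ms instead of relying on downstream demand.
        return source.paceDemand()
                .using(new FixedDemandPacer(100, Duration.ofMillis(100)));
    }
}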

The root cause probably lies in one of Quarkus's dependencies, with io.smallrye.reactive:smallrye-reactive-messaging being logically the first suspect. However, I've tried forcing a downgraded version of this library, to no avail. Debugging reactive streams is notoriously tricky...
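
For reference, such a forced downgrade has this general shape in pom.xml; the artifact and version shown here are hypothetical and only illustrate the mechanism:

<dependencyManagement>
  <dependencies>
    <!-- Hypothetical pin: artifact and version are illustrative only -->
    <dependency>
      <groupId>io.smallrye.reactive</groupId>
      <artifactId>smallrye-reactive-messaging-provider</artifactId>
      <version>4.21.0</version>
    </dependency>
  </dependencies>
</dependencyManagement>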

Expected behavior

The reproducer code works fine without runtime errors.

Actual behavior

The code may fail with SRMSG17105: Unable to establish a connection with the MQTT broker: io.smallrye.mutiny.subscription.BackPressureFailure: Could not emit item downstream due to lack of requests. Note that the issue is not deterministic; I guess it depends on how threads interleave and on the number of available cores.

How to Reproduce?

Build&run the following project: https://github.com/kdurnoga/mqtt-report.

The project features a dummy generator that produces events in batches of 17 every second (one more than the 16 items requested by Mutiny, mentioned above). You may experiment with lower values (I get backpressure errors with batch sizes as low as 5), but with 17 you should run into the issue fairly quickly.
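
The generator is along these lines; this is a sketch of the idea only, not the actual code from the reproducer repository:

import java.time.Duration;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

public class DummyGenerator {
    @Outgoing("source")
    Multi<String> generate() {
        // Every second, emit a burst of 17 items: one more than the 16 that
        // Mutiny requests, so a burst can overrun the outstanding demand.
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .onItem().transformToMultiAndConcatenate(tick ->
                        Multi.createFrom().range(0, 17)
                                .map(i -> "event-" + tick + "-" + i));
    }
}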

If you downgrade to Quarkus 3.12.3, the code should run fine.
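
In a standard Maven project generated by code.quarkus.io (an assumption about the setup), that downgrade is a single property in pom.xml:

<properties>
  <quarkus.platform.version>3.12.3</quarkus.platform.version>
</properties>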

Output of uname -a or ver

Microsoft Windows [Version 10.0.19045.4780]

Output of java -version

openjdk 21.0.3 2024-04-16 LTS

Quarkus version or git rev

3.13.2

Build tool (i.e., output of mvnw --version or gradlew --version)

No response

Additional information

No response

quarkus-bot[bot] commented 2 months ago

/cc @cescoffier (reactive-messaging), @ozangunalp (reactive-messaging)

ozangunalp commented 2 months ago

This indeed looks like a regression. MQTT channels have a buffer-size attribute for this reason but it is not properly applied.

kdurnoga commented 2 weeks ago

@ozangunalp Would you mind setting a milestone for this issue? I'm looking forward to upgrading my app to the latest Quarkus, but this issue is a blocker...

brunohorta82 commented 2 weeks ago

The problem persists on Quarkus version 3.15.1.

brunohorta82 commented 2 weeks ago

2024-10-16 10:50:30,678 ERROR [io.sma.rea.mes.mqtt] (vert.x-eventloop-thread-1) SRMSG17105: Unable to establish a connection with the MQTT broker: io.smallrye.mutiny.subscription.BackPressureFailure: Could not emit item downstream due to lack of requests
    at io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor$BroadcastSubscription.onNext(BroadcastProcessor.java:217)
    at io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor.onNext(BroadcastProcessor.java:139)
    at io.smallrye.reactive.messaging.mqtt.Clients$ClientHolder.lambda$new$0(Clients.java:93)
    at io.smallrye.reactive.messaging.mqtt.session.impl.MqttClientSessionImpl.serverPublished(MqttClientSessionImpl.java:578)
    at io.vertx.mqtt.impl.MqttClientImpl.handlePublish(MqttClientImpl.java:1189)
    at io.vertx.mqtt.impl.MqttClientImpl.handleMessage(MqttClientImpl.java:954)
    at io.vertx.mqtt.impl.MqttClientImpl.lambda$null$0(MqttClientImpl.java:250)
    at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:328)
    at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:321)
    at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:388)
    at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:159)
    at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:1583)

micaelsf commented 2 weeks ago

I also have this issue on 3.15.1, I had to revert Quarkus version to the one I had before (3.9.4) to avoid this for now