smallrye / smallrye-reactive-messaging

SmallRye Reactive Messaging
http://www.smallrye.io/smallrye-reactive-messaging/
Apache License 2.0

RabbitMQ - BackPressureFailure: Buffer full #1906

Open derari opened 2 years ago

derari commented 2 years ago

I am regularly getting BackPressureFailures from my RabbitMQ consumer in my Quarkus (2.12.2-Final, Java 17) application.

io.smallrye.mutiny.subscription.BackPressureFailure: Buffer full, cannot emit item
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.failOverflow(MultiFlatMapOp.java:541)
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.tryEmit(MultiFlatMapOp.java:243)
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapInner.onItem(MultiFlatMapOp.java:607)
    at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
    at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.vertx.MultiReadStream.lambda$subscribe$2(MultiReadStream.java:77)
    at io.vertx.rabbitmq.impl.RabbitMQConsumerImpl.lambda$handler$0(RabbitMQConsumerImpl.java:59)
    at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:239)
    at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:129)
    at io.vertx.rabbitmq.impl.RabbitMQConsumerImpl.handleMessage(RabbitMQConsumerImpl.java:150)
    at io.vertx.rabbitmq.impl.QueueConsumerHandler.lambda$handleDelivery$0(QueueConsumerHandler.java:39)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)
    at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:43)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:833)
    at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:596)
    at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)

The weird thing is that I have configured max-outstanding-messages, so as I understand it, this error should be impossible.

mp:
  messaging:
    incoming:
      new-message:
        automatic-recovery-enabled: true
        max-outstanding-messages: 2
        exchange:
          name: x
          type: topic
        queue:
          name: x
        routing-keys:
          - x.#

Here's the code that handles messages. consume contains a catch-all statement.

    @Incoming("new-event")
    @Blocking
    public CompletionStage<Void> newMessage(Message<?> message) {
        var meta = message.getMetadata(IncomingRabbitMQMetadata.class);
        meta.ifPresent(amqp -> {
            var key = amqp.getRoutingKey();
            LOG.info("Message {} received", key);
            consume(key, amqp.getHeaders());
        });
        return message.ack();
    }

Any ideas how to approach this?

samodadela commented 1 year ago

Was there any progress with this? I'm also seeing the same error... only restarting the app helps.

eferraris commented 1 year ago

I'm getting this exception too! And @samodadela is right. It only starts working again after a restart.

raminbp commented 1 year ago

I have the exact same problem.

wind57 commented 1 year ago

same problem over here, any news on this one?

mjedwabn commented 1 year ago

Repro:

  1. Fill source queue with messages
  2. Start consumer
  3. Terminate incoming connection using rmq management UI
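
Step 3 can also be scripted rather than clicked through the UI. The sketch below assumes the management plugin is reachable on its default port 15672 with the default guest/guest credentials and uses its `DELETE /api/connections/{name}` endpoint. The connection name is a made-up example (the real one is listed by `GET /api/connections`), and the class and method names are hypothetical.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of any library.
class CloseRabbitConnection {

    // Percent-encode a connection name for use as a single URL path segment.
    static String encodeSegment(String name) {
        // URLEncoder targets form encoding and turns spaces into '+', so fix that up.
        return URLEncoder.encode(name, StandardCharsets.UTF_8).replace("+", "%20");
    }

    // Build (but do not send) the DELETE request that force-closes one connection.
    static HttpRequest closeRequest(String baseUrl, String connectionName, String user, String password) {
        String credentials = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/connections/" + encodeSegment(connectionName)))
                .header("Authorization", "Basic " + credentials)
                .header("X-Reason", "repro for BackPressureFailure")
                .DELETE()
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumed values: local broker, default management port and credentials.
        String name = args.length > 0 ? args[0] : "127.0.0.1:51806 -> 127.0.0.1:5672";
        HttpRequest request = closeRequest("http://localhost:15672", name, "guest", "guest");
        System.out.println(request.method() + " " + request.uri());
        if (args.length > 0) {
            // Only talk to the broker when a real connection name was passed in.
            HttpResponse<Void> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.discarding());
            System.out.println("HTTP " + response.statusCode());
        }
    }
}
```

Run without arguments, it only prints the request it would send; pass the connection name as the first argument to actually close that connection.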

The consumer will try to recover: it manages to restore the connection, channel and basicConsumer, but it fails to resume actual consuming:

72
2023-02-14 14:55:50,835 INFO  [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-1) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 14:55:51,298 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
2023-02-14 14:55:51,797 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
73
2023-02-14 14:55:51,840 INFO  [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-1) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 14:55:52,295 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
2023-02-14 14:55:52,799 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
74
2023-02-14 14:55:52,845 INFO  [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-1) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 14:55:53,298 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
2023-02-14 14:55:53,797 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
75
2023-02-14 14:55:53,851 INFO  [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-1) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 14:55:54,252 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-11-thread-9) consumer has been shutdown unexpectedly: amq.ctag-Cj8_-cjNhLcv9Y1UV2N0hA
2023-02-14 14:55:54,254 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (pool-11-thread-9) Start to reconnect...
2023-02-14 14:55:54,255 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (pool-11-thread-9) Stopping rabbitmq client
2023-02-14 14:55:54,255 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-0) Disconnecting from rabbitmq...
2023-02-14 14:55:54,258 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-0) Already disconnected from rabbitmq !
2023-02-14 14:55:54,270 WARN  [com.rab.cli.imp.ForgivingExceptionHandler] (AMQP Connection 127.0.0.1:51806) An unexpected connection driver error occured (Exception message: Socket closed)
2023-02-14 14:55:54,271 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (AMQP Connection 127.0.0.1:51806) RabbitMQ connection shutdown! The client will attempt to reconnect automatically: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:985)
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:975)
    at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:913)
    at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:868)
    at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:265)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:719)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
    at java.base/java.lang.Thread.run(Thread.java:833)

2023-02-14 14:55:54,271 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (AMQP Connection 127.0.0.1:51806) Other consumers or producers are reconnecting. Continue to wait for reconnection
2023-02-14 14:55:54,271 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-2) Starting rabbitmq client
2023-02-14 14:55:54,271 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-0) Connecting to rabbitmq...
2023-02-14 14:55:54,284 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-0) Connected to rabbitmq !
2023-02-14 14:55:54,284 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-2) SRMSG17023: Established exchange `generator`
2023-02-14 14:55:54,285 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-2) SRMSG17025: Established queue `source`
2023-02-14 14:55:54,286 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-2) SRMSG17027: Established binding of queue `source` to exchange 'generator' using routing key '#'
2023-02-14 14:55:54,286 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-2) Successed to restart client. 
2023-02-14 14:55:54,288 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-0) Reconsume queue: source success
2023-02-14 14:55:54,288 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-14-thread-3) Consumer tag is now amq.ctag-3Y2J11Tsn5iddvjqU1n3OQ
2023-02-14 14:55:54,290 ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-0) SRMSG00201: Error caught while processing a message in method org.acme.Main#process: io.smallrye.mutiny.subscription.BackPressureFailure: Buffer full, cannot emit item
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.failOverflow(MultiFlatMapOp.java:541)
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.tryEmit(MultiFlatMapOp.java:243)
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapInner.onItem(MultiFlatMapOp.java:607)
    at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
    at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.vertx.MultiReadStream.lambda$subscribe$2(MultiReadStream.java:77)
    at io.vertx.rabbitmq.impl.RabbitMQConsumerImpl.lambda$handler$0(RabbitMQConsumerImpl.java:59)
    at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:239)
    at io.vertx.core.streams.impl.InboundBuffer.drain(InboundBuffer.java:226)
    at io.vertx.core.streams.impl.InboundBuffer.lambda$fetch$0(InboundBuffer.java:279)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)
    at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:43)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)

2023-02-14 14:55:54,290 ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-0) SRMSG00201: Error caught while processing a message in method org.acme.Main#process: io.smallrye.mutiny.subscription.BackPressureFailure: Buffer full, cannot emit item
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.failOverflow(MultiFlatMapOp.java:541)
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.tryEmit(MultiFlatMapOp.java:243)
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapInner.onItem(MultiFlatMapOp.java:607)
    at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
    at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.vertx.MultiReadStream.lambda$subscribe$2(MultiReadStream.java:77)
    at io.vertx.rabbitmq.impl.RabbitMQConsumerImpl.lambda$handler$0(RabbitMQConsumerImpl.java:59)
    at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:239)
    at io.vertx.core.streams.impl.InboundBuffer.drain(InboundBuffer.java:226)
    at io.vertx.core.streams.impl.InboundBuffer.lambda$fetch$0(InboundBuffer.java:279)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)
    at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:43)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)

2023-02-14 14:55:54,295 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
2023-02-14 14:55:54,799 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
76
2023-02-14 14:55:54,857 INFO  [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-1) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 14:55:55,298 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
2023-02-14 14:55:55,799 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
2023-02-14 14:55:56,299 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.annotations.Blocking;
import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;

@ApplicationScoped
class Main {
    @Outgoing("generator")
    Multi<Integer> generator() {
        AtomicInteger count = new AtomicInteger();
        return Multi.createFrom().ticks().every(Duration.ofMillis(500))
                .map(l -> count.incrementAndGet())
                .onOverflow().drop();
    }

    @Incoming("source")
    @Blocking
    @Acknowledgment(Acknowledgment.Strategy.MANUAL)
    CompletionStage<Void> process(Message<String> in) throws InterruptedException {
        final String payload = in.getPayload();

        Thread.sleep(1000);
        System.out.println(payload);

        return in.ack();
    }
}

mp.messaging.outgoing.generator.connector=smallrye-rabbitmq
mp.messaging.outgoing.generator.exchange.name=generator

mp.messaging.incoming.source.connector=smallrye-rabbitmq
mp.messaging.incoming.source.exchange.name=generator
mp.messaging.incoming.source.queue.name=source
mp.messaging.incoming.source.max-outstanding-messages=50
mp.messaging.incoming.source.automatic-recovery-on-initial-connection=false
mp.messaging.incoming.source.rabbitmq-reconnect-attempts=5000
mp.messaging.incoming.source.rabbitmq-reconnect-interval=2

However, if the queue is empty when the connection is terminated, the consumer fully recovers and is able to process new messages.

mjedwabn commented 1 year ago

Not sure if this is related, but there is also an issue with qos (prefetch count) not getting reapplied after a connection failure: the consumer then buffers all messages.

initial state: image

<-- connection closed -->

after that: image

wind57 commented 1 year ago

@mjedwabn the qos not being re-applied on re-connect is exactly what we are facing!

mjedwabn commented 1 year ago

There is only one place in Microprofile Messaging where qos gets set: https://github.com/smallrye/smallrye-reactive-messaging/blob/3d0c49d6ed62824cfda22fc74c3b6816f58ba786/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java#L288

Looking at the underlying rabbitmq-vertx client, there are scenarios where a new channel is spawned without applying qos; in fact, the library never sets qos by itself. So I made some ugly changes locally in the rabbitmq-vertx client, built it, and imported it into the demo project: qos is now effective, even after the connection is interrupted, but it didn't help much with the Buffer full, cannot emit item failure itself.
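
To make the qos part concrete: prefetch is per-channel state, so a channel created during recovery starts without a limit unless something applies the limit again. What follows is a plain-Java model of that behaviour, not the client's code; `FakeChannel`, `RecoveringClient` and the channel-setup hook are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: these types are hypothetical and do not exist in any RabbitMQ client.
class FakeChannel {
    // In AMQP a prefetch count of 0 means "no limit".
    private int prefetch = 0;

    void basicQos(int prefetchCount) {
        this.prefetch = prefetchCount;
    }

    // How many of the queued messages the broker would push to this consumer at once.
    int deliverable(int queued) {
        return prefetch == 0 ? queued : Math.min(prefetch, queued);
    }
}

class RecoveringClient {
    private final List<Consumer<FakeChannel>> channelSetups = new ArrayList<>();
    private FakeChannel channel = new FakeChannel();

    FakeChannel channel() {
        return channel;
    }

    // Register setup that must run on every new channel, not just the first one.
    void addChannelSetup(Consumer<FakeChannel> setup) {
        channelSetups.add(setup);
        setup.accept(channel);
    }

    // Simulates recovery: the old channel is gone and a fresh one replaces it.
    void reconnect() {
        channel = new FakeChannel();
        channelSetups.forEach(setup -> setup.accept(channel));
    }
}

class QosRecoveryDemo {
    public static void main(String[] args) {
        // qos applied once, directly on the first channel: lost after reconnect.
        RecoveringClient once = new RecoveringClient();
        once.channel().basicQos(50);
        once.reconnect();
        System.out.println("one-off qos, after reconnect: " + once.channel().deliverable(1000));

        // qos registered as channel setup: re-applied after reconnect.
        RecoveringClient hooked = new RecoveringClient();
        hooked.addChannelSetup(ch -> ch.basicQos(50));
        hooked.reconnect();
        System.out.println("hooked qos, after reconnect: " + hooked.channel().deliverable(1000));
    }
}
```

In this model the first client is handed all 1000 queued messages after the reconnect, while the second is still limited to 50, which matches the "consumer buffers all messages" observation above.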

I think some message gets jammed during this failure/recovery process and never gets acked/pushed out of the buffer. Debugging indicates that q is an instance of SingletonQueue, a queue holding a single element. https://github.com/smallrye/smallrye-mutiny/blob/1.9.0/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiFlatMapOp.java#L242-L243
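
Why a single stuck message would be enough: with a one-element queue, the next emission has nowhere to go. Below is a simplified stand-in for that situation. `SingleSlot` is a hypothetical class written for illustration, not Mutiny's `SingletonQueue`, and the exception is a plain `IllegalStateException` rather than `BackPressureFailure`.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a queue that can hold exactly one element.
class SingleSlot<T> {
    private final AtomicReference<T> slot = new AtomicReference<>();

    // Returns false when the slot is still occupied, i.e. the previous item was never drained.
    boolean offer(T item) {
        return slot.compareAndSet(null, item);
    }

    // Removes and returns the held item, or null when empty.
    T poll() {
        return slot.getAndSet(null);
    }
}

class SingleSlotDemo {
    // Mimics an emit path that fails instead of buffering more than one item.
    static <T> void emit(SingleSlot<T> queue, T item) {
        if (!queue.offer(item)) {
            throw new IllegalStateException("Buffer full, cannot emit item");
        }
    }

    public static void main(String[] args) {
        SingleSlot<String> queue = new SingleSlot<>();
        emit(queue, "m1");      // accepted
        queue.poll();           // downstream drains it, the slot is free again
        emit(queue, "m2");      // accepted
        try {
            emit(queue, "m3");  // m2 was never drained, so this fails
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

As long as every item is drained before the next one arrives, the single slot is sufficient; one item that is never polled makes every later emit fail, which would fit the "only a restart helps" reports.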

mjedwabn commented 1 year ago

With max-outstanding-messages=1 it works, "kind of": consuming is resumed, but the channel gets recreated every time and redeliveries can be observed:

image

payload: 93
2023-02-14 16:50:02,271 INFO  [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:02,590 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
2023-02-14 16:50:03,087 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
payload: 94
2023-02-14 16:50:03,289 INFO  [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:03,589 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
2023-02-14 16:50:04,091 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
payload: 95
2023-02-14 16:50:04,304 INFO  [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:04,591 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
2023-02-14 16:50:05,091 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
payload: 96
2023-02-14 16:50:05,313 INFO  [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd

connection closed somewhere here

2023-02-14 16:50:05,590 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
2023-02-14 16:50:06,091 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
payload: 97
2023-02-14 16:50:06,331 INFO  [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:06,590 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
2023-02-14 16:50:06,669 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-10-thread-12) consumer has been shutdown unexpectedly: amq.ctag-jCKAa3H0h7nbKS7OTYKBgw
2023-02-14 16:50:06,673 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (pool-10-thread-12) Start to reconnect...
2023-02-14 16:50:06,674 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (pool-10-thread-12) Stopping rabbitmq client
2023-02-14 16:50:06,675 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Disconnecting from rabbitmq...
2023-02-14 16:50:06,688 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Already disconnected from rabbitmq !
2023-02-14 16:50:06,692 WARN  [com.rab.cli.imp.ForgivingExceptionHandler] (AMQP Connection 127.0.0.1:57763) An unexpected connection driver error occurred (Exception message: Socket closed)
2023-02-14 16:50:06,692 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (AMQP Connection 127.0.0.1:57763) RabbitMQ connection shutdown! The client will attempt to reconnect automatically: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:985)
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:975)
    at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:913)
    at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:868)
    at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:265)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:719)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
    at java.base/java.lang.Thread.run(Thread.java:833)

2023-02-14 16:50:06,692 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-2) Starting rabbitmq client
2023-02-14 16:50:06,692 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connecting to rabbitmq...
2023-02-14 16:50:06,692 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (AMQP Connection 127.0.0.1:57763) Other consumers or producers are reconnecting. Continue to wait for reconnection
2023-02-14 16:50:06,699 DEBUG [com.rab.cli.imp.ConsumerWorkService] (vert.x-worker-thread-1) Creating executor service with 10 thread(s) for consumer work service
2023-02-14 16:50:06,706 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connected to rabbitmq !
2023-02-14 16:50:06,707 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-2) SRMSG17023: Established exchange `generator`
2023-02-14 16:50:06,708 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-2) SRMSG17025: Established queue `source`
2023-02-14 16:50:06,709 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-2) SRMSG17027: Established binding of queue `source` to exchange 'generator' using routing key '#'
2023-02-14 16:50:06,710 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-2) Successed to restart client. 
2023-02-14 16:50:06,711 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Reconsume queue: source success
2023-02-14 16:50:06,711 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-14-thread-3) Consumer tag is now amq.ctag-13_qKfB34BUX_thhJvML6w
2023-02-14 16:50:07,091 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
payload: 98
2023-02-14 16:50:07,344 INFO  [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:07,351 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-14-thread-4) consumer has been shutdown unexpectedly: amq.ctag-13_qKfB34BUX_thhJvML6w
2023-02-14 16:50:07,351 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (pool-14-thread-4) Start to reconnect...
2023-02-14 16:50:07,351 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (pool-14-thread-4) Stopping rabbitmq client
2023-02-14 16:50:07,353 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Disconnecting from rabbitmq...
2023-02-14 16:50:07,358 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Disconnected from rabbitmq !
2023-02-14 16:50:07,362 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-4) Starting rabbitmq client
2023-02-14 16:50:07,362 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connecting to rabbitmq...
2023-02-14 16:50:07,370 DEBUG [com.rab.cli.imp.ConsumerWorkService] (vert.x-worker-thread-1) Creating executor service with 10 thread(s) for consumer work service
2023-02-14 16:50:07,396 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connected to rabbitmq !
2023-02-14 16:50:07,397 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-4) SRMSG17023: Established exchange `generator`
2023-02-14 16:50:07,398 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-4) SRMSG17025: Established queue `source`
2023-02-14 16:50:07,399 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-4) SRMSG17027: Established binding of queue `source` to exchange 'generator' using routing key '#'
2023-02-14 16:50:07,399 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-4) Successed to restart client. 
2023-02-14 16:50:07,400 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Reconsume queue: source success
2023-02-14 16:50:07,400 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-16-thread-3) Consumer tag is now amq.ctag-wY2dSKXeUxu-RAWm1O8EIQ
2023-02-14 16:50:07,591 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
2023-02-14 16:50:08,091 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
payload: 98
2023-02-14 16:50:08,345 INFO  [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:08,589 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
2023-02-14 16:50:09,091 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
payload: 98
2023-02-14 16:50:09,351 INFO  [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:09,357 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-16-thread-5) consumer has been shutdown unexpectedly: amq.ctag-wY2dSKXeUxu-RAWm1O8EIQ
2023-02-14 16:50:09,357 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (pool-16-thread-5) Start to reconnect...
2023-02-14 16:50:09,357 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (pool-16-thread-5) Stopping rabbitmq client
2023-02-14 16:50:09,357 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Disconnecting from rabbitmq...
2023-02-14 16:50:09,371 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Disconnected from rabbitmq !
2023-02-14 16:50:09,371 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-5) Starting rabbitmq client
2023-02-14 16:50:09,371 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connecting to rabbitmq...
2023-02-14 16:50:09,378 DEBUG [com.rab.cli.imp.ConsumerWorkService] (vert.x-worker-thread-1) Creating executor service with 10 thread(s) for consumer work service
2023-02-14 16:50:09,386 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connected to rabbitmq !
2023-02-14 16:50:09,386 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-5) SRMSG17023: Established exchange `generator`
2023-02-14 16:50:09,388 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-5) SRMSG17025: Established queue `source`
2023-02-14 16:50:09,389 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-5) SRMSG17027: Established binding of queue `source` to exchange 'generator' using routing key '#'
2023-02-14 16:50:09,390 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-5) Successed to restart client. 
2023-02-14 16:50:09,391 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Reconsume queue: source success
2023-02-14 16:50:09,391 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-18-thread-3) Consumer tag is now amq.ctag-tMTiqriOqntlVPBZxT_RUA
2023-02-14 16:50:09,589 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
2023-02-14 16:50:10,088 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
payload: 99
2023-02-14 16:50:10,358 INFO  [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:10,364 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-18-thread-4) consumer has been shutdown unexpectedly: amq.ctag-tMTiqriOqntlVPBZxT_RUA
2023-02-14 16:50:10,364 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (pool-18-thread-4) Start to reconnect...
2023-02-14 16:50:10,364 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (pool-18-thread-4) Stopping rabbitmq client
2023-02-14 16:50:10,365 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Disconnecting from rabbitmq...
2023-02-14 16:50:10,379 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Disconnected from rabbitmq !
2023-02-14 16:50:10,379 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-6) Starting rabbitmq client
2023-02-14 16:50:10,379 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connecting to rabbitmq...
2023-02-14 16:50:10,387 DEBUG [com.rab.cli.imp.ConsumerWorkService] (vert.x-worker-thread-1) Creating executor service with 10 thread(s) for consumer work service
2023-02-14 16:50:10,396 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connected to rabbitmq !
2023-02-14 16:50:10,397 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-6) SRMSG17023: Established exchange `generator`
2023-02-14 16:50:10,399 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-6) SRMSG17025: Established queue `source`
2023-02-14 16:50:10,402 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-6) SRMSG17027: Established binding of queue `source` to exchange 'generator' using routing key '#'
2023-02-14 16:50:10,403 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-6) Successed to restart client. 
2023-02-14 16:50:10,405 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Reconsume queue: source success
2023-02-14 16:50:10,405 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-20-thread-3) Consumer tag is now amq.ctag-1gtZmBKUhqlMPbqE4aAZnQ
2023-02-14 16:50:10,588 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
2023-02-14 16:50:11,091 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
payload: 99
2023-02-14 16:50:11,361 INFO  [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:11,591 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
2023-02-14 16:50:12,091 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
payload: 99
2023-02-14 16:50:12,368 INFO  [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:12,376 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-20-thread-5) consumer has been shutdown unexpectedly: amq.ctag-1gtZmBKUhqlMPbqE4aAZnQ
2023-02-14 16:50:12,376 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (pool-20-thread-5) Start to reconnect...
2023-02-14 16:50:12,376 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (pool-20-thread-5) Stopping rabbitmq client
2023-02-14 16:50:12,378 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Disconnecting from rabbitmq...
2023-02-14 16:50:12,401 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Disconnected from rabbitmq !
2023-02-14 16:50:12,402 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-7) Starting rabbitmq client
2023-02-14 16:50:12,403 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connecting to rabbitmq...
2023-02-14 16:50:12,410 DEBUG [com.rab.cli.imp.ConsumerWorkService] (vert.x-worker-thread-1) Creating executor service with 10 thread(s) for consumer work service
2023-02-14 16:50:12,418 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connected to rabbitmq !
2023-02-14 16:50:12,419 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-7) SRMSG17023: Established exchange `generator`
2023-02-14 16:50:12,420 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-7) SRMSG17025: Established queue `source`
2023-02-14 16:50:12,421 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-7) SRMSG17027: Established binding of queue `source` to exchange 'generator' using routing key '#'
2023-02-14 16:50:12,421 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-7) Successed to restart client. 
2023-02-14 16:50:12,422 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Reconsume queue: source success
2023-02-14 16:50:12,423 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-22-thread-3) Consumer tag is now amq.ctag-RSPBQVr2f3tAAuaQ2zXTzA
2023-02-14 16:50:12,591 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
2023-02-14 16:50:13,091 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key 
payload: 100
mjedwabn commented 1 year ago

FYI @vietj

maragud commented 1 year ago

After looking at both the vertx-rabbitmq-client and smallrye-reactive-messaging-rabbitmq libraries, I found that the main issue is the RabbitMQConsumer.resume() call during the consumer restart: https://github.com/vert-x3/vertx-rabbitmq-client/blob/master/src/main/java/io/vertx/rabbitmq/impl/RabbitMQClientImpl.java#L221

From what I understand, the RabbitMQConsumer is a ReadStream that works in flowing mode rather than fetch mode (as described in the ReadStream documentation): when the consumer is initialized, resume() is called, which sets the InboundBuffer's demand to Long.MAX_VALUE. When the RabbitMQConnector sets up the Multi, the demand is reset to 1 and messages are fetched under backpressure. When the connection is restarted, however, the demand is changed back to Long.MAX_VALUE, which fills up the internal queue of the flat map operation and causes the backpressure failure. The demand is not reset again afterwards, since that reset is only done during the onSubscribe of the flow.

The main issue I see here is that ReadStream supports both flowing and fetch mode, but the RabbitMQConsumer does not handle fetch mode appropriately. I have verified that removing the resume during the restart solves the issue, but this change is in the vertx-rabbitmq-client library, and I am not sure that it leaves flowing mode unaffected, since I don't have any examples using that mode.
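
To make the mechanism above easier to follow, here is a minimal sketch in plain Java. It is a toy model, not the Vert.x `InboundBuffer` or the Mutiny flat map operator: `ToyStream`, `overflows`, and the downstream capacity of 2 are hypothetical names and values chosen only for illustration. It assumes the stream starts in flowing mode, that the subscriber switches it to fetch mode with a demand of 1, and that a restart may call `resume()` again.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy model of demand handling; NOT the real Vert.x or Mutiny code. */
final class DemandSketch {

    /** Hypothetical stand-in for a ReadStream with an inbound buffer. */
    static final class ToyStream {
        private long demand = Long.MAX_VALUE;              // flowing mode by default
        private final Queue<Integer> pending = new ArrayDeque<>();
        private final Queue<Integer> downstream = new ArrayDeque<>();
        private final int downstreamCapacity;              // bounded downstream queue

        ToyStream(int downstreamCapacity) {
            this.downstreamCapacity = downstreamCapacity;
        }

        void pause() { demand = 0; }

        /** Switches back to flowing mode: unlimited demand. */
        void resume() { demand = Long.MAX_VALUE; drain(); }

        /** Fetch mode: adds n to the demand, capped at Long.MAX_VALUE. */
        void fetch(long n) {
            demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;
            drain();
        }

        void write(int item) { pending.add(item); drain(); }

        private void drain() {
            while (demand > 0 && !pending.isEmpty()) {
                if (downstream.size() == downstreamCapacity) {
                    throw new IllegalStateException("Buffer full, cannot emit item");
                }
                downstream.add(pending.poll());
                if (demand != Long.MAX_VALUE) {
                    demand--;                              // unlimited demand is never consumed
                }
            }
        }
    }

    /** Returns true if the bounded downstream queue overflows after the simulated restart. */
    static boolean overflows(boolean resumeOnRestart) {
        ToyStream stream = new ToyStream(2);
        stream.pause();
        stream.fetch(1);              // subscriber asks for exactly one item
        stream.write(1);              // delivered, demand drops to 0
        stream.write(2);              // held back until more demand arrives
        if (resumeOnRestart) {
            stream.resume();          // simulated restart: demand is unlimited again
        }
        try {
            for (int i = 3; i <= 10; i++) {
                stream.write(i);      // broker keeps delivering, subscriber never asks for more
            }
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("resume on restart:    overflow=" + overflows(true));
        System.out.println("no resume on restart: overflow=" + overflows(false));
    }
}
```

In this model, the run without `resume()` keeps new items in the stream's own pending queue, while the run with `resume()` pushes them straight into the bounded downstream queue until it overflows. Whether dropping the `resume()` call is safe for flowing-mode users of the real client remains an open question.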

engineering-dotcms commented 1 year ago

Hi everyone, any news on this topic? We have the same problem, which appeared when we migrated our application from Spring Boot to Quarkus. To simulate the prefetch count, we used "max-outstanding-messages" and set the value to 250.

Is there a workaround? For example, increasing this number... Regards

maragud commented 1 year ago

@engineering-dotcms The prefetch count resetting to 0 after restarting the connection is a different issue, as someone also pointed out in this ticket. I created a separate ticket here: https://github.com/smallrye/smallrye-reactive-messaging/issues/2084

Dosexe commented 1 year ago

Same problem, joining the waiting list.

flamurjahiri commented 1 year ago

Is there any update on this?

nejco commented 1 year ago

Same issue here, any updates?

ma3s7r0 commented 1 year ago

Same error here. Setting max-outstanding-messages to 1 seems to do the trick, but I would much prefer a solution that also works with other values.

juandiii commented 8 months ago

Same issue here

corujoraphael commented 8 months ago

Same issues here

maloyans commented 1 month ago

Same issue here, any updates?