reactor / reactor-rabbitmq

Reactor RabbitMQ
Apache License 2.0
157 stars 55 forks source link

RPCs are executed one at a time, blocking on a mutex #174

Open chibenwa opened 2 years ago

chibenwa commented 2 years ago

Summary

https://github.com/apache/james-project/pull/1003

Calling Sender::unbind does wait a mutex release if a RPC is currently executed on the current channel, and waits this RPC to complete before submitting a new one.

This leads to a blocking behaviour upon "massive" unbinds.

Expected Behavior

I would expect to not get such blocking behaviours in a reactor-* project and would expect a reactive driver not to force me to put subscribeOn(boundedElastic()) everywhere.

At the very least we could have a separate method in ChannelPool to get a channel not executing RPCs thus enabling smart implementation to get channels always in a state to directly submit RPCs, open new channels if needed or waits without blocking threads if relevant.

Actual Behavior

See this flame graph taken on the Netty event loop:

Screenshot from 2022-05-17 18-07-46

Context: a CTRL+C in a perf test lead to 10.000 IMAP connections being closed at the same time, cleaning up

Steps to Reproduce

        @Test
        void blockingRPCs(DockerRabbitMQ rabbitMQ) throws Exception {
            BlockHound.install();
            SenderOptions senderOptions =  new SenderOptions()
                .connectionFactory(connectionFactory)
                .resourceManagementScheduler(Schedulers.boundedElastic());

            final Sender sender = new Sender(senderOptions);

            sender.declare(QueueSpecification.queue().name("queue")).block();
            sender.declare(ExchangeSpecification.exchange().name("ex")).block();

            Flux.range(0, 100)
                .parallel()
                .flatMap(i -> {
                    return sender.bind(BindingSpecification.binding().exchange("ex").queue("queue").routingKey("" + i))
                        .then(sender.unbind(BindingSpecification.binding().exchange("ex").queue("queue").routingKey("routing" + i)));
                })
                .then()
                .subscribeOn(Schedulers.parallel())
                .block();
        }

Fails with

reactor.core.Exceptions$ReactiveException: reactor.blockhound.BlockingOperationError: Blocking call! java.net.SocketOutputStream#socketWrite0
[...]
Caused by: reactor.blockhound.BlockingOperationError: Blocking call! java.net.SocketOutputStream#socketWrite0
    at java.base/java.net.SocketOutputStream.socketWrite0(SocketOutputStream.java)
    at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
    at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
    at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
    at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142)
    at java.base/java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:197)
    at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:636)
    at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:134)
    at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:455)
    at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:434)
    at com.rabbitmq.client.impl.AMQChannel.quiescingAsyncRpc(AMQChannel.java:369)
    at com.rabbitmq.client.impl.AMQChannel.asyncRpc(AMQChannel.java:360)
    at com.rabbitmq.client.impl.AMQChannel.privateAsyncRpc(AMQChannel.java:320)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingAsyncRpc(AMQChannel.java:155)
    at com.rabbitmq.client.impl.ChannelN.asyncCompletableRpc(ChannelN.java:1580)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.asyncCompletableRpc(AutorecoveringChannel.java:931)
    at reactor.rabbitmq.ChannelProxy.asyncCompletableRpc(ChannelProxy.java:556)
    at reactor.rabbitmq.Sender.lambda$bind$28(Sender.java:691)
[...]

Your Environment

acogoluegnes commented 2 years ago

The underlying Java client uses blocking IO by default. You can switch to NIO with ConnectionFactory#useNio() and this should make the Blockhound failure from above disappear. See the documentation for more information.

The Java client still has to use some locking to serialize RPC calls like unbind as the AMQP 0.9.1 protocol does not use a correlation ID for requests, so there can be only one RPC call at a time on a channel in practice (this is what most client libraries do AFAIK).

You're welcome to investigate further with NIO enabled to see where things could be made more optimized. The one-RPC-call-at-a-time limitation will always be there but there may be ways to mitigate the issue.

chibenwa commented 2 years ago

You're welcome to investigate further with NIO enabled to see where things could be made more optimized.

Fair. Modifying the code exemple to:

        @Test
        void blockingRPCs(DockerRabbitMQ rabbitMQ) throws Exception {
            BlockHound.install();
            final ConnectionFactory connectionFactory = rabbitMQ.connectionFactory();
            connectionFactory.useNio();
            SenderOptions senderOptions =  new SenderOptions()
                .connectionFactory(connectionFactory)
                .resourceManagementScheduler(Schedulers.boundedElastic());

            final Sender sender = new Sender(senderOptions);

            sender.declare(QueueSpecification.queue().name("queue")).block();
            sender.declare(ExchangeSpecification.exchange().name("ex")).block();

            Flux.range(0, 100)
                .parallel()
                .flatMap(i -> {
                    return sender.bind(BindingSpecification.binding().exchange("ex").queue("queue").routingKey("routing" + i))
                        .then(sender.unbind(BindingSpecification.binding().exchange("ex").queue("queue").routingKey("routing" + i)));
                })
                .then()
                .subscribeOn(Schedulers.parallel())
                .block();
        }

Turns into the expected stacktrace:

reactor.core.Exceptions$ReactiveException: reactor.blockhound.BlockingOperationError: Blocking call! java.lang.Object#wait

    at reactor.core.Exceptions.propagate(Exceptions.java:392)
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:97)
    at reactor.core.publisher.Mono.block(Mono.java:1707)
    at org.apache.james.backends.rabbitmq.RabbitMQTest$FourConnections.blockingRPCs(RabbitMQTest.java:219)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
Caused by: reactor.blockhound.BlockingOperationError: Blocking call! java.lang.Object#wait
    at java.base/java.lang.Object.wait(Object.java)
    at java.base/java.lang.Object.wait(Object.java:328)
    at com.rabbitmq.client.impl.AMQChannel.doEnqueueRpc(AMQChannel.java:220)
    at com.rabbitmq.client.impl.AMQChannel.enqueueAsyncRpc(AMQChannel.java:212)
    at com.rabbitmq.client.impl.AMQChannel.quiescingAsyncRpc(AMQChannel.java:368)
    at com.rabbitmq.client.impl.AMQChannel.asyncRpc(AMQChannel.java:360)
    at com.rabbitmq.client.impl.AMQChannel.privateAsyncRpc(AMQChannel.java:320)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingAsyncRpc(AMQChannel.java:155)

See the documentation for more information.

The documentation do not mention nio will prevent some blocking calls and merely states:

Use the NIO mode if your Java process uses many connections (dozens or hundreds). You should use fewer threads than with the default blocking mode. With the appropriate number of threads set, you shouldn't experience any decrease in performance, especially if the connections are not so busy.

Which, running a single connection do not look appealing to me!

I suggest updating the documentation and clerly mention the costs of not using NIO regarding blocking call...

The one-RPC-call-at-a-time limitation will always be there but there may be ways to mitigate the issue.

Yes, I propose above refining the channel pool API.

Using another dedicated channel pool upon bind, unbind, where return of the pools to the channels are piggy backed on RPC completion could work too.

By the way, at the very least this limitation needs to be documented.