reactor / reactor-rabbitmq

Reactor RabbitMQ
Apache License 2.0
157 stars 55 forks source link

receiver.consumeManualAck does not seem compatible with take(int) reactor operator #176

Open chibenwa opened 2 years ago

chibenwa commented 2 years ago

As part of the Apache James project, in some of our tests we want to dequeue only some items from a rabbitMQ queue while still keeping the others enqueue.

In order to achieve this, our first approach was to use the take(int) operator.

This proves to be unstable.

Expected Behavior

I expect to be able to use the take operator out of reactor-rabbitmq library primitives.

I expect violations of the TCK to be well documented when not achievable with a list of impcted oerators, if this issue cannot be fixed.

Actual Behavior

take operator is buggy, and this limitation is undocumented.

Steps to Reproduce

        @Test
        void consumingShouldSuccessWhenAckConcurrentWithFluxTakeAndFlatMap() throws Exception {
            ReceiverProvider receiverProvider = rabbitMQExtension.getReceiverProvider();
            int counter = 5;
            CountDownLatch countDownLatch = new CountDownLatch(counter);

            Flux.using(receiverProvider::createReceiver,
                    receiver -> receiver.consumeManualAck(QUEUE_NAME_1, new ConsumeOptions()),
                    Receiver::close)
                .filter(getResponse -> getResponse.getBody() != null)
                .take(counter)
                .flatMap(acknowledgableDelivery -> Mono.fromCallable(() -> {
                    acknowledgableDelivery.ack(true);
                    countDownLatch.countDown();
                    return acknowledgableDelivery;
                }).subscribeOn(Schedulers.elastic()))
                .subscribe();

            assertThat(countDownLatch.await(10, TimeUnit.SECONDS)).isTrue();
        }

=> Sometime fails by ignoring some elements...

Also fails with contatMap.

        @Test
        void consumingShouldSuccessWhenAckConcurrentWithFluxTake() throws Exception {
            ReceiverProvider receiverProvider = rabbitMQExtension.getReceiverProvider();
            int counter = 5;
            CountDownLatch countDownLatch = new CountDownLatch(counter);

            Flux.using(receiverProvider::createReceiver,
                    receiver -> receiver.consumeManualAck(QUEUE_NAME_1, new ConsumeOptions()),
                    Receiver::close)
                .filter(getResponse -> getResponse.getBody() != null)
                .take(counter)
                .concatMap(acknowledgableDelivery -> Mono.fromCallable(() -> {
                    acknowledgableDelivery.ack(true);
                    countDownLatch.countDown();
                    System.out.println(Thread.currentThread().getName() + ": " + countDownLatch.getCount());
                    return acknowledgableDelivery;
                }).subscribeOn(Schedulers.elastic()))
                .subscribe();

            assertThat(countDownLatch.await(10, TimeUnit.SECONDS)).isTrue();
        }

=> Fails reliably with:

09:03:46.066 [ERROR] r.c.p.Operators - Operator called default onErrorDropped
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:421)
    at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:93)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:439)
    at reactor.rabbitmq.AcknowledgableDelivery.basicAck(AcknowledgableDelivery.java:110)
    at reactor.rabbitmq.AcknowledgableDelivery.ack(AcknowledgableDelivery.java:62)
    ... 10 common frames omitted
Wrapped by: reactor.rabbitmq.RabbitFluxException: Not retryable exception, cannot retry
    at reactor.rabbitmq.ExceptionHandlers$SimpleRetryTemplate.retry(ExceptionHandlers.java:125)
    at reactor.rabbitmq.ExceptionHandlers$RetryAcknowledgmentExceptionHandler.accept(ExceptionHandlers.java:143)
    at reactor.rabbitmq.ExceptionHandlers$RetryAcknowledgmentExceptionHandler.accept(ExceptionHandlers.java:130)
    at reactor.rabbitmq.AcknowledgableDelivery.retry(AcknowledgableDelivery.java:130)
    at reactor.rabbitmq.AcknowledgableDelivery.ack(AcknowledgableDelivery.java:64)
    at org.apache.james.backends.rabbitmq.RabbitMQTest$ConcurrencyTest.lambda$consumingShouldSuccessWhenAckConcurrentWithFluxTake$7(RabbitMQTest.java:665)
    at reactor.core.publisher.MonoCallable.call(MonoCallable.java:92)
    at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:227)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)

Possible Solution

No idea here. We complexified our testing code to leverage this situation...

Your Environment

Reactor versions: (BOM) 2020.0.19 JVM 11 OS Ubuntu 2020.4