antonyvorontsov / RabbitMQ.Client.Core.DependencyInjection

.NET Core wrapper library around RabbitMQ.Client for dependency injection.
MIT License

Consumer stops consuming when all messages on queue are received - I need it to wait for new messages #84

Closed: rabbers closed this issue 3 years ago

rabbers commented 3 years ago

Hi,

I have an app that both consumes and produces. Ideally I would use the same exchange for both producing and consuming, but that's a question for another day. At present the program works well: a message is received, I Ack it as soon as I have something to return, and if there are several messages in the queue, each one is received, processed and replied to as expected.

The problem begins when the queued messages run out: the consumer stops receiving messages. Since this program is intended to reply to messages as they arrive, I would expect the loop to keep waiting for new messages, and ideally to reconnect if the connection drops. Here's my setup:

            services.AddRabbitMqClient(new RabbitMQDI.Configuration.RabbitMqClientOptions()
            {
                HostName = Environment.GetEnvironmentVariable("RABBITMQ_HOSTNAME"),
                Port = int.Parse(Environment.GetEnvironmentVariable("RABBITMQ_PORT")),
                UserName = Environment.GetEnvironmentVariable("RABBITMQ_USERNAME"),
                Password = Environment.GetEnvironmentVariable("RABBITMQ_PASSWORD"),
                VirtualHost = Environment.GetEnvironmentVariable("RABBITMQ_VHOST")
            })
            .AddProductionExchange(System.Environment.GetEnvironmentVariable("RABBITMQ_EXCHANGEOUT"), new RabbitMQDI.Configuration.RabbitMqExchangeOptions()
            {
                AutoDelete = false,
                Durable = true,
                Type = "topic",
                Queues = new List<RabbitMQDI.Configuration.RabbitMqQueueOptions>() { 
                new RabbitMQDI.Configuration.RabbitMqQueueOptions()
                {
                    AutoDelete = false,
                    Durable = true,
                    Name = System.Environment.GetEnvironmentVariable("RABBITMQ_QUEUEOUT"),
                    RoutingKeys = new HashSet<string>() { System.Environment.GetEnvironmentVariable("RABBITMQ_ROUTINGKEYOUT") }
                }
                }
            })
            .AddConsumptionExchange(System.Environment.GetEnvironmentVariable("RABBITMQ_EXCHANGE"), new RabbitMQDI.Configuration.RabbitMqExchangeOptions()
            {
                AutoDelete = false,
                Durable = true,
                Type = "topic",
                Queues = new List<RabbitMQDI.Configuration.RabbitMqQueueOptions>() {
                    new RabbitMQDI.Configuration.RabbitMqQueueOptions()
                    {
                        AutoDelete = false,
                        Durable = true,
                        Name = System.Environment.GetEnvironmentVariable("RABBITMQ_QUEUE"),
                        RoutingKeys = new HashSet<string>() { System.Environment.GetEnvironmentVariable("RABBITMQ_ROUTINGKEY") }
                    }
                }
            })
            .AddAsyncNonCyclicMessageHandlerSingleton<EventController>(System.Environment.GetEnvironmentVariable("RABBITMQ_ROUTINGKEY"));

            services.AddHostedService<ConsumingHostedService>();

What am I doing wrong? How can I keep the consuming loop going? Is that an option?

FYI, my message handler is of type IAsyncNonCyclicMessageHandler. I had presumed that "non-cyclic" meant that the consumer didn't go back to its initial state, but implementing an IAsyncMessageHandler instead didn't work either.

Any help greatly appreciated.

rabbers commented 3 years ago

Whilst looking at the code for this library, I noticed that the ConsumerService has an "AckAction", which defaults to a BasicAck. Since I also coded a BasicAck directly, each message was acknowledged twice. For some reason this caused the consumer to stop listening.

On removing my Ack, the library is now consistently listening. Odd behaviour.
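A likely explanation for the behaviour described above: in AMQP 0-9-1, acknowledging a delivery tag that the broker no longer tracks (for example, one that was already acked) is a channel-level error, and RabbitMQ closes the channel with PRECONDITION_FAILED, so consumers on that channel stop receiving. The sketch below is only a toy in-memory model of that bookkeeping. ToyChannel and its members are hypothetical names, not part of RabbitMQ.Client or this library.

```csharp
using System;
using System.Collections.Generic;

// Toy model of one channel's acknowledgement bookkeeping.
// Hypothetical code: this is NOT RabbitMQ.Client and NOT this library.
public sealed class ToyChannel
{
    private readonly HashSet<ulong> _unacked = new HashSet<ulong>();
    private ulong _nextTag = 1;

    public bool IsOpen { get; private set; } = true;
    public string CloseReason { get; private set; }

    // Simulates the broker delivering a message.
    // Returns null once the channel has been closed.
    public ulong? Deliver()
    {
        if (!IsOpen) return null;
        ulong tag = _nextTag++;
        _unacked.Add(tag);
        return tag;
    }

    // Acking a tag that is not outstanding closes the channel,
    // mirroring the broker's "unknown delivery tag" channel error.
    public void BasicAck(ulong deliveryTag)
    {
        if (!IsOpen) return;
        if (!_unacked.Remove(deliveryTag))
        {
            IsOpen = false;
            CloseReason = "PRECONDITION_FAILED - unknown delivery tag " + deliveryTag;
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var channel = new ToyChannel();
        ulong tag = channel.Deliver().Value;

        channel.BasicAck(tag); // the handler's own ack
        channel.BasicAck(tag); // a second ack for the same delivery

        Console.WriteLine(channel.IsOpen);             // False
        Console.WriteLine(channel.CloseReason);        // PRECONDITION_FAILED - unknown delivery tag 1
        Console.WriteLine(channel.Deliver().HasValue); // False: nothing more is delivered
    }
}
```

In this model the first ack succeeds and the second one closes the channel, which would match the symptom of a consumer that goes quiet after a message is acknowledged twice.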

For my testing, I'll stick with the immediate Ack, but really I want my Ack as late as possible. How would I change the AckAction to do nothing?
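For reference, acknowledging "as late as possible" looks roughly like this when written against RabbitMQ.Client directly, outside this wrapper. This is a minimal sketch under stated assumptions: the synchronous IModel API of RabbitMQ.Client 5.x/6.x, a broker on localhost with default credentials, and a hypothetical queue name "demo-queue" and Process method. It is not how this library handles acks internally.

```csharp
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public static class LateAckDemo
{
    public static void Main()
    {
        // Assumption: a broker is reachable on localhost with default credentials.
        var factory = new ConnectionFactory { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // "demo-queue" is a hypothetical name used only for this sketch.
            channel.QueueDeclare(queue: "demo-queue", durable: true,
                exclusive: false, autoDelete: false, arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, ea) =>
            {
                try
                {
                    Process(ea);
                    // Ack exactly once, and only after processing has succeeded.
                    channel.BasicAck(ea.DeliveryTag, multiple: false);
                }
                catch (Exception)
                {
                    // Processing failed: hand the message back to the queue.
                    channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: true);
                }
            };

            // autoAck: false means the broker waits for the explicit ack above.
            channel.BasicConsume(queue: "demo-queue", autoAck: false, consumer: consumer);

            Console.WriteLine("Waiting for messages. Press Enter to exit.");
            Console.ReadLine();
        }
    }

    // Hypothetical stand-in for real message handling.
    private static void Process(BasicDeliverEventArgs ea)
    {
        Console.WriteLine("Processing delivery " + ea.DeliveryTag);
    }
}
```

Requeueing on every failure can loop forever on a message that always fails, so a real handler would probably want a retry limit or a dead-letter exchange.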

antonyvorontsov commented 3 years ago

Hi Mark!

As far as I can see from the provided code snippet, you are using version 4.3.0 of the library. Am I right? AckAction has only just appeared in the betas of 5.0.0. There is no stable 5.0.0 version yet because the documentation is missing; I have not finished it due to lack of time.

So, first of all, if you are trying to look through the source code, you should select the 4.3.0 tag to see the actual state of that version of the library.

> On removing my Ack, the library is now consistently listening. Odd behaviour.

This looks strange. I will definitely try to cover this with unit tests.

> How would I change the AckAction to do nothing?

You can't do that with the current implementation, but I can make an adjustment in the code, I guess within a couple of days. You will have to upgrade to a newer version of the library, though, because the new functionality will be added there. There is nothing wrong with the 5.0.0 betas except the lack of documentation, as I've said already.

rabbers commented 3 years ago

v4.3.0 does a BasicAck in the message handler, so it still dies at the end of the messages if a message is acknowledged twice. It's not an issue now that I know what causes it. Thanks for a great library.

antonyvorontsov commented 3 years ago

@rabbers Just wait a couple of days for a pull request. There is no need to close this issue 😃

antonyvorontsov commented 3 years ago

https://github.com/AntonyVorontsov/RabbitMQ.Client.Core.DependencyInjection/pull/85

And I'll try to invest some time in investigating the problem where the consumer stops.