antonyvorontsov / RabbitMQ.Client.Core.DependencyInjection

.Net Core library-wrapper of RabbitMQ.Client for Dependency Injection.
MIT License

Consumption event subscription on unwanted queues #3

Closed skashif closed 4 years ago

skashif commented 4 years ago

Hi Anthony,

I wanted to clear my understanding on the following workflow -

Let's say ServiceX has an exchange 'A' with the queues q1 and q2, and an exchange 'B' with the queue q3. We declare the exchange settings like this:

    "ExchangeA": {
      "Type": "direct",
      "Durable": true,
      "AutoDelete": false,
      "DeadLetterExchange": "exchange.a.dlx",
      "RequeueFailedMessages": true,
      "Queues": [
        {
          "Name": "q1",
          "Durable": true,
          "AutoDelete": false,
          "RoutingKeys": [ "q1.routing.key" ],
          "Arguments": { "queue-mode": "lazy" }
        },
        {
          "Name": "q2",
          "Durable": true,
          "AutoDelete": false,
          "RoutingKeys": [ "q2.routing.key" ],
          "Arguments": { "queue-mode": "lazy" }
        }
      ]
    },
    "ExchangeB": {
      "Type": "direct",
      "Durable": true,
      "AutoDelete": false,
      "DeadLetterExchange": "exchange.b.dlx",
      "RequeueFailedMessages": true,
      "Queues": [
        {
          "Name": "q3",
          "Durable": true,
          "AutoDelete": false,
          "RoutingKeys": [ "q3.routing.key" ],
          "Arguments": { "queue-mode": "lazy" }
        }
      ]
    }

These settings are used in QueueService.cs => StartExchange() and StartQueue() to build the exchange and queue bindings. As is the convention with RabbitMQ, we declare the exchange and queue bindings in every service that depends on them, whether as a publisher or as a consumer.

Further in this example, let's say ServiceX is a publisher that publishes messages to q1 and q2, and a consumer that consumes from q3. In this case we get the IQueueService instance and call .StartConsuming(), which brings us to the part where I need some clarification: QueueService.cs => StartConsuming(). If we look at this method: [image: screenshot of StartConsuming()]

You can see that it iterates through each exchange (ExchangeA and ExchangeB) and binds the consume event to all the queues in the exchange. This effectively means that ServiceX has now become a consumer of q1, q2 and q3, which is not intended in this example. The potential issue is that direct-route messaging turns into worker-queue messaging (with ServiceX as one of the consumers): ServiceX hogs its own published messages in q1 or q2, so the intended consumer does not receive them.

Please feel free to shed some light on the example above: tell me if there is a shortcoming in my understanding, if this is an intended workflow that follows a convention I am unaware of, or if you also see it as a potential issue.

A potential fix would be to let the developer define a setting in RabbitMqQueueOptions, say 'isConsumer', which could then be used as a filter in the StartConsuming function, so that a service is not declared as a consumer on queues it is not intended to consume from. Let me know what you think.

Regards, Kashif

antonyvorontsov commented 4 years ago

Dear Kashif, thank you for asking.

Frankly speaking, I have not had a case in which one application uses two exchanges, so I need a little bit of time to investigate and reproduce it.

I am aware of the problem and will try to give you a detailed answer as soon as possible. To be more precise, if I have enough time I will try to do it next week.

skashif commented 4 years ago

Hi Anthony,

Thank you for replying. Further to your reply, I wanted to add that the point of my example is this: if a service (ServiceX) publishes to queues A and B and consumes from queue C (irrespective of whether there is a single exchange or several), we will still run into the problem of ServiceX establishing a consumption event subscription on queue A and queue B, which is not intended. We would have been fine if ServiceX did not have to consume from queue C, because then you would not invoke the StartConsuming() method at all.

To reproduce, I would suggest having two services, X and Y, with a single direct exchange E1 that has 3 queues in it. Make X publish to q1 and q2, and make Y consume from q1 and q2. So far you do not have to use the StartConsuming() method in Service X, as it is only publishing. Publish messages to q1 and q2: everything will be fine, and you will get the messages in the q1 and q2 message handlers of Service Y.

Generating the issue: now make Service X consume from q3, so that it calls the StartConsuming() method. Start Service X before Service Y (this is important because X will then be a priority consumer of q1 and q2). Now start Service Y as well.

On the RabbitMQ admin page for q1 and q2 you will notice that the consumer count shows as 2. This is an indication of the error, but it gets worse when you publish: the messages never reach the q1 and q2 message handlers that you have defined in Service Y, and if you look at the logs you will find that you are getting delivery acks as well (Service X is sending those).

Let me know if this makes sense and if it helps in reproducing the issue.

Regards, Kashif

antonyvorontsov commented 4 years ago

Hello, Kashif @skashif. I apologize again for the slow replies. Unfortunately, I didn't have much free time to deal with the problem, but I finally got around to solving it.

I analyzed several solutions and decided that the best way, with the fewest changes, would be to add an option for exchanges that controls their behavior on message consumption. So I ended up with a couple of new extension methods and one change to the AddExchange method.

So for now, if you want to add an exchange that is used only for message production, use the AddProductionExchange method like this:

    services.AddRabbitMqClient(rabbitMqSection)
        .AddProductionExchange("exchange.A", Configuration.GetSection("ExchangeSectionA"));

This is equivalent to:

    services.AddRabbitMqClient(rabbitMqSection)
        .AddExchange("exchange.A", isConsuming: false, Configuration.GetSection("ExchangeSectionA"));

But if you want an exchange that does both producing and consuming, use AddConsumptionExchange. The usage is the same:

    services.AddRabbitMqClient(rabbitMqSection)
        .AddConsumptionExchange("exchange.A", Configuration.GetSection("ExchangeSectionA"));

So this is equivalent to:

    services.AddRabbitMqClient(rabbitMqSection)
        .AddExchange("exchange.A", isConsuming: true, Configuration.GetSection("ExchangeSectionA"));

I found that adding new methods is more consistent with the fluent API style which this library is trying to follow. I also changed the readme file a bit to cover those changes.


And the main change inside the QueueService is:

    var consumptionExchanges = _exchanges.Where(x => x.IsConsuming);
    foreach (var exchange in consumptionExchanges)
    {
        foreach (var queue in exchange.Options.Queues)
        {
            _channel.BasicConsume(queue: queue.Name, autoAck: false, consumer: _consumer);
        }
    }
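To illustrate the effect of this filter on the scenario from the original question, here is a minimal, self-contained sketch. ExchangeModel, QueueModel, ConsumptionFilter and Demo are hypothetical stand-ins invented for this illustration, not the library's own classes; only the IsConsuming flag mirrors the snippet above, and no RabbitMQ connection is involved.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for the library's exchange and queue options.
// Only the queue names and the IsConsuming flag matter for this sketch.
public sealed class QueueModel
{
    public string Name { get; set; } = "";
}

public sealed class ExchangeModel
{
    public string Name { get; set; } = "";
    public bool IsConsuming { get; set; }
    public List<QueueModel> Queues { get; } = new List<QueueModel>();
}

public static class ConsumptionFilter
{
    // Returns the names of the queues a service would subscribe to:
    // every queue of every exchange that is marked as consuming.
    public static List<string> QueuesToConsume(IEnumerable<ExchangeModel> exchanges)
    {
        return exchanges
            .Where(e => e.IsConsuming)
            .SelectMany(e => e.Queues)
            .Select(q => q.Name)
            .ToList();
    }
}

public static class Demo
{
    // ServiceX from the question: it only publishes through ExchangeA
    // (q1, q2) and consumes through ExchangeB (q3).
    public static List<string> ServiceXQueues()
    {
        var exchangeA = new ExchangeModel
        {
            Name = "ExchangeA",
            IsConsuming = false,
            Queues = { new QueueModel { Name = "q1" }, new QueueModel { Name = "q2" } }
        };
        var exchangeB = new ExchangeModel
        {
            Name = "ExchangeB",
            IsConsuming = true,
            Queues = { new QueueModel { Name = "q3" } }
        };
        return ConsumptionFilter.QueuesToConsume(new[] { exchangeA, exchangeB });
    }
}
```

With ExchangeA registered as production-only, the sketch selects only q3 for ServiceX. Note that the flag works per exchange: a service that publishes to one queue and consumes from another queue of the same exchange would still subscribe to both under this model.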

All the changes are available in version 3.0.2 of the library.

To sum up, you should not get any unwanted messages anymore. At least I hope so. Give it a try and tell me whether it is working or not.

Regards, Antony

skashif commented 4 years ago

Hi Anthony,

Thank you for taking the time to address the issue. In theory the above changes look promising, but unfortunately I will not be able to test them with my current solution, because v3.0.2 of the library requires .NET Core 3 or higher and my solution is currently on .NET Core 2.2. We do not plan to use .NET Core 3 or higher in production yet.

Please let me know if you will be pushing these changes to a lower version of your library to provide backward compatibility for apps on .NET Core 2.x or lower.

Thanks & Regards, Kashif.

antonyvorontsov commented 4 years ago

Hi, Kashif,

I made another version of the library that is compatible with .NET Core 2.2. Since I have switched to a versioning model that follows the version of the platform, this release of the library has the matching version, v2.2.0. So you can finally try it!

Feel free to leave a review on the lib, I am not gonna close this issue yet.

Regards, Antony

skashif commented 4 years ago

Hi Anthony,

Thank you for your reply, I will definitely try this and get back to you.

In the meantime, I have a different question, if you don't mind answering it. What do you think is a good approach to handling message publishing in an offline scenario? That is, if the RabbitMQ server is down, then the channel's publish call is going to throw an exception, which means that we lose the message we are trying to publish. Please let me know if you have handled such offline scenarios by logging your messages and republishing them when the connection is recovered.

Regards.

skashif commented 4 years ago

Hi Anthony,

I can confirm that the changes are working well. Thank you for giving time to address the issue.

Regards.

antonyvorontsov commented 4 years ago

Hello Kashif,

As for your question about keeping your application stable when RabbitMQ shuts down, I actually do not know a recipe for this. The question is rather debatable, because there are a lot of different hacks you can use but none of them is the one right solution. It also depends on how you use RabbitMQ.

The basic idea is to store all messages somewhere when RabbitMQ shuts down and then re-send them after RabbitMQ starts running again. That somewhere can be anything: files, a cache, a database, another "backup" queueing system and so on.

With files you can get into trouble with file locks on writing. A cache (Memcached, Redis or whatever) is kind of a hack because it is designed for different problems. A database is okay, but you have to manage database connections properly (I assume the connection has to be registered as scoped, not transient), otherwise you can exceed the connection limit and lose messages anyway. Another "backup" queueing system seems like overhead (you can just use RabbitMQ clustering).

Personally, I would build it with a database (a single table with the columns [exchange; routing key; message]) and some sort of IHostedService that checks whether RabbitMQ is down and resends the messages from the database. But again, this solution is not ideal at all.

Feel free to tell me if you implement any of these hacks by sending me an email at the address shown in my profile.
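The store-and-resend idea above could be sketched roughly as follows. This is only an illustration under loud assumptions: PendingMessage and Outbox are hypothetical names, an in-memory queue stands in for the database table, and the actual publish call is passed in as a delegate, so nothing here is part of the library or of RabbitMQ.Client.

```csharp
using System;
using System.Collections.Concurrent;

// One stored message, with the three columns suggested above
// (exchange; routing key; message). Hypothetical type.
public sealed class PendingMessage
{
    public PendingMessage(string exchange, string routingKey, string body)
    {
        Exchange = exchange;
        RoutingKey = routingKey;
        Body = body;
    }

    public string Exchange { get; }
    public string RoutingKey { get; }
    public string Body { get; }
}

// Hypothetical outbox. The in-memory queue stands in for the database
// table, so messages held here do not survive a process restart.
public sealed class Outbox
{
    private readonly ConcurrentQueue<PendingMessage> _pending =
        new ConcurrentQueue<PendingMessage>();
    private readonly Action<PendingMessage> _publish;

    // 'publish' is whatever actually sends to the broker; it is expected
    // to throw when the broker is unreachable.
    public Outbox(Action<PendingMessage> publish)
    {
        _publish = publish;
    }

    public int PendingCount => _pending.Count;

    // Tries to publish right away; on failure the message is kept for later.
    // Returns true if the message was sent immediately.
    public bool Publish(PendingMessage message)
    {
        try
        {
            _publish(message);
            return true;
        }
        catch (Exception)
        {
            _pending.Enqueue(message);
            return false;
        }
    }

    // Resends stored messages in the order they were stored and stops at the
    // first failure. Returns the number of messages sent. Assumes a single
    // caller at a time, for example one background loop.
    public int Flush()
    {
        var sent = 0;
        while (_pending.TryPeek(out var message))
        {
            try
            {
                _publish(message);
            }
            catch (Exception)
            {
                break; // broker still down, try again on the next call
            }
            _pending.TryDequeue(out _);
            sent++;
        }
        return sent;
    }
}
```

In a real application Flush() would be called periodically, for example from the IHostedService mentioned above. The sketch does not try to preserve ordering between stored and newly published messages, and a failure between sending and removing a message could lead to a duplicate, so consumers would need to tolerate that.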

And thank you for the feedback. I will close this issue for now.

Regards, Antony