EasyNetQ / EasyNetQ

An easy to use .NET API for RabbitMQ
http://easynetq.com
MIT License

Exchanges are not re-declared after a connection failure #560

Open micdenny opened 8 years ago

micdenny commented 8 years ago

This issue was raised by @axelbodo. The chat history follows:

from @axelbodo

We use EasyNetQ in a multi-node cluster with queue durability set to false, because of a known issue that occurs when the node owning a durable queue fails. With non-durable queues, if we could use auto recovery and topology recovery in the client itself, the remaining nodes could take over the queues from the node that went down, but together with EasyNetQ's own recovery this results in multiple reconnections. If we use only the EasyNetQ reconnection logic, it is not able to recover the queues itself. Would it be possible to implement this recovery in EasyNetQ as well, to avoid the problem?

from @micdenny

@axelbodo do you mean the problem "(Automatic) deletion of an auto-delete queue could lead to blocked channels" (resolved in 3.6.1), described in issue #581?

from @axelbodo

Not exactly, but there is a partial overlap. Our problem is that we have to use queue durability set to false. In that case, when a node goes down, the RabbitMQ .NET driver is able to recover the queues on another node (with queue durability set to true this does not work, because the cluster remembers that the queue is durably owned by another node). EasyNetQ also performs its own connection recovery and subscription recovery, now against the existing queues. The problem in this case is that the subscription count is duplicated, and we guess that half of the subscriptions are zombies, as they are not wired into EasyNetQ. In the second case, when we turn off auto recovery and topology recovery (which is meaningless in this case), EasyNetQ recovers the connection and tries to recover the subscriptions, but without declaring queues, bindings and exchanges. In our cluster HA (which, as far as I know, means that all queue messages are replicated across the cluster) is turned off, because it would hurt system performance considerably by causing a large number of Mnesia operations.
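For readers unfamiliar with the two driver-level switches mentioned above, here is a minimal configuration sketch. It assumes the `RabbitMQ.Client` package and uses its `ConnectionFactory` properties directly; the host name and the 5 second interval are illustrative values, and the helper class name is hypothetical. EasyNetQ builds its own connection factory internally, so this only shows what the switches are, not how the reporter configured them.

```csharp
using System;
using RabbitMQ.Client;

public static class DriverRecoverySketch
{
    // Hypothetical helper: builds a factory with driver-level recovery switched on or off.
    public static ConnectionFactory Create(bool driverRecovery)
    {
        return new ConnectionFactory
        {
            HostName = "localhost",                    // illustrative value
            // Driver reconnects on its own after a connection failure.
            AutomaticRecoveryEnabled = driverRecovery,
            // Driver re-declares exchanges, queues, bindings and consumers it knows about.
            TopologyRecoveryEnabled = driverRecovery,
            // Delay between driver reconnection attempts (illustrative value).
            NetworkRecoveryInterval = TimeSpan.FromSeconds(5)
        };
    }
}
```

With both switches on, the driver and EasyNetQ each restore consumers, which matches the duplicated subscriptions described above. With both off, only EasyNetQ's reconnection logic runs.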

micdenny commented 8 years ago

@axelbodo I'm moving the conversation to a separate issue.

micdenny commented 8 years ago

At the moment EasyNetQ does not re-declare exchanges on publish, because it uses an internal cache to avoid re-declaring the exchange on every call (see PublishExchangeDeclareStrategy). As far as I understand, your use case is not a common one: a cluster is usually put in an HA configuration when automatic failover is wanted. Anyway, I think I get your point: with queue durability set to false you could start using the secondary node if ENQ re-declared everything, and it does not.
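The effect of such a cache can be illustrated with a small self-contained sketch. Every type here (`FakeBroker`, `CachingDeclareStrategy`, `AlwaysDeclareStrategy`) is hypothetical and only mimics the assumed shape of the behavior; none of it is EasyNetQ code.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for a broker: remembers declared exchanges until it "fails".
public class FakeBroker
{
    private readonly ConcurrentDictionary<string, string> exchanges =
        new ConcurrentDictionary<string, string>();

    public void ExchangeDeclare(string name, string type) => exchanges[name] = type;

    public bool Exists(string name) => exchanges.ContainsKey(name);

    // Simulates a node failure that loses non-durable topology.
    public void LoseTopology() => exchanges.Clear();
}

// Declares each exchange once, then answers later calls from a client-side cache.
public class CachingDeclareStrategy
{
    private readonly ConcurrentDictionary<string, string> cache =
        new ConcurrentDictionary<string, string>();

    public void Declare(FakeBroker broker, string name, string type)
    {
        // The factory delegate only runs on a cache miss.
        cache.GetOrAdd(name, n => { broker.ExchangeDeclare(n, type); return type; });
    }
}

// Declares on every call, so lost topology is restored by the next publish.
public class AlwaysDeclareStrategy
{
    public void Declare(FakeBroker broker, string name, string type) =>
        broker.ExchangeDeclare(name, type);
}

public static class CacheDemo
{
    public static void Main()
    {
        var broker = new FakeBroker();
        var cached = new CachingDeclareStrategy();

        cached.Declare(broker, "orders", "topic");
        broker.LoseTopology();
        cached.Declare(broker, "orders", "topic");   // cache hit, broker is not contacted
        Console.WriteLine(broker.Exists("orders"));  // False

        new AlwaysDeclareStrategy().Declare(broker, "orders", "topic");
        Console.WriteLine(broker.Exists("orders"));  // True
    }
}
```

The trade-off is one extra broker round trip per publish in exchange for topology that heals itself.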

What you can do right now is replace the current implementation of IPublishExchangeDeclareStrategy with your own implementation that does not use the internal cache. That would be something like:

using System;
using System.Threading.Tasks;
using EasyNetQ.Topology;

namespace EasyNetQ.Producer
{
    public class MyPublishExchangeDeclareStrategy : IPublishExchangeDeclareStrategy
    {
        public IExchange DeclareExchange(IAdvancedBus advancedBus, string exchangeName, string exchangeType)
        {
            var exchange = advancedBus.ExchangeDeclare(exchangeName, exchangeType);
            return exchange;
        }

        public IExchange DeclareExchange(IAdvancedBus advancedBus, Type messageType, string exchangeType)
        {
            var conventions = advancedBus.Container.Resolve<IConventions>();
            var exchangeName = conventions.ExchangeNamingConvention(messageType);
            return DeclareExchange(advancedBus, exchangeName, exchangeType);
        }

        public async Task<IExchange> DeclareExchangeAsync(IAdvancedBus advancedBus, string exchangeName, string exchangeType)
        {
            var exchange = await advancedBus.ExchangeDeclareAsync(exchangeName, exchangeType).ConfigureAwait(false);
            return exchange;
        }

        public Task<IExchange> DeclareExchangeAsync(IAdvancedBus advancedBus, Type messageType, string exchangeType)
        {
            var conventions = advancedBus.Container.Resolve<IConventions>();
            var exchangeName = conventions.ExchangeNamingConvention(messageType);
            return DeclareExchangeAsync(advancedBus, exchangeName, exchangeType);
        }
    }
}

Then you can substitute the default implementation with yours (https://github.com/EasyNetQ/EasyNetQ/wiki/Replacing-EasyNetQ-Components):

var bus = RabbitHutch.CreateBus("host=localhost", x => x.Register<IPublishExchangeDeclareStrategy>(s => new MyPublishExchangeDeclareStrategy()));

axelbodo commented 8 years ago

Thank you for the advice, we'll try the workaround and let you know the result. However, it covers exchanges only, and we use RPC-style messaging as well. The workaround solves exchange recovery on the producer side, and we also need queue recovery on the consumer side, but the example gives an idea of how to do that.

micdenny commented 8 years ago

On the consumer side you need to look at the reconnection process: the consume command is re-issued, but the queue is not re-declared. We can probably add that if it fits into the reconnection process. We can't put it in the handling of the consumer cancellation that RabbitMQ sends when, for example, you explicitly delete a queue; in that case the consumer just stops and you have to restart the application. There I think we can't do much without breaking the current behavior, but for the reconnection process you can try a PR.

axelbodo commented 8 years ago

Yes, we think so too: hook into the connection recovery process, perform queue recovery only in that case, and maybe only for non-durable queues.

axelbodo commented 8 years ago

For RPC request queues, which are not exclusive, I think the best approach is to push the queue declaration down into PersistentConsumer.StartConsuming, by carrying the queue declaration properties in the Queue class instance. That is where the subscription happens, so if it subscribes anyway, it makes sense for it to ensure that the queue it subscribes to exists; declaration is idempotent when the properties are the same. What do you think, Michael, is this a good approach?

For RPC response queues the situation is a little harder, as they use TransientConsumer, and SubscribeToResponse will store the queue as a created queue. I think in this case (and for any other fire-and-forget style exclusive subscriptions, e.g. fanout event subscription implementations) we can inform ConsumerFactory that the consumer needs to be recreated after a connection reset, so that it can use PersistentConsumer in this case as well.

In any other case, when the connection/consumer is closed intentionally the teardown process uses (should use) the Dispose path, which won't rerun the connection/consumer reestablishment.

I'd like to ask for your opinion on whether this fits into your plans for the reconnection process. If it is OK, we will go with this approach until the EasyNetQ team implements its own solution, if they plan to.
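The request-queue proposal above, together with the Dispose rule, can be sketched as follows. This is a minimal illustration under stated assumptions, not the code from the eventual PR: `QueueSpec`, `IChannelSketch`, `RecordingChannel` and `RedeclaringConsumer` are all hypothetical names and are not EasyNetQ or RabbitMQ.Client types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical queue description that carries its own declaration properties.
public sealed class QueueSpec
{
    public QueueSpec(string name, bool durable, bool exclusive, bool autoDelete)
    {
        Name = name; Durable = durable; Exclusive = exclusive; AutoDelete = autoDelete;
    }
    public string Name { get; }
    public bool Durable { get; }
    public bool Exclusive { get; }
    public bool AutoDelete { get; }
}

// Hypothetical minimal channel abstraction.
public interface IChannelSketch
{
    void QueueDeclare(QueueSpec queue);
    void BasicConsume(string queueName);
}

// Test double that records the commands it receives.
public sealed class RecordingChannel : IChannelSketch
{
    public List<string> Log { get; } = new List<string>();
    public void QueueDeclare(QueueSpec queue) => Log.Add("declare " + queue.Name);
    public void BasicConsume(string queueName) => Log.Add("consume " + queueName);
}

public sealed class RedeclaringConsumer : IDisposable
{
    private readonly QueueSpec queue;
    private bool disposed;

    public RedeclaringConsumer(QueueSpec queue) { this.queue = queue; }

    // Called on first start and again after every reconnect.
    public void StartConsuming(IChannelSketch channel)
    {
        if (disposed) return;             // intentional teardown: do not re-establish
        channel.QueueDeclare(queue);      // idempotent when the properties are unchanged
        channel.BasicConsume(queue.Name);
    }

    public void Dispose() => disposed = true;
}

public static class ConsumerDemo
{
    public static void Main()
    {
        var consumer = new RedeclaringConsumer(new QueueSpec("rpc.requests", false, false, false));
        consumer.StartConsuming(new RecordingChannel());

        var afterReconnect = new RecordingChannel();
        consumer.StartConsuming(afterReconnect);
        Console.WriteLine(string.Join(", ", afterReconnect.Log)); // declare rpc.requests, consume rpc.requests

        consumer.Dispose();
        var afterDispose = new RecordingChannel();
        consumer.StartConsuming(afterDispose);
        Console.WriteLine(afterDispose.Log.Count); // 0
    }
}
```

Restricting the re-declaration to non-durable queues, as suggested earlier in the thread, would be a single extra condition on `queue.Durable` before the declare call.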

micdenny commented 8 years ago

> For RPC request queues, which are not exclusive, I think the best approach is to push the queue declaration down into PersistentConsumer.StartConsuming, by carrying the queue declaration properties in the Queue class instance. That is where the subscription happens, so if it subscribes anyway, it makes sense for it to ensure that the queue it subscribes to exists; declaration is idempotent when the properties are the same. What do you think, Michael, is this a good approach?

At the moment I don't see any other place: an IConsumer is responsible for reconnecting when needed, as PersistentConsumer does, so there's no other point where we can add the re-declaration of the queue.

> For RPC response queues

I need more time to focus on this...

> In any other case, when the connection/consumer is closed intentionally the teardown process uses (should use) the Dispose path, which won't rerun the connection/consumer reestablishment.

this is how it behaves now

> I'd like to ask for your opinion on whether this fits into your plans for the reconnection process. If it is OK, we will go with this approach until the EasyNetQ team implements its own solution, if they plan to.

The ENQ team is not a 100% committed team, so it's best that you try a PR; it will be taken into consideration for sure.

Even though I'm not sure about the RPC part, if you have an idea it's probably better to move the discussion onto code now. You can start a PR (WIP/DRAFT) even if it's not complete, so that we can better understand your intention.

axelbodo commented 8 years ago

Ok, thank you for your answer. I think we'll do it (the PR) as we develop and test the fix for our issue.

axelbodo commented 8 years ago

We are running endurance tests with the fixes in our project, and the fix looks promising. I'll try to make a PR this week with the modifications we made. For RPC response queues it was not necessary to make any changes: they are exclusive, so the queues are created on the same node the client is connected to, and in case of connection loss all response queues are disposed in RPC.cs.

axelbodo commented 8 years ago

I've created a PR with DRAFT code to discuss; it went through an endurance test on an earlier ENQ version.