pardahlman / RawRabbit

A modern .NET framework for communication over RabbitMq
MIT License

Message sequences fail after a while #194

Closed videege closed 7 years ago

videege commented 7 years ago

I have an ASP.NET Core application that initiates many different message sequences (running RawRabbit 1.10.3). Everything works fine most of the time, but occasionally (especially after a day or so of uptime) I will start seeing message sequences timing out. Inspecting my logs reveals something like this:

[14:48:26] [$80] [DEBUG] [MessageSequenceBuilder`1]: Preparing Message Sequence for '6911df0f-39ae-4a32-a176-1a2b6567100c' that starts with QueryById.
[14:48:26] [$80] [DEBUG] [MessageSequenceBuilder`1]: Message Sequence for '6911df0f-39ae-4a32-a176-1a2b6567100c' completes with 'CollectionDataResponse`1'.
[14:48:26] [$80] [DEBUG] [TopologyProvider]: Start processing topology work.
[14:48:26] [$80] [INFO] [TopologyProvider]: Declaring exchange 'messages.responses'.
[14:48:26] [$80] [DEBUG] [ChannelFactory]: Existing connection is open and will be used.
[14:48:26] [$80] [INFO] [TopologyProvider]: Binding queue 'rawrabbit_chain_43d652ae-7822-49c9-8870-c111515fd2c2' to exchange 'messages.responses' with routing key 'collectiondataresponse[itemdto].6911df0f-39ae-4a32-a176-1a2b6567100c'
[14:48:26] [$80] [ERROR] [TopologyProvider]: Unable to bind queue, Exception:
 RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text="NOT_FOUND - no queue 'rawrabbit_chain_43d652ae-7822-49c9-8870-c111515fd2c2' in vhost '/'", classId=50, methodId=20, cause=
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
   at RabbitMQ.Client.Framing.Impl.Model._Private_QueueBind(String queue, String exchange, String routingKey, Boolean nowait, IDictionary`2 arguments)
   at RabbitMQ.Client.Impl.AutorecoveringModel.QueueBind(String queue, String exchange, String routingKey, IDictionary`2 arguments)
   at RawRabbit.Common.TopologyProvider.BindQueueToExchange(ScheduledBindQueueTask bind)
   at RawRabbit.Common.TopologyProvider.EnsureWorker()

Is there a way to recover from this situation?

pardahlman commented 7 years ago

Perhaps... what version of the RabbitMQ broker are you running?

videege commented 7 years ago

I am running a single instance of RMQ 3.6.6. Here are my topology settings for RawRabbit:

      "RequestTimeout": "00:00:15",
      "PublishConfirmTimeout": "00:00:01",
      "RecoveryInterval": "00:00:10",
      "PersistentDeliveryMode": true,
      "AutoCloseConnection": true,
      "AutomaticRecovery": true,
      "TopologyRecovery": true,
      "Exchange": {
        "Durable": true,
        "AutoDelete": true,
        "Type": "Topic"
      },
      "Queue": {
        "AutoDelete": true,
        "Durable": true,
        "Exclusive": false
      }

pardahlman commented 7 years ago

OK, thank you for this. I was wondering, as I got bitten by a nasty bug in the broker that could have explained what you're seeing (https://github.com/rabbitmq/rabbitmq-server/issues/953).

I wonder, though... looking at your configuration, I see that you set AutoDelete to true. Each instance of the bus client uses the same queue for all of its message sequences (in your example it's rawrabbit_chain_43d652ae-7822-49c9-8870-c111515fd2c2). I wonder if the previous sequence completes and its consumer is removed from the queue, which leads to the queue being deleted. If that happens just after another execution has verified that the queue exists, the subsequent bind would fail.

If it is not too much hassle, it would be interesting to see if you get the same problem with AutoDelete set to false.

pardahlman commented 7 years ago

(As a side note, queue management for sequences is updated in 2.0. I've noticed that there are corner cases where sharing a queue isn't optimal.)

videege commented 7 years ago

I'll try setting AutoDelete to false - thanks for the advice. We'll definitely be looking into 2.0 when you release. Thanks again for all the work you've put into this library!

pardahlman commented 7 years ago

Hello @videege - any success with the proposed approach?

videege commented 7 years ago

A bit - the system seems more stable now but we are still occasionally running into this error. Do I need to set AutoDelete to false on the Exchange settings as well as the queue settings?

pardahlman commented 7 years ago

Perhaps, or in fact likely, if the root of the problem is what we suspect. In the logs that you posted earlier, the error message indicated that a queue didn't exist. I wonder if the messages you get now complain about an exchange that does not exist?

videege commented 7 years ago

OK, I think maybe I have a lead on what's happening. Changing the settings to AutoDelete=false has not corrected the issue.

I noticed that even though I set queues to not have AutoDelete, the queues created for message sequences have AutoDelete set to true. The process that is initiating these sequences is an ASP.NET web application running in a Docker container. Sometimes the connection is unstable (for whatever reason - maybe a node in my swarm goes down) and the RMQ client will disconnect. The connection recovers after ~20 seconds, but I wonder, is the queue created for message sequences automatically deleted at this point?

It seems like the web project still assumes that the message sequence queue exists, but at some point this queue gets dropped and then RawRabbit cannot recover from this topology problem.

Hopefully I'm on the right track here - if I am, can you point me to where I might write an extension that can create a new queue when this problem gets detected?

pardahlman commented 7 years ago

Alright - nice work!

I'm not 100% sure how RMQ behaves if a queue is marked with AutoDelete and the only consumer on that queue disconnects. It should be fairly easy to set up a small project with a single consumer and disrupt the connection.
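A small project like that could look roughly like the sketch below. It is only an approximation under stated assumptions: it uses the synchronous RabbitMQ.Client API (5.x/6.x style) directly rather than RawRabbit, it assumes a broker on localhost with default credentials, the queue name `autodelete_repro` is made up, and a clean `Close()` on the consumer's connection stands in for a real network drop. RabbitMQ documents auto-delete queues as being removed once their last consumer is gone, so the passive declare at the end would be expected to fail with a 404, but it is worth running against your own broker version.

```csharp
using System;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;

public static class AutoDeleteRepro
{
    public static void Main()
    {
        // Assumption: broker reachable on localhost with default credentials.
        var factory = new ConnectionFactory { HostName = "localhost" };
        const string queue = "autodelete_repro"; // hypothetical queue name

        // Separate connection used only to check whether the queue survives.
        using (var observer = factory.CreateConnection())
        {
            // This connection owns the queue's only consumer.
            var consumerConnection = factory.CreateConnection();
            var consumerChannel = consumerConnection.CreateModel();

            // Arguments: queue, durable, exclusive, autoDelete, arguments.
            consumerChannel.QueueDeclare(queue, false, false, true, null);
            consumerChannel.BasicConsume(queue, true, new EventingBasicConsumer(consumerChannel));

            // Stand-in for a dropped connection: the only consumer goes away.
            consumerConnection.Close();
            Thread.Sleep(TimeSpan.FromSeconds(1)); // give the broker a moment

            var probe = observer.CreateModel();
            try
            {
                // A passive declare only checks for existence; it never creates the queue.
                probe.QueueDeclarePassive(queue);
                Console.WriteLine("Queue still exists.");
            }
            catch (OperationInterruptedException ex)
            {
                // The broker closes the probe channel when the queue is missing.
                Console.WriteLine(
                    $"Queue is gone: {ex.ShutdownReason?.ReplyCode} {ex.ShutdownReason?.ReplyText}");
            }
        }
    }
}
```

The probe uses its own connection and channel because a failed passive declare closes the channel it was issued on.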

To verify your theory, you could implement your own IMessageChainTopologyUtil (here's the default: MessageChainTopologyUtil).

A queue is created on InitializeConsumer and is then assumed to exist when binding and unbinding queues. The class is not built to be extended... no virtual methods - sorry! What you could do is copy the class altogether and then update BindToExchange so that it declares the queue each time:

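public async Task BindToExchange(Type messageType, Guid globalMessageId)
{
    // Added line: declare the sequence queue again before every bind,
    // so that it is recreated if the broker has removed it.
    await _topologyProvider.DeclareQueueAsync(_queueConfig);
    var chainConfig = _configEvaluator.GetConfiguration(messageType);
    // Bind with a routing key that is unique for this message sequence.
    await _topologyProvider.BindQueueAsync(
        _queueConfig,
        chainConfig.Exchange,
        $"{chainConfig.RoutingKey}.{globalMessageId}"
    );
}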

It would also make sense to add some more logging here to confirm that the queue is correctly declared.

I'm wondering, though... if the queue was previously declared, it is likely that it was also bound to an exchange with a specific routing key. That binding will be lost if the queue is removed. Declaring the queue again might remove the issue you've run into, but sequences that were already in flight could still miss their responses. Hopefully not, though!
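One way to guard against lost bindings would be to remember which bindings are currently active and re-apply them after the queue has been declared again. The following is a minimal sketch of that idea, not something RawRabbit provides: `ChainBindingRegistry` is a hypothetical helper, it assumes C# 7 value tuples, and the `bind` delegate stands in for whatever call actually performs the bind (for example a lambda around the topology provider).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper (not part of RawRabbit): tracks the bindings that the
// sequence queue currently needs so they can be restored after a redeclare.
public sealed class ChainBindingRegistry
{
    private readonly object _gate = new object();
    private readonly HashSet<(string Exchange, string RoutingKey)> _active =
        new HashSet<(string Exchange, string RoutingKey)>();

    // Call when a binding is created for a running sequence.
    public void Add(string exchange, string routingKey)
    {
        lock (_gate) { _active.Add((exchange, routingKey)); }
    }

    // Call when a sequence completes and its binding is removed.
    public void Remove(string exchange, string routingKey)
    {
        lock (_gate) { _active.Remove((exchange, routingKey)); }
    }

    // Re-applies every remembered binding through the supplied delegate and
    // returns how many bindings were restored.
    public async Task<int> RebindAllAsync(Func<string, string, Task> bind)
    {
        List<(string Exchange, string RoutingKey)> snapshot;
        lock (_gate)
        {
            // Copy under the lock so the delegate runs without holding it.
            snapshot = new List<(string Exchange, string RoutingKey)>(_active);
        }

        foreach (var binding in snapshot)
        {
            await bind(binding.Exchange, binding.RoutingKey);
        }
        return snapshot.Count;
    }
}
```

In a copied topology util, `Add` would be called where a binding is made, `Remove` where it is taken away, and `RebindAllAsync` right after the queue has been declared again. Whether this is needed at all depends on whether in-flight sequences actually lose responses in practice.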

pardahlman commented 7 years ago

Btw, how do you invoke the message sequence? Do you use the optional globalMessageId argument on publish (.PublishAsync<BasicMessage>(msg, Guid.NewGuid()))?

videege commented 7 years ago

We weren't using the optional argument but we are now. I think the change you suggested (implementing a custom MessageChainTopologyUtil) did the trick. I can see that connection failures still happen, but the message sequence queue gets recreated and everything keeps on working.

Thanks for your help on this issue.