rabbitmq / rabbitmq-dotnet-client

RabbitMQ .NET client for .NET Standard 2.0+ and .NET 4.6.2+
https://www.rabbitmq.com/dotnet.html

Autorecovery for server-named queues loops indefinitely when a consumer listens to the queue #1238

Closed. stukselbax closed this issue 1 year ago

stukselbax commented 2 years ago

When an automatically recovering connection consumes from a server-named, auto-delete queue and connection recovery starts, the recovery process never completes.

RabbitMQ.Client version: 6.4.0

Here is a minimal console application that reproduces the problem:


```c#
namespace RabbitMQTest
{
    using System;
    using System.Globalization;
    using System.Threading;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Logging;

    internal class Program
    {
        static void Main()
        {
            var consoleLogger = new RabbitMqConsoleEventListener();
            var resolver = new DefaultEndpointResolver(new[] { new AmqpTcpEndpoint("localhost") });
            var connectionFactory = new ConnectionFactory
            {
                UserName = "guest",
                Password = "guest",
                Port = 5672,
                DispatchConsumersAsync = true,
                AutomaticRecoveryEnabled = true,
                TopologyRecoveryEnabled = true,
                HostName = "localhost",
                EndpointResolverFactory = endpoints => { return resolver; },
                UseBackgroundThreadsForIO = true,
            };

            var connection = connectionFactory.CreateConnection();
            var channel = connection.CreateModel();

            // use "" for a server-named queue
            var queueDeclareOk = channel.QueueDeclare("", durable: false, exclusive: true, autoDelete: true, null);
            Console.WriteLine("Declared queue {0}", queueDeclareOk.QueueName);

            channel.BasicQos(0, 50, false);

            // use "" to consume from the server-named queue
            // (an empty name refers to the last queue declared on this channel)
            var consumerTag = channel.BasicConsume(
                "",
                false,
                Guid.NewGuid().ToString("N", CultureInfo.InvariantCulture),
                true,
                true,
                null,
                new AsyncDefaultBasicConsumer(channel));

            while (true)
            {
                Thread.Sleep(2000);
                Console.WriteLine("Waiting for a message");
            }
        }
    }
}
```

While it runs, disconnect the network between the application and the RabbitMQ server. For example, if RabbitMQ runs in Docker, you can run the following commands:

```shell
docker network ls
# choose the network your rabbitmq container uses
docker network disconnect NETWORK_NAME RABBITMQ_CONTAINER
# wait for autorecovery process in console application starts
docker network connect NETWORK_NAME RABBITMQ_CONTAINER
```

After these steps, the connection, along with its consumers and topology, is expected to recover successfully.

In practice, recovery never completes because consumer recovery fails:

```log
Informational: Performing automatic recovery
Error: Topology recovery exception
Error: Exception: RabbitMQ.Client.Exceptions.TopologyRecoveryException
Caught an exception while recovering consumer aa92187d6e204b399a45f2638132a4cf on queue : The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text='NOT_FOUND - no previously declared queue', classId=60, methodId=20
```

Stack trace:

```log
Informational: Performing automatic recovery
Error: Topology recovery exception
Error: Exception: RabbitMQ.Client.Exceptions.TopologyRecoveryException
Caught an exception while recovering consumer aa92187d6e204b399a45f2638132a4cf on queue : The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text='NOT_FOUND - no previously declared queue', classId=60, methodId=20
InnerException: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text='NOT_FOUND - no previously declared queue', classId=60, methodId=20
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
   at RabbitMQ.Client.Impl.RecordedConsumer.Recover(IModel channelToUse)
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverConsumers(AutorecoveringModel modelToRecover, IModel channelToUse)
Error: Exception when recovering connection. Will try again after retry interval.
Error: Exception: RabbitMQ.Client.Exceptions.TopologyRecoveryException
Caught an exception while recovering consumer aa92187d6e204b399a45f2638132a4cf on queue : The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text='NOT_FOUND - no previously declared queue', classId=60, methodId=20
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.HandleTopologyRecoveryException(TopologyRecoveryException e)
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverConsumers(AutorecoveringModel modelToRecover, IModel channelToUse)
   at RabbitMQ.Client.Impl.AutorecoveringModel.AutomaticallyRecover(AutorecoveringConnection conn, Boolean recoverConsumers)
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverModelsAndItsConsumers()
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.TryPerformAutomaticRecovery()
InnerException: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text='NOT_FOUND - no previously declared queue', classId=60, methodId=20
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
   at RabbitMQ.Client.Impl.RecordedConsumer.Recover(IModel channelToUse)
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverConsumers(AutorecoveringModel modelToRecover, IModel channelToUse)
Informational: Received request to BeginAutomaticRecovery, but already in Recovering state.
```

A possible root cause: RecordedConsumer stores the queue name exactly as it was passed to BasicConsume, i.e. "". When queues are recovered, the client tries to update the queue names recorded for the affected consumers, but RecordedQueue stores the server-generated name, so nothing matches "". As a result, the queue name in RecordedConsumer is never updated; it still holds "", and re-issuing BasicConsume with that name fails with 404 NOT_FOUND ("no previously declared queue").
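To make the suspected mechanism concrete, here is a minimal sketch (C# 9 top-level statements) of a heavily simplified recorded-topology model. Everything in it is hypothetical: the dictionaries and the `DeclareServerNamedQueue`, `RecordConsumer`, and `RecoverQueue` local functions only stand in for the client's `RecordedQueue`/`RecordedConsumer` bookkeeping and are not RabbitMQ.Client APIs. The `resolveEmptyName` flag illustrates one possible remedy (substituting the last declared queue name when the consumer is recorded); that remedy is an assumption, not necessarily what #1324 does.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for the client's recorded topology (NOT real RabbitMQ.Client types).
var recordedQueues = new HashSet<string>();
var recordedConsumers = new Dictionary<string, string>(); // consumer tag -> recorded queue name
string lastDeclaredQueue = "";

// Simulates QueueDeclare("") returning a server-generated queue name.
string DeclareServerNamedQueue(string serverName)
{
    recordedQueues.Add(serverName);
    lastDeclaredQueue = serverName;
    return serverName;
}

// Records a consumer. With resolveEmptyName = false, "" is stored verbatim,
// mirroring the behavior described in the issue.
void RecordConsumer(string tag, string queue, bool resolveEmptyName)
{
    string name = (resolveEmptyName && queue == "") ? lastDeclaredQueue : queue;
    recordedConsumers[tag] = name;
}

// Simulates queue recovery: the server assigns a new name, and every consumer
// recorded against the OLD name is repointed to the new one.
void RecoverQueue(string oldName, string newName)
{
    recordedQueues.Remove(oldName);
    recordedQueues.Add(newName);
    foreach (var tag in new List<string>(recordedConsumers.Keys))
    {
        if (recordedConsumers[tag] == oldName)
        {
            recordedConsumers[tag] = newName;
        }
    }
}

var queue = DeclareServerNamedQueue("amq.gen-old");
RecordConsumer("buggy", "", resolveEmptyName: false);
RecordConsumer("fixed", "", resolveEmptyName: true);
RecoverQueue(queue, "amq.gen-new");

// "buggy" never matched "amq.gen-old", so it still targets "".
var buggyQueue = recordedConsumers["buggy"];
var fixedQueue = recordedConsumers["fixed"];
Console.WriteLine($"buggy -> '{buggyQueue}'");
Console.WriteLine($"fixed -> '{fixedQueue}'");
```

Running it prints `buggy -> ''` and `fixed -> 'amq.gen-new'`: the consumer recorded with the verbatim empty string is skipped by the rename step, which corresponds to the consumer that the real client then tries to re-consume with `""`.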

michaelklishin commented 2 years ago

You have missed one crucial piece of information: what version of the client is used.

michaelklishin commented 2 years ago

If you understand what the issue is, you are welcome to submit a PR.

lukebakken commented 1 year ago

I'm going to see if there's a reasonably simple fix. The obvious work-around is to change your code to NOT use the empty string when consuming:

```c#
// Consume using the server-generated name returned by QueueDeclare instead of ""
var consumerTag = channel.BasicConsume(
    queueDeclareOk.QueueName,
    false,
    Guid.NewGuid().ToString("N", CultureInfo.InvariantCulture),
    true,
    true,
    null,
    new AsyncDefaultBasicConsumer(channel));
```

lukebakken commented 1 year ago

Fixed by #1324

lukebakken commented 1 year ago

@stukselbax the fix for this issue will ship in version 6.5.0.