rabbitmq / rabbitmq-dotnet-client

RabbitMQ .NET client for .NET Standard 2.0+ and .NET 4.6.2+
https://www.rabbitmq.com/dotnet.html

Deadlock when closing a channel in the Received Event #1382

Closed romanlum closed 7 months ago

romanlum commented 10 months ago

Describe the bug

We are currently seeing unexpected behaviour in some rare network error situations.

A TxCommit call times out, after which we clean up the connection and try to reconnect to the RabbitMQ server.

Here is the Consumer_Received method.

private void Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
{
    var rabbitMessage = new RabbitMessage
    {
        Body = eventArgs.Body,
        Properties = eventArgs.BasicProperties
    };

    IModel channel = Consumer.Model;
    if (sender is EventingBasicConsumer consumer)
    {
        channel = consumer.Model;
    }

    try
    {
        // throws an exception to indicate the message cannot be received and should be re-delivered
        // with the eventing basic consumer this would be enough to not have the message acknowledged
        Receive(rabbitMessage);

        channel.BasicAck(eventArgs.DeliveryTag, false);
    }
    catch (Exception exception)
    {
        Log.Warn("Failed to receive rabbit message.", exception);
        channel.BasicReject(eventArgs.DeliveryTag, true); // reject a single message REVIEW - and mark for re-delivery
    }
    finally
    {
        try
        {
            channel.TxCommit(); // commit the ack or the reject
        }
        catch (Exception e)
        {
            Log.Error("Failed to commit on receive.", e);
            try
            {
                // the RPC continuation queue probably still contains the commit call and the rollback may fail, but let's try first
                channel.TxRollback(); // this should roll back the ack or reject
            }
            catch (Exception ex)
            {
                Log.Error("Failed to rollback after a failed commit on receive.", ex);
                // now that the rollback failed after the commit had failed, try to clean up so we can recover
                // <-- this is the call that sometimes deadlocks
                Reconnect(); // closes both the send and receive channel to make sure the RPC continuation queues are emptied
            }
        }
    }
}

The Reconnect method closes the send and receive channels and also the connection. This sometimes results in a deadlock because of the following code:

Model.Base.cs:456
public void OnSessionShutdown(object sender, ShutdownEventArgs reason)
{
    ConsumerDispatcher.Quiesce();
    SetCloseReason(reason);
    OnModelShutdown(reason);
    BroadcastShutdownToConsumers(_consumers, reason);
    // <-- the thread blocks on the next line
    ConsumerDispatcher.Shutdown(this).GetAwaiter().GetResult();
}

Is it allowed to close the RabbitMQ connection in the Received event handler?

Should we call the disconnect method in another thread?


Task.Run(Reconnect);

Thanks for your help

Reproduction steps

It's hard to reproduce the network error that causes TxCommit to time out while the TCP connection stays up.

Expected behavior

Connection.Close should not deadlock.

Maybe there is a missing ConfigureAwait(false) at ConsumerDispatcher.Shutdown(this).GetAwaiter().GetResult(); ?

Additional context

RabbitMQ Client Version 6.4.0

Callstack of the deadlock

mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout, bool exitContext) Unknown
mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout) Unknown
mscorlib.dll!System.Threading.ManualResetEventSlim.Wait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) Unknown
mscorlib.dll!System.Threading.Tasks.Task.SpinThenBlockingWait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) Unknown
mscorlib.dll!System.Threading.Tasks.Task.InternalWait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) Unknown
mscorlib.dll!System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(System.Threading.Tasks.Task task) Unknown
mscorlib.dll!System.Runtime.CompilerServices.TaskAwaiter.GetResult() Unknown
[Waiting on Async Operation, double-click or press enter to view Async Call Stacks]
RabbitMQ.Client.dll!RabbitMQ.Client.Impl.ModelBase.OnSessionShutdown(object sender, RabbitMQ.Client.ShutdownEventArgs reason) Line 471 C#
RabbitMQ.Client.dll!RabbitMQ.Client.Impl.SessionBase.OnSessionShutdown(RabbitMQ.Client.ShutdownEventArgs reason) Line 112 C#
RabbitMQ.Client.dll!RabbitMQ.Client.Impl.SessionBase.Close(RabbitMQ.Client.ShutdownEventArgs reason, bool notify) Line 139 C#
RabbitMQ.Client.dll!RabbitMQ.Client.Impl.SessionBase.Close(RabbitMQ.Client.ShutdownEventArgs reason) Line 122 C#
RabbitMQ.Client.dll!RabbitMQ.Client.Impl.SessionBase.OnConnectionShutdown(object conn, RabbitMQ.Client.ShutdownEventArgs reason) Line 99 C#
RabbitMQ.Client.dll!RabbitMQ.Client.Framing.Impl.Connection.OnShutdown() Line 721 C#
RabbitMQ.Client.dll!RabbitMQ.Client.Framing.Impl.Connection.Close(RabbitMQ.Client.ShutdownEventArgs reason, bool abort, System.TimeSpan timeout) Line 320 C#
RabbitMQ.Client.dll!RabbitMQ.Client.Framing.Impl.Connection.Close(ushort reasonCode, string reasonText, System.TimeSpan timeout) Line 1020 C#
RabbitMQ.Client.dll!RabbitMQ.Client.Framing.Impl.Connection.Close() Line 1002 C#
RabbitMQ.Client.dll!RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Close() Line 787 C#
Rabbit.dll!Rabbit.RabbitMessageTransport.CloseConnection(RabbitMQ.Client.IConnection connection) Line 1061 C#
Rabbit.dll!Rabbit.RabbitMessageTransport.Disconnect() Line 1046 C#
Rabbit.dll!Rabbit.RabbitMessageTransport.UpdateConnectionState(Rabbit.RabbitConnectionState state) Line 1228 C#
Rabbit.dll!Rabbit.RabbitMessageTransport.Reconnect() Line 1200 C#
Rabbit.dll!Rabbit.RabbitMessageTransport.Consumer_Received(object sender, RabbitMQ.Client.Events.BasicDeliverEventArgs eventArgs) Line 564 C#
RabbitMQ.Client.dll!RabbitMQ.Client.Events.EventingBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, System.ReadOnlyMemory body) Line 90 C#
RabbitMQ.Client.dll!RabbitMQ.Client.Impl.ConcurrentConsumerDispatcher.HandleBasicDeliver.AnonymousMethod0() Line 75 C#
RabbitMQ.Client.dll!RabbitMQ.Client.Impl.ConsumerWorkService.WorkPool.Loop() Line 104 C#
[Resuming Async Method]
mscorlib.dll!System.Runtime.CompilerServices.AsyncMethodBuilderCore.MoveNextRunner.InvokeMoveNext(object stateMachine) Unknown
mscorlib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx) Unknown
mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx) Unknown
mscorlib.dll!System.Runtime.CompilerServices.AsyncMethodBuilderCore.MoveNextRunner.Run() Unknown
mscorlib.dll!System.Runtime.CompilerServices.AsyncMethodBuilderCore.OutputAsyncCausalityEvents.AnonymousMethod0() Unknown
mscorlib.dll!System.Runtime.CompilerServices.AsyncMethodBuilderCore.ContinuationWrapper.Invoke() Unknown
System.Threading.Tasks.Extensions.dll!System.Runtime.CompilerServices.ValueTaskAwaiter..cctor.AnonymousMethod__9_0(object state) Unknown
System.Threading.Channels.dll!System.Threading.Channels.AsyncOperation.SetCompletionAndInvokeContinuation() Unknown
System.Threading.Channels.dll!System.Threading.Channels.AsyncOperation.UnsafeQueueSetCompletionAndInvokeContinuation.AnonymousMethod__37_0(object s) Unknown
mscorlib.dll!System.Threading.Tasks.Task.InnerInvoke() Unknown
mscorlib.dll!System.Threading.Tasks.Task.Execute() Unknown
mscorlib.dll!System.Threading.Tasks.Task.ExecutionContextCallback(object obj) Unknown
mscorlib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx) Unknown
mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx) Unknown
mscorlib.dll!System.Threading.Tasks.Task.ExecuteWithThreadLocal(ref System.Threading.Tasks.Task currentTaskSlot) Unknown
mscorlib.dll!System.Threading.Tasks.Task.ExecuteEntry(bool bPreventDoubleExecution) Unknown
mscorlib.dll!System.Threading.Tasks.Task.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem() Unknown
mscorlib.dll!System.Threading.ThreadPoolWorkQueue.Dispatch() Unknown
mscorlib.dll!System.Threading._ThreadPoolWaitCallback.PerformWaitCallback() Unknown
[Async Call Stack]
[Async] mscorlib.dll!System.Threading.Tasks.Task.Run Unknown



Async Logical Stack

![image](https://github.com/rabbitmq/rabbitmq-dotnet-client/assets/464862/38dad99f-836a-4472-b106-773789742a7b)

michaelklishin commented 10 months ago

It is highly unusual to close a channel from within a delivery handling method of its consumer. channel.close is a synchronous operation: it makes the channel wait for a response while it is still in the middle of a channel dispatch.
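
To make the "waiting while in the middle of a dispatch" point concrete, here is a minimal, self-contained sketch of the same shape of problem. It does not use the client's code: `WorkPoolSketch` and `SelfWaitDemo` are hypothetical names, and the loop is only an assumption about how a dispatcher of this kind behaves, based on the call stack above (the handler runs inside `WorkPool.Loop()` and then blocks on the dispatcher shutdown). The blocking wait uses a timeout so the demo terminates instead of hanging.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for a consumer work pool; not the library's implementation.
public sealed class WorkPoolSketch
{
    private readonly Channel<Action> _queue = Channel.CreateUnbounded<Action>();
    private readonly Task _worker;

    public WorkPoolSketch()
    {
        _worker = Task.Run(() => LoopAsync());
    }

    public void Enqueue(Action work) => _queue.Writer.TryWrite(work);

    // The returned task completes only once the loop has exited,
    // which cannot happen while a handler is still running on the loop.
    public Task StopAsync()
    {
        _queue.Writer.TryComplete();
        return _worker;
    }

    private async Task LoopAsync()
    {
        while (await _queue.Reader.WaitToReadAsync().ConfigureAwait(false))
        {
            while (_queue.Reader.TryRead(out var work))
            {
                work(); // the delivery handler runs on the loop itself
            }
        }
    }
}

public static class SelfWaitDemo
{
    // Returns true if stopping the pool from inside one of its own handlers
    // did not finish within the timeout.
    public static bool StopFromHandlerTimesOut(TimeSpan timeout)
    {
        var pool = new WorkPoolSketch();
        var result = new TaskCompletionSource<bool>();

        pool.Enqueue(() =>
        {
            // Same idea as StopAsync().GetAwaiter().GetResult(), but bounded.
            bool finished = pool.StopAsync().Wait(timeout);
            result.SetResult(!finished);
        });

        return result.Task.GetAwaiter().GetResult();
    }
}
```

In this sketch the handler waits for a task that can only complete after the handler itself returns, so an unbounded wait would never finish. If the client behaves the same way here, adding `ConfigureAwait(false)` would not change the outcome, because the wait is on the loop's own completion rather than on a captured synchronization context.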

@lukebakken @Gsantomaggio do we care to address this? Closing in an asynchronous task should be a decent workaround.
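
A rough sketch of that workaround, for illustration only: the handler hands the reconnect to a separate task and returns immediately. `ReconnectScheduler` is a hypothetical helper, not part of RabbitMQ.Client, and the `reconnect` delegate stands for the application's own `Reconnect` method. The guard is an added assumption so that several failing deliveries do not start overlapping reconnects.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; not part of the RabbitMQ client library.
public sealed class ReconnectScheduler
{
    private readonly Action _reconnect;
    private int _running; // 0 = idle, 1 = a reconnect is in flight

    public ReconnectScheduler(Action reconnect)
    {
        _reconnect = reconnect ?? throw new ArgumentNullException(nameof(reconnect));
    }

    // Returns without blocking. The task completes when the reconnect finishes;
    // if a reconnect is already in flight, an already completed task is returned.
    public Task Schedule()
    {
        if (Interlocked.CompareExchange(ref _running, 1, 0) != 0)
        {
            return Task.CompletedTask;
        }

        return Task.Run(() =>
        {
            try
            {
                _reconnect();
            }
            finally
            {
                Interlocked.Exchange(ref _running, 0);
            }
        });
    }
}
```

Inside the handler this would replace the direct `Reconnect()` call with something like `_reconnectScheduler.Schedule();`, without waiting on the returned task from within the handler. Exceptions thrown by the reconnect end up on the returned task, so the delegate should log them itself if the task is not observed.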

lukebakken commented 10 months ago

To start with I strongly suggest not using AMQP transactions.

Having said that, yes it is probably a Bad Thing™ to close a connection within a delivery handler. Running a new Task should be fine. I'll see if I can come up with a better way to do this, though. I'm thinking you may be able to cancel your consumer and then re-connect when BasicConsume returns.

romanlum commented 10 months ago

Thanks for your quick reply. Can you describe why we should not use AMQP Transactions?

lukebakken commented 10 months ago

https://www.rabbitmq.com/semantics.html

I've found that people assume AMQP TX provides transactions like those in a database. There are a lot of caveats in that document. If you're aware of them, and are OK with the performance of AMQP TX, then go right ahead.

In general, publisher confirmations are a better solution.
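
For reference, a minimal sketch of publisher confirms with the 6.x client API used in this issue. The host name and queue name are assumptions, and a reachable broker is required to run it. Confirms cover the publishing side only; on the consuming side the usual counterpart is manual acknowledgement with `BasicAck` and no transaction.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

public static class ConfirmedPublishSketch
{
    public static void Main()
    {
        // Assumed broker address; adjust for your environment.
        var factory = new ConnectionFactory { HostName = "localhost" };

        using (IConnection connection = factory.CreateConnection())
        using (IModel channel = connection.CreateModel())
        {
            // Put the channel into confirm mode instead of calling TxSelect.
            channel.ConfirmSelect();

            // Assumed queue name, declared here so the example is self-contained.
            channel.QueueDeclare(queue: "demo-queue", durable: true, exclusive: false,
                                 autoDelete: false, arguments: null);

            IBasicProperties properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            byte[] body = Encoding.UTF8.GetBytes("hello");
            channel.BasicPublish(exchange: "", routingKey: "demo-queue",
                                 basicProperties: properties, body: body);

            // Blocks until the broker confirms the publish; throws if a message
            // is nacked or the timeout elapses.
            channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
        }
    }
}
```

Waiting after every message is the simplest form but also the slowest; batching several publishes before one wait is a common trade-off.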

lukebakken commented 7 months ago

@romanlum I don't think there is actionable work for us to do with this issue.

Please try the following suggestions: