rebus-org / Rebus.RabbitMq

🚌 RabbitMQ transport for Rebus
https://mookid.dk/category/rebus

Simple retrying with publisher confirms causes errors #67

Closed: MortenDuus closed this issue 4 years ago

MortenDuus commented 4 years ago

When publisher confirms are enabled and we catch a timeout exception and attempt a retry, we get a "Pipelining of requests forbidden" error. This likely occurs because we reuse the same transaction context, which holds the same model, and that model never completed the SimpleBlockingRpcContinuation it holds. The model clears the block when it gets a reply from RabbitMQ, but in case of a network outage, such as switching VPN or simply pulling the plug, a reply is never received. We do get a timeout, though, and when we retry and call publish again, it is on a model that is still blocked by the previous request. Therefore, the next request results in an exception.
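To make the mechanism described above concrete, here is a minimal C# simulation. `FakeChannel` and `PipeliningDemo` are hypothetical names invented for this sketch; they are not RabbitMQ.Client or Rebus types, and the real client's internals differ. The sketch only assumes the rule the report describes: a channel allows one outstanding synchronous request, and a request whose reply never arrives keeps the channel blocked after the caller has given up waiting.

```csharp
using System;

// Hypothetical stand-in for a RabbitMQ channel (IModel); NOT the real
// RabbitMQ.Client implementation. It only mimics the rule that a channel
// may have at most one outstanding synchronous request.
public sealed class FakeChannel
{
    // True while a request waits for the broker's reply (the role played
    // by the pending SimpleBlockingRpcContinuation in the report).
    private bool _requestPending;

    // When false, the broker never replies, as with a pulled network cable.
    public bool BrokerReachable { get; set; } = true;

    public void PublishAndWaitForConfirm(string message)
    {
        if (_requestPending)
            throw new NotSupportedException("Pipelining of requests forbidden");

        _requestPending = true;

        if (!BrokerReachable)
        {
            // The caller gives up waiting, but nothing clears the pending
            // request, so the channel stays blocked.
            throw new TimeoutException($"No confirm received for '{message}'");
        }

        // A reply arrived: the pending request is completed and cleared.
        _requestPending = false;
    }
}

public static class PipeliningDemo
{
    // Publishes once while the broker is unreachable, then retries on the
    // SAME channel, and reports which exception each attempt produced.
    public static (string First, string Second) Run()
    {
        var channel = new FakeChannel { BrokerReachable = false };
        return (Attempt(channel), Attempt(channel));
    }

    private static string Attempt(FakeChannel channel)
    {
        try
        {
            channel.PublishAndWaitForConfirm("test 1");
            return "confirmed";
        }
        catch (Exception ex)
        {
            return ex.GetType().Name;
        }
    }
}
```

In this simulation, `PipeliningDemo.Run()` yields `TimeoutException` for the first attempt and `NotSupportedException` for the retry, which mirrors the sequence reported in the issue.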

I am attaching a small example that reliably reproduces this, although some manual intervention is required to cut the connection to RabbitMQ in a non-graceful way.

The example can be found here: https://github.com/MortenDuus/RebusRabbitMq, and here is an example log of this error:

[Image: rebusrabbitbug (screenshot of the error log)]

mookid8000 commented 4 years ago

What exactly do you mean by

(...) because we use the same transaction context again (...)

?

MortenDuus commented 4 years ago

We want to re-send a message because we get a timeout on the first attempt. In this file, InnerSend is called in that flow: https://github.com/rebus-org/Rebus/blob/dbc93a7250262973001ac3db9a8d26f898daf66e/Rebus/Bus/RebusBus.cs

async Task InnerSend(IEnumerable<string> destinationAddresses, Message logicalMessage)
{
    var currentTransactionContext = GetCurrentTransactionContext(mustBelongToThisBus: true);

    if (currentTransactionContext != null)
    {
        await SendUsingTransactionContext(destinationAddresses, logicalMessage, currentTransactionContext);
    }
    else
    {
        using (var context = new TransactionContextWithOwningBus(this))
        {
            await SendUsingTransactionContext(destinationAddresses, logicalMessage, context);
            await context.Complete();
        }
    }
}

In this method we get a transaction context. If GetCurrentTransactionContext returns AmbientTransactionContext.Current, it will be the same context as in the previous Send.
But that transaction context then contains the same RabbitMqClientModel as before, which is in a blocked state. Then we get the pipelining error because the model is already blocked.

MortenDuus commented 4 years ago

At least, that is what I suspect happens. It is possible that I use Rebus with RabbitMQ incorrectly, but I wouldn't expect the Publish method to return a pipelining error when I attempt to resend after getting an exception; maybe the exception should clear the blocked state on the model. But then, if RabbitMQ received the message and is unable to send an ack back, what happens to the message in RabbitMQ?

mookid8000 commented 4 years ago

I don't think it makes sense to retry sending of outgoing messages from within a Rebus handler.

All bus operations that result in outgoing messages are enlisted in the same transaction context, and that context will use the IModel from the consumer that received the message.

And, as you correctly guess, if that IModel cannot send, it must be in a failed state somehow, which in turn means that Rebus will not be able to ACK the message.

Conclusion: The best thing you can do in your Rebus handler is to send and publish without any retries, and then let things fail if they fail. If something fails, Rebus will try to NACK the message, which will not be successful in this case.

If the IModel is actually in a failed state, chances are the message being processed has already been returned to the queue, ready for another consumer to receive. I think RabbitMQ does that when a connection dies.

I hope this is OK for you. I'm closing this issue for now. Feel free to continue the talk in here, if you feel this isn't resolved. 🙂

hulvei3 commented 4 years ago

Working together with @MortenDuus on this one.

The issue we are currently facing is that we are losing messages if we occasionally experience a network drop during or close to the publish operation. Therefore, we are enabling publisher confirms in our current Rebus implementation.

Our implementation is used by 100+ services, so it is shared via a .NET package that wraps Rebus operations, to ease the work of configuration changes etc.

Along with publisher confirms comes a significant performance hit from waiting for the broker to confirm persistence of the published messages. In our setup, the performance penalty is not acceptable, so we want to unblock the publish operation (in most cases an API request scope) and hand off the work of (1) publishing, (2) waiting for the confirm, and (3) retrying if it failed, to a background process. This is basically also what Rebus does internally now without publisher confirms, but if there's a way to let Rebus take care of that with publisher confirms as well, I'm all ears.

So basically our goal is twofold:

  1. Provide guarantee that no message will be lost during connection/channel issues
  2. Not significantly affect response times in API transactions

No. 1 can be solved by publisher confirms. No. 2 can be solved by not blocking on publish() (as Rebus does now without publisher confirms).
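The hand-off described above (publish, wait for the confirm, retry if it failed, all off the request path) could look roughly like this minimal C# sketch. `BackgroundConfirmPublisher` is a hypothetical class, not part of Rebus, and `publishWithConfirm` is an assumed delegate that must throw when no confirm arrives; in a real system it would wrap whatever actually publishes to RabbitMQ. Note the trade-off: messages waiting in this in-memory queue are lost if the process dies, so the sketch buys goal No. 2 at some cost to goal No. 1.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical hand-off publisher (not a Rebus API): the request path only
// enqueues, and a background thread publishes, waits for the confirm, and retries.
public sealed class BackgroundConfirmPublisher : IDisposable
{
    private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();
    private readonly Action<string> _publishWithConfirm; // assumed: throws if not confirmed
    private readonly int _maxAttempts;
    private readonly TimeSpan _retryDelay;
    private readonly Thread _worker;

    // Messages that were still unconfirmed after the last attempt.
    public ConcurrentQueue<string> DeadLetters { get; } = new ConcurrentQueue<string>();

    public BackgroundConfirmPublisher(Action<string> publishWithConfirm, int maxAttempts, TimeSpan retryDelay)
    {
        _publishWithConfirm = publishWithConfirm;
        _maxAttempts = maxAttempts;
        _retryDelay = retryDelay;
        _worker = new Thread(Run) { IsBackground = true };
        _worker.Start();
    }

    // Called on the request path: returns without waiting for the broker.
    public void Enqueue(string message) => _queue.Add(message);

    private void Run()
    {
        // Blocks until messages arrive; ends after CompleteAdding once the queue is empty.
        foreach (var message in _queue.GetConsumingEnumerable())
        {
            for (var attempt = 1; ; attempt++)
            {
                try
                {
                    _publishWithConfirm(message);
                    break; // confirmed
                }
                catch (Exception) when (attempt < _maxAttempts)
                {
                    Thread.Sleep(_retryDelay); // wait, then try again
                }
                catch (Exception)
                {
                    DeadLetters.Enqueue(message); // give up; the owner must handle these
                    break;
                }
            }
        }
    }

    // Stops accepting messages and waits until everything queued has been processed.
    public void Dispose()
    {
        _queue.CompleteAdding();
        _worker.Join();
        _queue.Dispose();
    }
}
```

A single worker thread processes messages one at a time, which keeps them in order but limits throughput.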

If you're saying that Rebus actually re-queues the failed message internally when publisher confirms don't succeed, then I missed that in the implementation. If that's the case, will it also re-send the message when the connection is restored?

I should note, though, that even though we see the "NotSupportedException: Pipelining of requests forbidden" (https://www.rabbitmq.com/dotnet-api-guide.html#model-sharing), we do eventually succeed in our tests where we briefly cut the network to our broker. We just need to be able to reliably catch the right exception, so that we retry only on connection timeouts.

mookid8000 commented 4 years ago

Publisher confirms OMG?! 😨

OK, after having read a little bit about publisher confirms, I have been convinced that requiring them to be enabled explicitly in Rebus IS ACTUALLY A BUG.

It's a bug, because Rebus promises "at least once delivery" – and end-to-end durability by default is vital to that promise. All other Rebus transports satisfy this property.

I would have released the change as a bugfix, if it weren't for the fact that I had to go and rename the EnablePublisherConfirms configuration method to SetPublisherConfirms, so it will be out as a new major: Rebus.RabbitMq 7.0.0.

Speed concerns

On my machine with a locally running RabbitMQ, I can publish about 800 messages per second if I publish them like this:

foreach (var msg in messages)
{
    await bus.Send(msg);
}

which I think is reasonable.

If you want to accelerate it while keeping end-to-end durability, you can batch your send operations like this:

using (var scope = new RebusTransactionScope())
{
    foreach (var msg in batch)
    {
        await _bus.Send(msg);
    }

    await scope.CompleteAsync();
}

which in a simple example on my machine gave me ~7000 messages sent per second.

The third option, which is the fastest, is to opt out of message durability altogether. This can be done on a per-message basis by adding Rebus' rbs2-express header to the message like this:

var headers = new Dictionary<string, string> {{ Headers.Express, null }};

await bus.Send(message, headers);

Sending messages with the express flag using the two aforementioned methods (dumb foreach and batching) gave me 13k and 17k sends per second, respectively, on my machine.

hulvei3 commented 4 years ago

Thank you, @mookid8000, for the speed considerations. We will test this, and if it provides decent performance, we can avoid having to wrap our own background worker that does this outside the main request scope. Testing with a locally running RMQ is not a valid test case for us, though :) We will, however, look at using the RebusTransactionScope to automatically batch published messages.

And thank you also for understanding our issue with publisher confirms now :) Again, it's important for me to note that the main reason for enabling them is to catch transient network issues during publishing. Subscribers in general just use the built-in redelivery/ack mechanism of RMQ.

MortenDuus commented 4 years ago

@mookid8000 thank you for reconsidering the issue.

I believe the problem goes a little deeper than that. When we publish messages without publisher confirms, we can send message after message out on the connection before we get a timeout and an error, and all those messages are currently lost.

When we enable publisher confirms and send a message, we get a timeout error if the connection has been cut but not closed. That message is then (probably) lost.

If we try to send more messages in the same thread/transaction context/scope, we get a "Pipelining of requests forbidden" exception. We would not expect to get this error when we attempt to send new messages or resend old ones; we would expect to be able to actually attempt a new send, rather than run into a blocked IModel.

This is because the IModel is blocked by the send that caused the timeout, so any further attempts to publish new messages or retry (within that same thread/transaction context) will fail, and those messages are lost (until the IModel is unblocked). We do not get a new model after we get an exception back from sending.
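What the reporters expect here, getting a new model after a failed send, can be sketched as a retry loop that discards the channel after a timeout instead of reusing it. This is a minimal C# sketch, not Rebus behavior: `FakeChannel` is a hypothetical simulation of a channel that stays blocked after a timed-out confirm, and `FreshChannelRetry` and its `createChannel` factory are illustrative names. With a real RabbitMQ client, the discarded channel would also need to be closed and disposed. Because the broker may have received the message before the connection broke, a retry like this can produce duplicates, which fits an at-least-once guarantee but not exactly-once.

```csharp
using System;

// Hypothetical simulated channel (not RabbitMQ.Client's IModel): once a
// confirm times out, it stays blocked and rejects all further requests.
public sealed class FakeChannel
{
    private readonly Func<bool> _brokerReachable;
    private bool _blocked;

    public FakeChannel(Func<bool> brokerReachable) => _brokerReachable = brokerReachable;

    public void PublishAndWaitForConfirm(string message)
    {
        if (_blocked)
            throw new NotSupportedException("Pipelining of requests forbidden");

        if (!_brokerReachable())
        {
            _blocked = true; // the pending request is never completed
            throw new TimeoutException($"No confirm received for '{message}'");
        }
    }
}

public static class FreshChannelRetry
{
    // Publishes with up to maxAttempts tries, using a brand-new channel for
    // every attempt. Returns the number of attempts that were needed.
    public static int Publish(string message, Func<FakeChannel> createChannel, int maxAttempts)
    {
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            var channel = createChannel();
            try
            {
                channel.PublishAndWaitForConfirm(message);
                return attempt;
            }
            catch (TimeoutException)
            {
                // Treat the channel as unusable and drop it; a real IModel
                // would be closed and disposed here.
            }
        }

        throw new TimeoutException($"Gave up on '{message}' after {maxAttempts} attempts");
    }
}
```

With a broker that is unreachable only during the first attempt, `Publish` succeeds on the second attempt, whereas retrying on one reused `FakeChannel` keeps failing with `NotSupportedException` even after the broker is reachable again.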

hulvei3 commented 4 years ago

@mookid8000 please see the comment from @MortenDuus above. Even though we can agree that publisher confirms must be enabled to honor the "at-least-once" promise, I still think we are circling around the issue we experience with publisher confirms enabled. Please refer to the example at the top.

Basically, when calling _bus.Publish() multiple times, I would expect Rebus to handle this case with the .NET client, so that we don't get different exceptions during transient issues.

Having publisher confirms only gives us a possibility to act on failures - we still need to re-send/publish messages accordingly AFAIK.

mookid8000 commented 4 years ago

@hulvei3 I think the error message you're experiencing, "pipelining bla bla forbidden", is caused by you reusing the transaction context somehow, and not by threading issues as insinuated by the link you posted.

@MortenDuus also wrote

(...) This likely occurs because we use the same transaction context again, which holds the same model (...)

which sounds like you're doing exactly that, although I can't seem to figure out how you're doing it by looking at your code.

I tried running your code and restarting RabbitMQ in the middle of everything. It generated a lot of noise in the log, but I couldn't get it to lose messages.

Could you maybe tell me some more about how your retry logic is structured?

MortenDuus commented 4 years ago

I think if you restart RabbitMQ you do not get such errors; any time a connection is closed "gracefully", it will recover. I had the same problem reproducing them on a local RabbitMQ. However, if you access a RabbitMQ instance on a remote machine and cut the connection, either by disconnecting a VPN or simply by disabling the NIC on the Docker host running RabbitMQ, you will be able to get the errors.

Basically, the client must think the connection is still open and attempt to send messages.

No retry logic is required; all that is required is that we still attempt to send more messages after the first System.TimeoutException.

MortenDuus commented 4 years ago

@mookid8000 any luck replicating the issue with the locked model? I will be happy to demonstrate the problem in a Skype session or similar.

mookid8000 commented 4 years ago

Hi there, I finally got the chance to try and reproduce your issue again.

First, I emulated severe packet loss and similar problems by using Clumsy, but that didn't cause any trouble.

Then I configured your code to connect to a broker at CloudAmqp and switched the network on and off. That DID cause some pretty odd behavior: it seemed to reconnect and then start receiving every other message?!?!? Then came a period of silence, and then suddenly all of the missing messages arrived.

When I tried breaking the connection twice, every 3rd message was initially received when the connection recovered... 🤔

Check this out: I added two dictionaries to your program, so I could track sent/received messages, and then I made the program compare these two dictionaries at the end. It looked like this in the console the time I broke the connection twice:

(...startup stuff...)
Received test 0
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "fcf8e28e-300d-4785-a12f-28514ffe3fde" to 1 handlers took 0 ms
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #6): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
Received test 1
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "74d3cc56-e5c8-4e39-8faf-8fb367f7f65b" to 1 handlers took 0 ms
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #8): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
Received test 2
(... etc - here I break the connection...)
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "f9608a9e-7e13-43d3-a907-dccf0f45461c" to 1 handlers took 0 ms
[INF] Rebus.Internals.ConnectionManager (Rebus 1 worker 1): Existing connection found to be CLOSED
[INF] Rebus.Internals.ConnectionManager (Rebus 2 worker 1): Existing connection found to be CLOSED
[WRN] Rebus.Internals.ConnectionManager (Rebus 2 worker 1): Could not establish connection: "None of the specified endpoints were reachable"
[WRN] Rebus.Internals.ConnectionManager (Rebus 1 worker 1): Could not establish connection: "None of the specified endpoints were reachable"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #4): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.RabbitMq.RabbitMqTransport (Thread #4): Found out current model was closed... disposing it
[DBG] Rebus.RabbitMq.RabbitMqTransport (Thread #4): Initializing new model
[INF] Rebus.Internals.ConnectionManager (Thread #4): Existing connection found to be CLOSED
[WRN] Rebus.Internals.ConnectionManager (Thread #4): Could not establish connection: "None of the specified endpoints were reachable"
[WRN] Rebus.Workers.ThreadPoolBased.ThreadPoolWorker (Rebus 2 worker 1): An error occurred when attempting to receive the next message: Rebus.Exceptions.RebusApplicationException: Unexpected exception thrown while trying to dequeue a message from rabbitmq, queue address: RebusTestBug.Publisher
 ---> RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable
 ---> System.AggregateException: One or more errors occurred. (Connection failed)
 ---> RabbitMQ.Client.Exceptions.ConnectFailureException: Connection failed
 ---> System.Net.Sockets.SocketException (11001): No such host is known.
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw(Exception source)
   at System.Net.Dns.HostResolutionEndHelper(IAsyncResult asyncResult)
   at System.Net.Dns.EndGetHostAddresses(IAsyncResult asyncResult)
   at System.Net.Dns.<>c.<GetHostAddressesAsync>b__25_1(IAsyncResult asyncResult)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
   at RabbitMQ.Client.TcpClientAdapter.ConnectAsync(String host, Int32 port)
   at RabbitMQ.Client.Impl.TaskExtensions.TimeoutAfter(Task task, Int32 millisecondsTimeout)
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, Int32 timeout)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout, AddressFamily family)
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingIPv4(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout)
   at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
   at RabbitMQ.Client.Framing.Impl.IProtocolExtensions.CreateFrameHandler(IProtocol protocol, AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
   at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint)
   at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IEndpointResolver endpoints)
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IList`1 endpoints)
   at Rebus.Internals.ConnectionManager.GetConnection()
   at Rebus.RabbitMq.RabbitMqTransport.InitializeConsumer()
   at Rebus.RabbitMq.RabbitMqTransport.Receive(ITransactionContext context, CancellationToken cancellationToken)
   --- End of inner exception stack trace ---
   at Rebus.RabbitMq.RabbitMqTransport.Receive(ITransactionContext context, CancellationToken cancellationToken)
   at Rebus.Workers.ThreadPoolBased.ThreadPoolWorker.ReceiveTransportMessage(CancellationToken token, ITransactionContext context)
[ERR] Rebus.RabbitMq.Bug.Tester (Thread #4): Error, retrying: Could not 'GetOrAdd' item with key 'rabbitmq-current-model' as type RabbitMQ.Client.IModel

(... logs logs logs....)
[INF] Rebus.Internals.ConnectionManager (Rebus 1 worker 1): Existing connection found to be CLOSED
[INF] Rebus.RabbitMq.RabbitMqTransport (Rebus 2 worker 1): Successfully initialized consumer for "RebusTestBug.Publisher"
[INF] Rebus.RabbitMq.RabbitMqTransport (Rebus 1 worker 1): Successfully initialized consumer for "RebusTestBug"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #19): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #29): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
Received test 15
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "0986b795-6d73-4406-b18e-86bc4737eaf9" to 1 handlers took 3 ms
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #19): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #29): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
Received test 17
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "b7e02250-37a6-4ab3-bdd6-80dd83920899" to 1 handlers took 0 ms
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #16): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.RabbitMq.RabbitMqTransport (Thread #16): Found out current model was closed... disposing it
[DBG] Rebus.RabbitMq.RabbitMqTransport (Thread #16): Initializing new model
[INF] Rebus.Internals.ConnectionManager (Thread #16): Existing connection found to be CLOSED
[WRN] Rebus.Internals.ConnectionManager (Thread #16): Could not establish connection: "None of the specified endpoints were reachable"
[INF] Rebus.Internals.ConnectionManager (Rebus 1 worker 1): Existing connection found to be CLOSED
[WRN] Rebus.Internals.ConnectionManager (Rebus 1 worker 1): Could not establish connection: "None of the specified endpoints were reachable"
[INF] Rebus.Internals.ConnectionManager (Rebus 2 worker 1): Existing connection found to be CLOSED
[ERR] Rebus.RabbitMq.Bug.Tester (Thread #16): Error, retrying: Could not 'GetOrAdd' item with key 'rabbitmq-current-model' as type RabbitMQ.Client.IModel
[WRN] Rebus.Internals.ConnectionManager (Rebus 2 worker 1): Could not establish connection: "None of the specified endpoints were reachable"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #16): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.RabbitMq.RabbitMqTransport (Thread #16): Initializing new model
(... here I break it again...)
[INF] Rebus.Internals.ConnectionManager (Thread #16): Existing connection found to be CLOSED
[WRN] Rebus.Internals.ConnectionManager (Thread #16): Could not establish connection: "None of the specified endpoints were reachable"
[WRN] Rebus.Workers.ThreadPoolBased.ThreadPoolWorker (Rebus 1 worker 1): An error occurred when attempting to receive the next message: Rebus.Exceptions.RebusApplicationException: Unexpected exception thrown while trying to dequeue a message from rabbitmq, queue address: RebusTestBug
 ---> RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable
 ---> System.AggregateException: One or more errors occurred. (Connection failed)
 ---> RabbitMQ.Client.Exceptions.ConnectFailureException: Connection failed
 ---> System.Net.Sockets.SocketException (11001): No such host is known.
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw(Exception source)
   at System.Net.Dns.HostResolutionEndHelper(IAsyncResult asyncResult)
   at System.Net.Dns.EndGetHostAddresses(IAsyncResult asyncResult)
   at System.Net.Dns.<>c.<GetHostAddressesAsync>b__25_1(IAsyncResult asyncResult)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
   at RabbitMQ.Client.TcpClientAdapter.ConnectAsync(String host, Int32 port)
   at RabbitMQ.Client.Impl.TaskExtensions.TimeoutAfter(Task task, Int32 millisecondsTimeout)
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, Int32 timeout)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout, AddressFamily family)
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingIPv4(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout)
   at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
   at RabbitMQ.Client.Framing.Impl.IProtocolExtensions.CreateFrameHandler(IProtocol protocol, AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
   at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint)
   at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IEndpointResolver endpoints)
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IList`1 endpoints)
   at Rebus.Internals.ConnectionManager.GetConnection()
   at Rebus.RabbitMq.RabbitMqTransport.InitializeConsumer()
   at Rebus.RabbitMq.RabbitMqTransport.Receive(ITransactionContext context, CancellationToken cancellationToken)
   --- End of inner exception stack trace ---
   at Rebus.RabbitMq.RabbitMqTransport.Receive(ITransactionContext context, CancellationToken cancellationToken)
   at Rebus.Workers.ThreadPoolBased.ThreadPoolWorker.ReceiveTransportMessage(CancellationToken token, ITransactionContext context)
(.... lots and lots of logs ....)
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #29): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #16): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #16): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #16): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #29): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #16): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #29): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #6): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #16): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[INF] Rebus.RabbitMq.RabbitMqTransport (Rebus 2 worker 1): Successfully initialized consumer for "RebusTestBug.Publisher"
[INF] Rebus.Internals.ConnectionManager (Rebus 1 worker 1): Existing connection found to be CLOSED
[INF] Rebus.RabbitMq.RabbitMqTransport (Rebus 1 worker 1): Successfully initialized consumer for "RebusTestBug"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #29): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #40): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #41): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
Received test 30
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "8ee683e8-d148-4253-9c71-cb93d2021081" to 1 handlers took 0 ms
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #40): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #8): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #41): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
Received test 33
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "f6c2b41b-7e4a-47fb-a334-ca3a39e25d3d" to 1 handlers took 0 ms
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #39): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #41): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #41): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
Received test 36
( snip )
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "228d8f0e-1d20-43fc-8d09-04ee6cc098d6" to 1 handlers took 0 ms
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #19): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #41): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #41): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
Received test 96
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "affbd568-6972-49e3-94a9-615bb75b0b8d" to 1 handlers took 0 ms
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #41): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #19): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #41): Sending Rebus.RabbitMq.Bug.TestType -> "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug@RebusTopics"
Received test 99
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "b5d72ad5-db61-4604-a61e-1cb254b76f9f" to 1 handlers took 0 ms
Press ENTER to check
( ... after a while this happens: )
Received test 14
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "cc1b6885-575e-4986-aa7f-57e7142d0971" to 1 handlers took 0 ms
Received test 18
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "c406bc05-4e6d-44a8-b21e-ce4f990fc5a8" to 1 handlers took 0 ms
Received test 22
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "0540adf7-41d4-4812-844a-9a95c72244ee" to 1 handlers took 0 ms
Received test 26
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "d0b9171c-d5dd-4c8c-aa3a-34ad0cf246eb" to 1 handlers took 0 ms
Received test 31
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "ed98815c-e1c0-4ae2-9af7-d877c506dc80" to 1 handlers took 0 ms
Received test 37
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "ec0c0b69-4c32-402e-a80c-b31c072085b3" to 1 handlers took 0 ms
Received test 43
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "2d26ac08-4859-4c09-92fc-7e2869d28254" to 1 handlers took 0 ms
Received test 49
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "13b2246a-30da-4741-8329-2b1a57c7474b" to 1 handlers took 0 ms
Received test 55
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "584c0755-d225-449f-83e9-a3b62ced0c13" to 1 handlers took 0 ms
Received test 61
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "52632cf2-7ead-4b01-a5e4-24f0a99015e6" to 1 handlers took 0 ms
Received test 67
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "5b6fe48b-7021-41fa-9832-3ddcf1541e0d" to 1 handlers took 0 ms
Received test 73
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "dd3e9d37-551d-480e-ba3a-6a7ba1ab1ccc" to 1 handlers took 0 ms
Received test 79
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "bfd812ad-8620-40a5-901f-bc614aaa6708" to 1 handlers took 0 ms
Received test 85
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "fb0edf39-f08f-4552-baed-50ac9ec4b3b9" to 1 handlers took 0 ms
Received test 91
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "b448343d-f643-4425-861b-ea5a1f108241" to 1 handlers took 0 ms
Received test 97
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "30da762f-d269-41b4-902e-97c624f4b12b" to 1 handlers took 0 ms
Received test 13
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "54beffc9-8b8e-4eb1-a2c3-5a90188fdf8b" to 1 handlers took 0 ms
Received test 16
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "695e6bb2-28ca-40db-92f3-2ae8ea161fc9" to 1 handlers took 0 ms
Received test 19
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "95905fb4-92b2-42c9-80e2-cd9561992159" to 1 handlers took 0 ms
Received test 20
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "0055d392-27e5-4dc8-952e-6d07cf4def58" to 1 handlers took 0 ms
Received test 21
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "8e13ca4d-0387-499a-8d24-b40521844d2a" to 1 handlers took 0 ms
Received test 23
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "62a9410b-d5d1-42c5-b666-b7666ed05b35" to 1 handlers took 0 ms
Received test 24
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "182fa802-4206-4ce0-92da-8366f181c4c9" to 1 handlers took 0 ms
Received test 25
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "b4065e82-340d-4b78-bcef-07de5abba67d" to 1 handlers took 0 ms
Received test 27
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "74fb87c8-df76-4285-a2f3-fa46d440817c" to 1 handlers took 0 ms
Received test 28
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "36c82e7f-ed1e-486a-bce8-503b707dc521" to 1 handlers took 0 ms
Received test 29
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "882900b2-0112-426a-b001-988a491425ca" to 1 handlers took 0 ms
Received test 32
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "defc4038-9ade-48f3-9a84-f4824f3865f3" to 1 handlers took 0 ms
Received test 34
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "c36cc2fb-35b5-41ed-9c9b-a68f086689fd" to 1 handlers took 0 ms
Received test 35
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "d71a6c76-4ba5-4849-90c7-9651fd6709a2" to 1 handlers took 0 ms
Received test 38
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "2b1e942c-2841-4eb9-89fe-b3b1f7160dc8" to 1 handlers took 0 ms
Received test 40
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "8a2f071a-2b69-473b-848b-84e1495b8321" to 1 handlers took 0 ms
Received test 41
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "537e3def-b602-41fb-990f-ad1dc4be3713" to 1 handlers took 0 ms
Received test 44
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "ff39a43c-3c1c-4b25-937f-5defa602060b" to 1 handlers took 0 ms
Received test 46
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "3477de5e-556f-4841-a432-eb267a733069" to 1 handlers took 0 ms
Received test 47
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "fbedc2eb-4f5d-4870-9eed-a6bb499be5b1" to 1 handlers took 0 ms
Received test 50
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "ec407ba3-bba4-49a0-b1b2-8ea91f5219f3" to 1 handlers took 0 ms
Received test 52
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "afdbe082-8caa-45ac-80ba-ab36c107906e" to 1 handlers took 0 ms
Received test 53
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "87098b4a-f410-402b-92c5-e019754ec237" to 1 handlers took 0 ms
Received test 56
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "347f163e-d282-4fa4-8bbe-687a5af78b28" to 1 handlers took 0 ms
Received test 58
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "975658e5-641f-4df0-8ec4-34d2d2d43ddd" to 1 handlers took 0 ms
Received test 59
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "bcc7d67c-da82-482c-9a85-e8e48d743d84" to 1 handlers took 0 ms
Received test 62
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "8895855f-35da-4338-9e81-1bdf9306ce93" to 1 handlers took 0 ms
Received test 64
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "d41d1f8d-4529-4eda-88af-3ffe7a18e8a5" to 1 handlers took 0 ms
Received test 65
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "94b45083-2a1a-47ec-abf4-109a5540dada" to 1 handlers took 0 ms
Received test 68
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "622702f1-76fd-4542-b927-7102fc608008" to 1 handlers took 0 ms
Received test 70
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "82106217-db38-4841-9360-22a57fcaf473" to 1 handlers took 0 ms
Received test 71
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "540234a6-cf8a-4f14-a314-0c252cc8a83d" to 1 handlers took 0 ms
Received test 74
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "42024edb-3c7e-48f1-997f-0d97b8a554e8" to 1 handlers took 0 ms
Received test 76
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "13a13a19-c915-4007-b9d9-2d2d36edf783" to 1 handlers took 0 ms
Received test 77
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "15d5107c-c588-42fb-b78f-0a83d97d64c7" to 1 handlers took 0 ms
Received test 80
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "ad093db7-8936-46d5-98b1-dc68f742fba4" to 1 handlers took 0 ms
Received test 82
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "bbbf97c6-039c-46a4-95b9-79168156a887" to 1 handlers took 0 ms
Received test 83
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "2f1b88ce-80fa-41f5-980e-8ff09edcebc0" to 1 handlers took 0 ms
Received test 86
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "fc679536-4945-4089-93ca-0fb7dc98a420" to 1 handlers took 0 ms
Received test 88
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "ca12f8fe-65d2-428f-a5a0-caf52de873f5" to 1 handlers took 0 ms
Received test 89
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "2e91da24-0814-4880-9ad1-bfff08b6056d" to 1 handlers took 0 ms
Received test 92
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "0353aaf6-bb0a-4cb5-be01-096ba273e30c" to 1 handlers took 0 ms
Received test 94
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "e9c2d277-245f-4d78-a028-67c6505806d2" to 1 handlers took 0 ms
Received test 95
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "d6d43f59-cc3e-4de3-8944-fa9fca786eb0" to 1 handlers took 0 ms
Received test 98
[DBG] Rebus.Pipeline.Receive.DispatchIncomingMessageStep (Rebus 1 worker 1): Dispatching "Rebus.RabbitMq.Bug.TestType, Rebus.RabbitMq.Bug" "ccabe039-a5f6-4ae3-8383-5bf3bcb9fdaa" to 1 handlers took 0 ms
( ... and then my final dictionary comparison concludes: )
All sent messages were received
mookid8000 commented 4 years ago

So, in conclusion: I wasn't able to reproduce any LOSS of messages, only some funny, heavily reordered deliveries.

hulvei3 commented 4 years ago

I think we need to reiterate what our original issue is. Here are our experiences and observations:

  1. Obviously, we only see message loss on the publisher side, because the receiving end would never "lose" messages: any unhandled message stays un-acked, and RMQ redelivers it to the same or another consumer.
  2. We only see message loss when publisher confirms are NOT enabled. Enabling publisher confirms clearly solves that: when connectivity issues occur, we can now react to any exceptions thrown by the RMQ client library or the Rebus transport. Nothing more to discuss here.
  3. Reordering is to be expected when messages are redelivered or when multiple competing consumers are used (AFAIK RMQ doesn't promise ordering in that case), but that's another discussion and doesn't affect us in this scenario.

What we are trying to raise here is that you cannot seem to rely on getting the same exception (TimeoutException) every time the connection fails or times out, when you catch the connection failure and call Publish() again.

How we are reproducing this

Components:

  1. Prepare service B to call an API endpoint on service A 100 times, where each HTTP request makes service A send 1 message to RMQ. This should result in 100 messages eventually arriving in a predefined queue in RMQ.
  2. When service B starts its 100 calls to service A, we cut the connection between service A and its connected RMQ node (service B can still reach service A). The outage must last at least half the configured RMQ heartbeat interval before you see any symptom other than blocking requests.
  3. Service B is now affected, because its request blocks while service A waits for an ack from RMQ.
  4. The .NET RMQ client library detects the connection failure through a timeout and throws a TimeoutException. Rebus is not involved in this (I guess).
  5. Service A catches this exception and makes its first retry of the failed publish, using Polly. Publish() is called again with the same message, on the same Rebus instance.
  6. The .NET RMQ client library now throws again immediately, but this time with a different exception: NotSupportedException. My guess is that it's caused by this: https://www.rabbitmq.com/dotnet-api-guide.html#model-sharing
  7. We re-enable the network connection, and service A eventually succeeds in publishing the message, which lets the blocked request from service B complete.
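The switch from TimeoutException to NotSupportedException in steps 4 to 6 can be mimicked with a small toy. This is a hypothetical simulation, not RabbitMQ.Client's code: `ToyModel`, `pendingRpc`, and `RetryDemo` are made-up names, and the only assumption baked in is the one from the original report, namely that a model whose previous blocking request never got a reply rejects the next request with "Pipelining of requests forbidden".

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a channel (IModel); NOT the RabbitMQ.Client implementation.
// It only mimics the behaviour described in this issue.
public sealed class ToyModel
{
    // True while a request is still waiting for a reply that has not arrived.
    private bool pendingRpc;

    // Simulates a publish that blocks until the broker replies (e.g. a publisher confirm).
    public void Publish(string message, bool brokerReachable)
    {
        if (pendingRpc)
        {
            // A new request while the previous one is unanswered fails immediately.
            throw new NotSupportedException("Pipelining of requests forbidden");
        }

        pendingRpc = true;

        if (!brokerReachable)
        {
            // No reply ever arrives: the caller gives up, but the model stays blocked.
            throw new TimeoutException("No reply from broker");
        }

        // Reply received, so the model is free for the next request.
        pendingRpc = false;
    }
}

public static class RetryDemo
{
    // Naive retry that reuses the SAME model for every attempt and records
    // the outcome of each attempt ("ok" or the exception type name).
    public static string[] PublishWithRetry(ToyModel model, int attempts, bool brokerReachable)
    {
        var seen = new List<string>();
        for (var i = 0; i < attempts; i++)
        {
            try
            {
                model.Publish("hello", brokerReachable);
                seen.Add("ok");
                break;
            }
            catch (Exception ex) when (ex is TimeoutException || ex is NotSupportedException)
            {
                seen.Add(ex.GetType().Name);
            }
        }
        return seen.ToArray();
    }
}
```

With the broker unreachable, the first attempt reports `TimeoutException` and every later attempt on the same model reports `NotSupportedException`. The toy deliberately leaves out recovery (step 7); nothing in it ever clears the blocked state.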

As stated in the first comments, we do succeed in implementing retries and delivering all messages through a full connection failure, without message loss. We basically just want to raise what is, from our perspective, odd behavior: you call the same method twice and get different exceptions. This reveals some state in either the transport or Rebus itself, which only causes confusion. Given that the RMQ docs clearly state that this exception (`NotSupportedException`) is provoked by sharing a model across threads, I suspect there is either a bug somewhere here that we just happen to recover from, or the expectations for using the Rebus transport aren't clearly defined.

mookid8000 commented 4 years ago

You're absolutely right that Rebus' RabbitMQ transport has some internal state: It keeps a ConcurrentQueue<IModel>, which it uses as a connection pool.

The logic for discarding closed connections when grabbing the next available IModel was slightly wonky, so I've changed it to get rid of closed connections more quickly when looking for the next available model.
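A minimal sketch of the general technique described above: a `ConcurrentQueue<T>` used as a pool, where closed entries are discarded while searching for the next available one. This is not the actual Rebus.RabbitMq code; `IPooledChannel`, `ChannelPool`, `Rent`, `Return`, and the `FakeChannel` test double are hypothetical names, and the factory delegate stands in for whatever creates a fresh IModel.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical abstraction over a channel such as RabbitMQ's IModel.
public interface IPooledChannel : IDisposable
{
    bool IsClosed { get; }
}

// Hypothetical pool illustrating the technique; NOT Rebus.RabbitMq's implementation.
public sealed class ChannelPool<T> where T : IPooledChannel
{
    private readonly ConcurrentQueue<T> available = new ConcurrentQueue<T>();
    private readonly Func<T> factory;

    public ChannelPool(Func<T> factory) =>
        this.factory = factory ?? throw new ArgumentNullException(nameof(factory));

    // Returns an open channel: closed ones found in the queue are disposed and skipped,
    // and a new channel is created only when no open one is left.
    public T Rent()
    {
        while (available.TryDequeue(out var channel))
        {
            if (!channel.IsClosed) return channel;
            channel.Dispose(); // discard closed channels right away
        }
        return factory();
    }

    // Puts a channel back for reuse, unless it was closed in the meantime.
    public void Return(T channel)
    {
        if (channel.IsClosed) channel.Dispose();
        else available.Enqueue(channel);
    }
}

// Test double (hypothetical) so the pool can be exercised without a broker.
public sealed class FakeChannel : IPooledChannel
{
    public bool IsClosed { get; set; }
    public bool Disposed { get; private set; }
    public void Dispose() => Disposed = true;
}
```

Checking `IsClosed` on the way out of the pool (in `Rent`), and not only on the way back in, is what keeps a channel that died while sitting idle in the queue from being handed to the next publisher.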

Could you try Rebus.RabbitMq 7.1.1? (It will be on NuGet.org in a few minutes...)

Btw, sorry for being so slow to pick up on your issue. 🙄