rebus-org / Rebus.RabbitMq

:bus: RabbitMQ transport for Rebus
https://mookid.dk/category/rebus

Automatic retries are not attempted when RabbitMQ connection is closed. The messages dispatched while re-connecting are lost, which makes the bus unreliable #29

Closed dmitrynovik closed 5 years ago

dmitrynovik commented 5 years ago

I'm using Rebus with RabbitMQ and following the retries guide as described here.

The steps:

  1. Configure the simple retry strategy with maxDeliveryAttempts set to int.MaxValue:
  var configuration = Configure.With(Adapter)
                .Options(opts => opts.SimpleRetryStrategy(maxDeliveryAttempts: int.MaxValue))
  2. Start the client and watch it connect; pub-sub works.
  3. Close the connection by restarting the RabbitMQ service.

EXPECTED: when the client reconnects, the messages dispatched while it was disconnected are retried and eventually delivered.

ACTUAL: the client reconnects, but the messages are lost.

You can see my example code here. It dispatches sequential numbers to a subscriber, and the subscriber reports if the sequence is broken (to make that happen, you need to close the connection by restarting RabbitMQ or by any other means):

                    // Report a gap in the sequence of received numbers.
                    if (value - _seq > 1)
                        WriteError($"Message loss detected. Expected: {_seq + 1}, received: {value}, lost: {value - _seq - 1}");

mookid8000 commented 5 years ago

The problem seems to be that your code does not handle errors when publishing messages.

The IBus.Publish(..) method returns a Task, which you must always await (if that's possible within your current context) or .Wait() on to observe the result.

So, if you change the StartPublishing method from this:

private static void StartPublishing(IBus bus, CancellationToken cancellation)
{
    ulong counter = 0;
    while (!cancellation.IsCancellationRequested)
    {
        bus.Publish(new Message(++counter));
        Thread.Sleep(1000);
    }
}

into this:

private static void StartPublishing(IBus bus, CancellationToken cancellation)
{
    ulong counter = 0;
    while (!cancellation.IsCancellationRequested)
    {
        var number = ++counter;

        while (true)
        {
            try
            {
                bus.Publish(new Message(number)).Wait();
                break;
            }
            catch (Exception)
            {
                Console.WriteLine($"Could not send message number {number} - waiting a short while...");
                Thread.Sleep(2000);
            }
        }

        Thread.Sleep(1000);
    }
}

then no messages are lost(*)


(*) I don't think you can call it "message loss" in your case, because the message was never sent.

dmitrynovik commented 5 years ago

Thank you for pointing that out, although in Production grade code you probably won't have a blocking Thread.Sleep call on a single message which failed to publish. I basically had to code my own wrapper around IBus, which keeps a concurrent queue of failed messages and uses a timer to retry them in batches once .Publish fails.

There is still a weird part though: if one does not call await on Publish - be it intentional or by mistake, and it is very easy to miss - the exception happening in Publish is being quietly swallowed, and I am not sure it does any good to anyone. If the message was not sent as in my case, I'd expect it to throw - with await or without. Even if it causes the process to die because of an unhandled error, it is better to have an obvious failure which a developer can understand and fix quickly than a false sense of security that something was sent when in fact it was not.

mookid8000 commented 5 years ago

> (...) if one does not call await on Publish - be it intentional or by mistake, and it is very easy to miss - the exception happening in Publish is being quietly swallowed, and I am not sure it does any good to anyone

....but that's how Tasks work 😄

> If the message was not sent as in my case, I'd expect it to throw - with await or without

It's simply not possible to throw, so that the caller catches it, unless the caller waits for the exception to happen.

When you don't await or .Wait() the Task, the call proceeds on another (thread pool) thread, which is where the exception is going to happen.
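To make the point about un-awaited Tasks concrete, here is a minimal, self-contained sketch. It does not use Rebus at all: `FakePublish` is a hypothetical stand-in for `bus.Publish(...)` that always fails asynchronously, and the class name is made up for illustration. It only demonstrates general `Task` behaviour: the exception is stored on the returned Task and surfaces only where the Task is awaited.

```csharp
using System;
using System.Threading.Tasks;

public static class UnobservedPublishDemo
{
    // Hypothetical stand-in for bus.Publish(...): always fails asynchronously.
    public static async Task FakePublish()
    {
        await Task.Yield(); // force the rest of the method to run asynchronously
        throw new InvalidOperationException("broker unavailable");
    }

    public static async Task RunAsync()
    {
        // Not awaited: the call hands back a Task immediately and nothing is thrown here.
        Task forgotten = FakePublish();
        Console.WriteLine("Un-awaited call returned normally.");

        // Awaited: the exception stored on the Task is rethrown at the await.
        try
        {
            await FakePublish();
        }
        catch (InvalidOperationException e)
        {
            Console.WriteLine($"Awaited call threw: {e.Message}");
        }

        // Wait for the forgotten Task to finish without observing its exception,
        // then inspect its state: the failure is there, but nobody was told.
        await Task.WhenAny(forgotten);
        Console.WriteLine($"Forgotten task faulted: {forgotten.IsFaulted}");
    }
}
```

Calling `await UnobservedPublishDemo.RunAsync();` from an async `Main` runs the three cases in order.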

> (...) in Production grade code (....)

In "production grade code" you should probably just

await bus.Publish(yourEvent);

and then either let your current unit of work roll back and be retried (e.g. if you're in a Rebus handler, or if your unit of work gets retried in another way somehow), or use something like Polly to asynchronously retry the publish operation a couple of times.
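As a rough illustration of the "asynchronously retry the publish operation" option, the sketch below hand-rolls a small retry helper instead of using Polly, so that it stays self-contained. `PublishRetry.RetryAsync`, its parameters, and the fixed delay are assumptions made for this example and are not part of Rebus or Polly; a library like Polly offers the same idea with richer policies. Unlike the `Thread.Sleep` loop earlier in the thread, it waits with `Task.Delay`, so no thread is blocked between attempts.

```csharp
using System;
using System.Threading.Tasks;

public static class PublishRetry
{
    // Hypothetical helper: runs the operation up to maxAttempts times,
    // waiting asynchronously between failed attempts.
    public static async Task RetryAsync(Func<Task> operation, int maxAttempts, TimeSpan delay)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await operation(); // awaiting is what surfaces the failure
                return;            // success: stop retrying
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // Not the last attempt yet: wait without blocking a thread, then loop.
                await Task.Delay(delay);
            }
            // On the last attempt the filter is false, so the exception
            // propagates to the caller instead of being swallowed.
        }
    }
}
```

Assuming `bus` is an `IBus` and `yourEvent` is the message to publish, usage could look like `await PublishRetry.RetryAsync(() => bus.Publish(yourEvent), maxAttempts: 3, delay: TimeSpan.FromSeconds(2));`. If all attempts fail, the final exception reaches the caller, which can then let its unit of work roll back.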