pardahlman / RawRabbit

A modern .NET framework for communication over RabbitMq
MIT License
747 stars 144 forks source link

Recovery from the network failure #301

Closed drak25 closed 6 years ago

drak25 commented 6 years ago

Hello, I am using 2.0 version of library

What is the correct configuration to make RawRabbit insensitive to connection or server failure ? I have a cluster of 3 rabbitmq servers, I use configuration:

RawRabbitConfiguration { Username = "xx", Password = "xx", Port = 9672, VirtualHost = "/", Hostnames = { "192.168.1.1", "192.168.1.2", "192.168.1.3" }, PublishConfirmTimeout = TimeSpan.FromSeconds(3), AutomaticRecovery = true, PersistentDeliveryMode = true, RecoveryInterval = TimeSpan.FromSeconds(10), TopologyRecovery = true,

            // more props here.
        };

When i start to send a batch of messages, RawRabbit establishes connection with one of the servers and start to send messages. It quickly scales up the channels. However, when i break the connection or stop the server the rawrabbit is connected to, within 10 seconds it establishes new connection but do not send any more messages.

The Publish method accepts new messages but they are never delivered to rabbitmq. Debugging the code i determined that everything stops at method :

    public override async Task InvokeAsync(IPipeContext context, CancellationToken token = default(CancellationToken))
    {
        var channel = await GetChannelAsync(context, token);
        SaveInContext(context, channel);
        await Next.InvokeAsync(context, token);
    }

From public class PooledChannelMiddleware : Middleware

It never gets past the first line of the method.

The second problem is that when i break the connection the library crashes in method

public void SetupScaling() in class AutoScalingChannelPool

on line

            var workPerChannel = ChannelRequestQueue.Count / Pool.Count;

Becease it turns out that when the connection is broken, the Pool.Count is 0. I temporarily mitigated this problem by adding if (Pool.Count == 0) return; Just before this line.

Library is great and i just need to find out correct way to use it to survive connection problems.

Regards Piotr WΓ³jcicki

pardahlman commented 6 years ago

Hello @drak25, thanks for reaching out πŸ‘‹

RawRabbit is designed to handle connectivity problems out of the box. It has publish buffers, subscription recovery and even logic for reconnecting and resetting publish ack sequences. That being said, there are still room for improvements πŸ˜‰.

The problem you described sounds similar to #299. I'm looking into this right now, and there are indeed scenarios where the scaling is not acting as desired (it is a relatively new feature of 2.0). Right now I'm leaning towards bailing on the scaling routine if the connection to the broker is closed. Until this is figured out your workaround is good (alternatively register another channel pool, like ResilientChannelPool.

pardahlman commented 6 years ago

Hello again, I've done some improvements in the channel pool. Running the sample application, I'm able to restart the broker at any time and still get responses. This fix will be part of 2.0.0-rc3 πŸ‘Œ

drak25 commented 6 years ago

I tried to run the new code. I started publishing messages, killed the connection on broker. Within 10 seconds I got an exception at the new line of code: token.Register(() => recoverTcs.SetCanceled());

in ChannelFactory

The exception was :

Stack Trace : w System.Threading.Tasks.TaskCompletionSource`1.SetCanceled() w RawRabbit.Channel.ChannelFactory.<>c__DisplayClass8_0.b__0() w C:\GitRepo\RawRabbit\src\RawRabbit\Channel\ChannelFactory.cs:wiersz 85 w System.Threading.CancellationToken.ActionToActionObjShunt(Object obj) w System.Threading.CancellationCallbackInfo.ExecutionContextCallback(Object obj) w System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx) w System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx) w System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state) w System.Threading.CancellationCallbackInfo.ExecuteCallback() w System.Threading.CancellationTokenSource.CancellationCallbackCoreWork(CancellationCallbackCoreWorkArguments args) w System.Threading.CancellationTokenSource.ExecuteCallbackHandlers(Boolean throwOnFirstException)

Tried it 3 times. The exception happened every single time.

Best regards Piotr

pardahlman commented 6 years ago

Oh noes πŸ€¦β€β™‚οΈ it shoud be TrySetCanceled(). Reopening this.

drak25 commented 6 years ago

Changing it to TrySetCanceled() Prevents the crash. However, the issue is not resolved for me. After i terminate the connection on broker, the library never resumes message sending. I will try to debug it myself next week.

pardahlman commented 6 years ago

Can you give me a step-by-step on how to reproduce this? Thanks!

drak25 commented 6 years ago

This is exactly what i do.

The application:

` static void Main(string[] args) { var config = new RawRabbitConfiguration { Username = "xx", Password = "xx", Port = 9672, VirtualHost = "/", Hostnames = { "192.168.1.5" } };

        var client = RawRabbitFactory.CreateSingleton(new RawRabbitOptions() { ClientConfiguration = config, Plugins = p => p.UseAttributeRouting() });
        Task.WaitAll(new[] {Task.Run(async () =>
        {
            await client.DeclareExchangeAsync(new ExchangeDeclaration(){Name = "customname", ExchangeType = "direct", Durable = true});
            await client.DeclareQueueAsync(new QueueDeclaration() {Name = "custom.request.queue", Durable = true});
            await client.BindQueueAsync("custom.request.queue", "customname", "custom.request.queue");

        })}); 

          Task.Run(async () => { await Sender(config); });

        Console.ReadKey();
    }

    private static async Task Sender(RawRabbitConfiguration config)
    {
        var client = RawRabbitFactory.CreateSingleton(new RawRabbitOptions() { ClientConfiguration = config, Plugins = p => p.UseAttributeRouting()});
        Parallel.For(0, 10, async x =>
        {
            for (int i = 0; i < 10; i++)
            {
                for (int j = 0; j < 10000; j++)
                {
                    await client.PublishAsync(new Message() { Counter = j},
                        ctx => ctx.UsePublishAcknowledge(false));
                }
            }
        });

    }

    [Exchange(Name = "customname", Type = ExchangeType.Direct)]
    [Routing(RoutingKey = "custom.request.queue")]
    public class Message
    {
        public int Counter { get; set; }
    }

` The Broker is : version 3.7.2 Erlang 20.1

I start the application and open rabbitmq management site. I observe that the messages are correctly sent to the queue. I go to the Connections tab - there are two connections. I click each one and click Force Close button. On the queue tab i can see that message are not being sent any more. I wait couple of seconds. I open connection tab and see that two connections we reestablished, but on the queue tab i can see that no more messages are being published to the queue.

Regards Piotr

drak25 commented 6 years ago

I got a hint for you regarding this issue. The problem is somehow connected to the class ConcurrentChannelQueue

In method Enqueue the new TaskCompletionSource is created and added to the queue. The invoke method on the queue is invoked only if the queue was empty before adding the new taks.

In my scenario, after i terminated connection on broker, the Queue starts to fill up. While the connection is down, it is filled with hundreds of items. When the connection is brought up again, the invoke method is never invoked again as the queue is never empty. After I comment the line if (raiseEvent)

everything is working as expected. Within 10 seconds after connection termination, it is brought up again and messages are being sent.

public TaskCompletionSource Enqueue() { var modelTsc = new TaskCompletionSource(); var raiseEvent = _queue.IsEmpty; _queue.Enqueue(modelTsc); if (raiseEvent) { Queued?.Invoke(this, EventArgs.Empty); }

        return modelTsc;
    }
drak25 commented 6 years ago

Another hint.

This time it is in StaticChannelPool class.

After the connection is up, the Queued event from ConcurrentChannelQueue seems to be invoked. This invokes StartServerChannels from StaticChannelPool method. However, the method returns here :

        if (!Monitor.TryEnter(_workLock))
        {
            return;
        }

When the connection is terminated, the StartServerChannel method returns at :

            if (_current.Value.IsClosed)
            {
                Pool.Remove(_current);
                if (Pool.Count != 0) continue;
                if (Recoverables.Count == 0)
                {
                    throw new ChannelAvailabilityException("No open channels in pool and no recoverable channels");
                }
                return;
            }

So the monitor is never realeased and it blocks the execution forever.

Maybe the simple

        try
        {
            do
            {
                _current = _current?.Next ?? Pool.First;
                if (_current.Value.IsClosed)
                {
                    Pool.Remove(_current);
                    if (Pool.Count != 0) continue;
                    if (Recoverables.Count == 0)
                    {
                        throw new ChannelAvailabilityException("No open channels in pool and no recoverable channels");
                    }

                    return;
                }

                if (ChannelRequestQueue.TryDequeue(out var cTsc))
                {
                    cTsc.TrySetResult(_current.Value);
                }
            } while (!ChannelRequestQueue.IsEmpty);
        }
        finally
        {
            Monitor.Exit(_workLock);
        }

in StartServeChannels method would do the trick. It seems to work when i try it.

pardahlman commented 6 years ago

Good catch, @drak25! The Channel pool concept is a recent addition to RawRabbit, and it's clear that some of the corner cases are not yet covered.

I think your suggestion makes a lot of sense, however I'm a bit hesitate to add such a large try block. Many parts of the code will never throw exception (TrySetResult, TryDequeue) etc. The things that I see that could provoke an exception are

I think the nicest solution would be to make avoid try/finally if possible but make sure that all code paths ends with a Monitor.Exit.

It feels like you have your head wrapped around this - want to create a PR?

drak25 commented 6 years ago

Sure. Will do :-)

drak25 commented 6 years ago

While further analizing this issue i found another possible problem. In StaticChannelPool you use LinkedList To store Channels.

This is not a thread safe collection. However, it is being modified by multiple threads. It is potentially possible that this collection is modified by several threads:

I believe that Pool variable should be private in StaticChannelPool and some form of synchronization should be implemented to avoid any possibility of race condition.

Another concern that I have, regarding scaling down : How it is ensured that currently used channel is not scaled down (removed from pool and disposed) ?

Also regarding scalling down in AutoScalingChannelPool, dont you think that the timer used to gracefully dispose the Channel me be garbage collected before it is run ?

            if (scaleDown && workPerChannel < _options.DesiredAverageWorkload)
            {
                _logger.Debug("The estimated workload is {averageWorkload} operations/channel, which is lower than the desired workload ({desiredAverageWorkload}). Creating channel.", workPerChannel, _options.DesiredAverageWorkload);
                var toRemove = Pool.FirstOrDefault();
                Pool.Remove(toRemove);
                Timer disposeTimer = null;
                disposeTimer = new Timer(o =>
                {
                    (o as IModel)?.Dispose();
                    disposeTimer?.Dispose();
                }, toRemove, _options.GracefulCloseInterval, new TimeSpan(-1));
            }
pardahlman commented 6 years ago

Nice work! I think it is a great idea to make the Pool private and expose methods to add and remove items to it πŸ‘Œ .

As to your question about whether or not the channel is in use: there is no way to be 100% sure about this (the RabbitMQ client/AMQP does not implement any method to check for this). That's way the GracefulCloseInterval is used; the channel is first removed from the pool and then at a later stage disposed. The channel pools are typically used for publishing messages, all consumer use their own, dedicated channels.

drak25 commented 6 years ago

Hi The synchronization of pool collection turned out more complicated than i anticcipated. The collection is used in the whole inheritance tree, and my first attempt to synchronize it decreased throughput for a bit. As there is no proof that the lack of synchronization is a source of any direct problem, i decided to create a pull request containing only the necessary fix of the Monitor.

Maybe i will be able to come back to the synchronization problem when i have some free time in the future.

Regards Piotr

drak25 commented 6 years ago

Hi again, I got another thing to discuss.

I created a producer app which at start tries to declare an exchange.

The method looks like this :

    private async Task EnsurePath(MessageAsync message)
    {
        while (true)
        {
            try
            {
                Console.WriteLine("Declaring exchange");
                await _client.DeclareExchangeAsync(new ExchangeDeclaration()
                {
                    Name = Names.BaseExchange,
                    ExchangeType = "direct",
                    Durable = true
                });
                await _client.DeclareQueueAsync(new QueueDeclaration() {Name = message.Routing, Durable = true});
                await _client.BindQueueAsync(message.Routing, Names.BaseExchange, message.Routing);
                _trackedTypes.Add(message.GetType());
                return;
            }
            catch (BrokerUnreachableException ex)
            {
                counter++;
                Console.WriteLine("Exception " + counter +  " " + ex.Message);
                await Task.Delay(TimeSpan.FromSeconds(1));
            }
        }
    }

Now when i try ti run this method while the Broker is down, it will correctly show exception an try egain in the endless loop.

However, when the broker is started again, it keeps giving same exception.

It turns out that the ChannelFactory is added to dependency injections as Singleton:

            .AddSingleton<IChannelFactory>(resolver =>
            {
                var channelFactory = new ChannelFactory(resolver.GetService<IConnectionFactory>(), resolver.GetService<RawRabbitConfiguration>());
                channelFactory
                    .ConnectAsync()
                    .ConfigureAwait(false)
                    .GetAwaiter()
                    .GetResult();
                return channelFactory;
            })

And if first run of ConnectAsync method gives exception, every following call do not call ConnectAsync method of ChannelFactory, but returns the same exception.

It seems to be a result of calling ConnectAsync in AddSingleton method.

Everything is working fine if i remove invocation of ConnectAsync from the object creation method.

What do you think about this issue ?

pardahlman commented 6 years ago

It is a design decision to throw an exception if none of the defined brokers are reachable when the client is wired up. It it useful when deploying a new version of the service that the deployment fails if something hinders the client to connect to the broker. This behavior can be overridden by either override the registration through the DependencyInjection property of RawRabbitOptions, or by using the Polly enricher for various retry policies.

I appreciate that you're looking through the code and critically examine the decisions made πŸ‘

drak25 commented 6 years ago

Throwing an exception is ok - this is not a problem. The problem is that if you throw it in Singleton resolver, if the exception is thrown once, at the first invocation, the connection attempt is never made again - the exception is being thrown from that moment on every call - regardless of Broker state.

pardahlman commented 6 years ago

Perhaps I'm missing something, and I'm not able to look into the code in depth at the moment... but here goes :bowtie:

I see what you are saying, but the exception is thrown when creating the instance (right?) - so why would you assume that the instance would be functional? This should work

do
{
    try
    {
        _client = RawRabbitFactory.CreateSingleton();
    }
    catch (BrokerUnreachableException e) { }
    await Task.Delay(TimeSpan.FromSeconds(1));
} while(_client == null)
drak25 commented 6 years ago

Please do it like that.

  1. Stop the broker.

Run the code :

        public async Task Test()
        {
            var client = RawRabbitFactory.CreateSingleton(new RawRabbitOptions()
            {
                ClientConfiguration = new ConfigurationFactory().GetConfiguration(),
                Plugins = p => p.UseAttributeRouting()
            });
            int counter = 1;
            while (true)
            {
                counter++;
                try
                {
                    await client.DeclareExchangeAsync(new ExchangeDeclaration()
                    {
                        Name = "xxx",
                        ExchangeType = "direct",
                        Durable = true
                    });
                    break;

                }
                catch (Exception e)
                {
                    Console.WriteLine("Exception " + counter);
                }

                await Task.Delay(500);
            }
            Console.WriteLine("Connection ok");
        }
  1. While seeing exceptions start the broker.

What should be the outcome ? The outcome is that exceptions do not stop. The method always throw exception even after broker is started.

pardahlman commented 6 years ago

No worries πŸ˜‰

I see what you mean... The 1.x version of the client would throw when creating the client in the first place. In 2.x, the connection is not initialized until it is actually used. I need to think about this scenario some more, but my initial thought is that an exception would be thrown if none of the brokers are available at the time of client creation.

drak25 commented 6 years ago

Let me make sure that i am being understood correctly.

I am not questioning the design decision. It is ok for me that i am able to create client and get the exception when i am actually trying to connect.

The problem is that when on the first call to client.DeclareExchangeAsnc i get an exception, the exception is being cached in dependency injection cache, and is being thrown on every following method call, regardless of the broker state. Seems strange, but it is what happens :-)

pardahlman commented 6 years ago

Here's what it looks like when the clients loses connection to the broker after @drak25 PR was merged. Thanks!

lost-connection-to-broker

I've also successfully shutdown the broker for a longer period of time and it recovers nicely.

pardahlman commented 6 years ago

@drak25 regarding your previous comment: I've now updated the dependency graph so that the channel factory is instantiated together with the bus client, making it throw BrokerUnreachable when trying to create the instance. I'm not 100% happy with the way I implemented it, but it gets the job done πŸ˜‰

The idea is that one or more brokers needs to be reachable when creating the client. It is up to the caller to catch exception and come up with an error handling (that is, if you don't use the Polly enricher)

pardahlman commented 6 years ago

Closing this issue, as a fix is available in RC4. Feel free to create a new issue if you still have similar problems! Thanks!