rebus-org / Rebus.RabbitMq

:bus: RabbitMQ transport for Rebus
https://mookid.dk/category/rebus

Stop receiving messages #39

Closed yan-oreshchenkov closed 5 years ago

yan-oreshchenkov commented 5 years ago

Hi there! We've noticed some unreliable behavior in Rebus when network issues occur. Please advise.

  1. A micro-service on host#1 (.NET Core, Rebus 4.2.1) connects to RabbitMQ 3.6.9 on host#2.
  2. Everything works perfectly until a network issue appears. We know it is a network issue because all micro-services start reporting similar errors, and external monitoring reports that its agent is unreachable.
  3. The network issue usually lasts 5-15 minutes.
  4. Sometimes Rebus makes a few reconnect attempts and continues to work, but sometimes we see the following in the log:
    
      An error occurred when attempting to receive the next message: Rebus.Exceptions.RebusApplicationException: Queue throw EndOfStreamException(meaning it was canceled by rabbitmq) ---> System.IO.EndOfStreamException: SharedQueue closed
         at RabbitMQ.Util.SharedQueue`1.EnsureIsOpen()
         at RabbitMQ.Util.SharedQueue`1.Dequeue(Int32 millisecondsTimeout, T& result)
         at Rebus.RabbitMq.RabbitMqTransport.Receive(ITransactionContext context, CancellationToken cancellationToken)
         --- End of inner exception stack trace ---
         at Rebus.RabbitMq.RabbitMqTransport.Receive(ITransactionContext context, CancellationToken cancellationToken)
         at Rebus.Workers.ThreadPoolBased.ThreadPoolWorker.ReceiveTransportMessage(CancellationToken token, ITransactionContext context)

info: Rebus.RabbitMq.ConnectionManager[0] Existing connection found to be CLOSED

warn: Rebus.RabbitMq.ConnectionManager[0] Could not establish connection: None of the specified endpoints were reachable

warn: Rebus.RabbitMq.RabbitMqTransport[0] Could not initialize consumer: RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable ---> RabbitMQ.Client.Exceptions.ConnectFailureException: Connection failed ---> System.TimeoutException: The operation has timed out. at RabbitMQ.Client.Impl.TaskExtensions.TimeoutAfter(Task task, Int32 millisecondsTimeout) at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, Int32 timeout) --- End of inner exception stack trace --- at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector) at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IEndpointResolver endpoints) at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName) --- End of inner exception stack trace --- at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName) at Rebus.RabbitMq.ConnectionManager.GetConnection() at Rebus.RabbitMq.RabbitMqTransport.InitializeConsumer() at Rebus.RabbitMq.RabbitMqTransport.EnsureConsumerInitialized() - waiting 2 seconds

info: Rebus.RabbitMq.ConnectionManager[0] Existing connection found to be CLOSED

After that, the micro-service stops receiving queue messages and makes no further attempts to reconnect. No new log messages appear, but Docker reports that the service is up and running. If we restart the service, it works again.
Another strange thing is that we have a set of quite similar micro-services on host#1, and most of them reconnect gracefully and continue to work. It looks like an intermittent bug.

To be clear, the expected behavior is one of the following:
- reconnect
- throw an unhandled exception to initiate the app shutdown

The queue configuration (the Autofac extension is used):
```csharp
builder.RegisterRebus(c => c
    .Logging(l => l.Use(new LoggerFactoryAdapter(loggerFactory)))
    .Serialization(s => s.UseNewtonsoftJson(new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }))
    .Transport(t => t.UseRabbitMq(settings.ConnectionString, settings.Name)
        .ExchangeNames(settings.DirectExchange, settings.TopicsExchange)
        .Prefetch(settings.Prefetch) // 50
        .Ssl(new SslSettings(true, settings.Ssl.ServerName, acceptablePolicyErrors: SslPolicyErrors.RemoteCertificateChainErrors)))
    .Options(e =>
    {
        e.SetNumberOfWorkers(settings.NumberOfWorkers); // 1 in most services
        e.SetMaxParallelism(settings.MaxParallelism);   // 1 in most services
    }));
```

Could you please advise what we can try?

yan-oreshchenkov commented 5 years ago

If I shut down the Docker container, the service writes the following to the log:

Application is shutting down...
info: Rebus.Bus.RebusBus[0]
      Setting number of workers to 0

which means the process is alive, but the Rebus listener somehow is not.

mookid8000 commented 5 years ago

Which version of the RabbitMQ driver does your application use?

Rebus uses the RabbitMQ driver with automatic recovery enabled, so in theory the driver should recover on its own, when the connection has been lost.

If that happens in some of your endpoints, is that because they're using a newer version of driver, maybe?

yan-oreshchenkov commented 5 years ago

If I understand correctly, it's 5.0.1: screenshot

All micro-services have the same codebase, so the driver is certainly the same.

mookid8000 commented 5 years ago

yeah, that's 5.0.1 😄 can you figure out which version your other endpoints are using?

yan-oreshchenkov commented 5 years ago

It's the same everywhere: 5.0.1, I've checked twice. I have explicitly upgraded it to 5.1.0, in case it's a bug in the driver. I'll let you know the results, but it will take some time to wait for the next network 'storm'.

mookid8000 commented 5 years ago

Great, thanks 🙂 I hope it's a bug in the driver 🤞

yan-oreshchenkov commented 5 years ago

Nothing has happened yet :) But while waiting for a network storm, we have added a simple health check endpoint to our services (based on the mechanism built into .NET Core 2.2). It's very easy to extend with custom logic, and it occurred to me that it would be great if we could check the queue connectivity somehow as part of the health check procedure. Could you advise a way to do it? Having something like a "current connection status for the queue" would be a great option.

mookid8000 commented 5 years ago

Rebus provides a customizer callback in the RabbitMQ configuration extensions, with which you get to modify (or completely replace!) the IConnectionFactory used by the transport.

It's used like this:

```csharp
Configure.With(...)
    .Transport(t => t.UseRabbitMq(..., customizer: connectionFactory => {
        // maybe do something in here?

        return connectionFactory;
    }))
    .(...)
    .Start();
```

Maybe you can use that to hook something up that checks the connectivity...?
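
As an illustration of the connectivity check discussed here, the following is a minimal sketch of an ASP.NET Core health check that probes the broker. It is not part of Rebus: the class name `RabbitMqProbeHealthCheck` and the messages are hypothetical, and it assumes an `IConnectionFactory` is available to it (for example, one configured with the same connection string and SSL settings as the transport). It opens its own short-lived connection, so it only shows whether RabbitMQ is reachable from the service; it does not inspect the connection or consumer that Rebus manages internally.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using RabbitMQ.Client;

// Hypothetical health check: the class name and messages are illustrative.
// It probes broker reachability with its own short-lived connection and
// does not look at the connection that Rebus manages internally.
public sealed class RabbitMqProbeHealthCheck : IHealthCheck
{
    private readonly IConnectionFactory _connectionFactory;

    public RabbitMqProbeHealthCheck(IConnectionFactory connectionFactory)
    {
        _connectionFactory = connectionFactory
            ?? throw new ArgumentNullException(nameof(connectionFactory));
    }

    public Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        // CreateConnection() blocks, so run it on a thread pool thread.
        return Task.Run(() =>
        {
            try
            {
                // Open a probe connection and dispose it right away.
                using (var connection = _connectionFactory.CreateConnection())
                {
                    return connection.IsOpen
                        ? HealthCheckResult.Healthy("RabbitMQ is reachable")
                        : HealthCheckResult.Unhealthy("RabbitMQ connection is not open");
                }
            }
            catch (Exception exception)
            {
                // Typically a BrokerUnreachableException when the broker is down.
                return HealthCheckResult.Unhealthy("RabbitMQ is unreachable", exception);
            }
        }, cancellationToken);
    }
}
```

Assuming an `IConnectionFactory` is registered in the container, the check could be registered with `services.AddHealthChecks().AddCheck<RabbitMqProbeHealthCheck>("rabbitmq")`. A healthy probe does not prove that the Rebus consumer is still receiving messages, so it would complement rather than replace a fix for the stalled listener.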

mookid8000 commented 5 years ago

Btw, I just realized that Rebus' RabbitMQ transport underwent quite a few changes recently... it's been out as a prerelease for a while now, but I've just released Rebus.RabbitMq 5.0.0, so you should probably use that.