jhalterman / lyra

High availability RabbitMQ client
Apache License 2.0

Connection recovery hangs after EOFException is thrown #53

Closed karlney closed 6 years ago

karlney commented 8 years ago

Lyra version 0.5.2 (also tested with 0.5.3-SNAPSHOT, same result), amqp-client 3.5.5, broker version 3.5.6.

We have an automated test suite for a Java RabbitMQ library built on top of Lyra and amqp-client. We use Docker, and the JUnit tests start and stop a real RabbitMQ broker on the same machine that runs the tests. One of the tests simulates a broker crash by running docker kill on the broker container while we are consuming from a queue on that broker.

We use persistent messages, so when the broker starts up again after a few seconds the messages are still in the queue, but it seems that Lyra does not reconnect to the broker in all cases.

On my colleague's machine, which is a Mac, Lyra correctly reconnects when the broker starts up again. On my Linux machine, only one reconnection attempt is made and then everything freezes.

The difference we can see is that on the Mac we get a socket 'connection refused' exception, while on Linux a java.io.EOFException is thrown.
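To illustrate how the same kind of outage can surface as two different exceptions, here is a minimal sketch using plain JDK sockets. It is not the test suite described above and makes no claim about what Docker does on either platform; the class and method names are hypothetical. It only shows that connecting to a port with no listener fails with a ConnectException, while reading from a connection that the peer accepted and then closed fails with an EOFException from DataInputStream.readUnsignedByte, the same call that appears in the stack traces in this issue.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketFailureModes {

    /** Connects to a port where nothing listens; returns the exception class name. */
    static String connectToClosedPort() throws IOException {
        int port;
        // Bind an ephemeral port, then close it so that nothing is listening there.
        try (ServerSocket probe = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            port = probe.getLocalPort();
        }
        try (Socket s = new Socket(InetAddress.getLoopbackAddress(), port)) {
            return "connected";
        } catch (ConnectException e) {
            return e.getClass().getName();
        }
    }

    /** Connects to a listener that closes the connection right away, then reads one byte. */
    static String readFromClosedPeer() throws IOException {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort())) {
            // Accept and immediately close the server side of the connection.
            server.accept().close();
            DataInputStream in = new DataInputStream(client.getInputStream());
            try {
                in.readUnsignedByte();
                return "read a byte";
            } catch (EOFException e) {
                return e.getClass().getName();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("no listener: " + connectToClosedPort());
        System.out.println("peer closed: " + readFromClosedPeer());
    }
}
```

If something in front of the broker still accepts TCP connections while the broker itself is down, a client could plausibly see the second failure mode rather than the first, but that is an assumption and not something verified in this thread.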

This is a snippet from the logs in the test suite we have:

19:58:08.322 [main] INFO  c.m.d.test.RabbitConnectionTests - Killing the rabbitMQ broker
19:58:08.375 [AMQP Connection 127.0.0.1:5672] ERROR n.jodah.lyra.internal.ChannelHandler - Channel channel-1 on test-app-consume was closed unexpectedly
19:58:08.377 [AMQP Connection 127.0.0.1:5672] ERROR n.jodah.lyra.internal.ChannelHandler - Channel channel-1 on test-app-publish was closed unexpectedly
19:58:08.379 [AMQP Connection 127.0.0.1:5672] ERROR n.j.lyra.internal.ConnectionHandler - Connection test-app-publish was closed unexpectedly
19:58:08.385 [AMQP Connection 127.0.0.1:5672] ERROR n.j.lyra.internal.ConnectionHandler - Connection test-app-consume was closed unexpectedly
19:58:08.385 [lyra-recovery-1] INFO  n.j.lyra.internal.ConnectionHandler - Recovering connection test-app-publish to [localhost:5672]
19:58:08.387 [lyra-recovery-2] INFO  n.j.lyra.internal.ConnectionHandler - Recovering connection test-app-consume to [localhost:5672]
19:58:08.388 [rabbitmq-test-app-consume-consumer] ERROR c.m.d.c.impl.SingleChannelConsumer - The rabbit connection was unexpectedly disconnected. [ localPort=5672, queue="test-queue", consumerTag="test-app-consumer-1" ]
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:723) ~[amqp-client-3.5.5.jar:na]
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:713) ~[amqp-client-3.5.5.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:567) ~[amqp-client-3.5.5.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by: java.io.EOFException: null
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[na:1.8.0_60]
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.5.5.jar:na]
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.5.5.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:536) ~[amqp-client-3.5.5.jar:na]
    ... 1 common frames omitted
karlney commented 8 years ago

At line 99 in ConnectionHandler, the body of this if statement is never run because its condition evaluates to false:

                // Only fail on non-closures since closures will trigger a new recovery
                if (!Exceptions.isCausedByConnectionClosure(e)) {
                   ...
                }

If I print the stack trace just before that if statement I have this:

java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:350)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:648)
    at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:244)
    at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:237)
    at net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:51)
    at net.jodah.lyra.internal.ConnectionHandler.createConnection(ConnectionHandler.java:237)
    at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:275)
    at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:42)
    at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:96)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:37)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:367)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:293)
    ... 11 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:536)
    ... 1 more

The ShutdownSignalException 'in the middle' has _hardError=true, _initiatedByApplication=false and _reason=null.

At this point I unfortunately have no clue how to dig deeper, but if we can't trust this library in all error cases we might have to roll our own, which would be a shame :-(

You might want to consider adding a little more logging in this scenario also.

I also looked at https://github.com/jhalterman/lyra/issues/52 but I don't know if this is caused by the same thing or not so I created a new issue just in case.

This is also 100% reproducible on my machine, while it always seems to work on the Mac. If you want, we can help you set up the JUnit test environment so you can try to reproduce it yourself.

jhalterman commented 8 years ago

Hi @karlney Interesting difference between platforms. One of the problems I've faced is deciding which errors should be considered recoverable and which shouldn't, since the exceptions you'll see for the same failure can vary by platform. If you have a reproducer for this that you could share, that would be great.

In the meantime, consider that you can get and modify the set of exceptions that Lyra will attempt to recover from which should resolve this situation for you:

http://jodah.net/lyra/javadoc/net/jodah/lyra/config/Config.html#getRecoverableExceptions--

As for what the appropriate solution should be... I'm not sure. Basically, we could add EOFException as one of the default exceptions to recover from. I'm just not sure how appropriate that is given the odd nature of this failure. Thoughts?

kdekooter commented 7 years ago

@jhalterman your remark about getRecoverableExceptions saved the day for me. I noticed I had to add all of the exceptions in the stacktrace to make it work: IOException, ShutdownSignalException and EOFException.
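As a rough aid for finding which classes to register, the following sketch lists every exception class in a cause chain, outermost first. It is an illustration only and is not how Lyra decides what is recoverable; the class name CauseChain, the helper classesInChain, and the stand-in FakeShutdownSignalException (used so the sketch runs without amqp-client on the classpath) are all hypothetical.

```java
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class CauseChain {

    /** Returns the class of the throwable and of each of its causes, outermost first. */
    static List<Class<? extends Throwable>> classesInChain(Throwable t) {
        List<Class<? extends Throwable>> classes = new ArrayList<>();
        // Stop on null, or after a bounded number of steps in case the chain is cyclic.
        for (int depth = 0; t != null && depth < 32; depth++, t = t.getCause()) {
            classes.add(t.getClass());
        }
        return classes;
    }

    /** Stand-in for com.rabbitmq.client.ShutdownSignalException (hypothetical, for this sketch only). */
    static class FakeShutdownSignalException extends RuntimeException {
        FakeShutdownSignalException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    public static void main(String[] args) {
        // Rebuild the shape of the chain from the stack trace in this issue.
        Throwable failure = new IOException(
            new FakeShutdownSignalException("connection error", new EOFException()));
        for (Class<? extends Throwable> c : classesInChain(failure)) {
            System.out.println(c.getName());
        }
    }
}
```

In a real application, the classes found this way are the candidates to pass to config.getRecoverableExceptions().add(...), as described in the comment above.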

tdroenner commented 7 years ago

I can reproduce this. Use docker-compose to start a RabbitMQ server with this docker-compose.yml:

version: '3.1'

networks:
  default:
    driver: "bridge"

services:
  # Rabbitmq
  rabbitmqlocal:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
    networks:
      default:
        aliases:
          - rabbitmqlocal

Lyra config:

Config config = new Config()
        .withRecoveryPolicy(RecoveryPolicies.recoverAlways())
        .withRetryPolicy(RetryPolicies.retryAlways())
        .withExchangeRecovery(true)
        .withQueueRecovery(true)
        .withConsumerRecovery(true);

// Register every exception class seen in the recovery stack trace as recoverable.
config.getRecoverableExceptions().add(IOException.class);
config.getRecoverableExceptions().add(ShutdownSignalException.class);
config.getRecoverableExceptions().add(EOFException.class);
config.getRecoverableExceptions().add(NoRouteToHostException.class);

// Log every connection shutdown.
ShutdownListener listener = new ShutdownListener() {
    @Override
    public void shutdownCompleted(ShutdownSignalException sse) {
        logger.info("Shutdown: " + sse.getMessage() + " ---- \n" + sse.toString());
    }
};

ConnectionOptions options = new ConnectionOptions().withHost("localhost");
Connection connection = Connections.create(options, config);
connection.addShutdownListener(listener);
//Connection connection = factory.newConnection();

Channel channel1 = connection.createChannel();

channel1.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer1 = new DefaultConsumer(channel1) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
            throws IOException {
        String message = new String(body, "UTF-8");
        String routingKey = envelope.getRoutingKey();
        String contentType = properties.getContentType();
        long deliveryTag = envelope.getDeliveryTag();

        // (process the message components here ...)
        logger.info(" [x] Received '" + message
                + "' routingKey: " + routingKey
                + " --- contentType: " + contentType
                + " ---- " + envelope.toString()
                + " ---- " + properties.toString());
        channel1.basicAck(deliveryTag, false);
    }
};

Then run docker restart on the RabbitMQ container. This produces the error:

10998 [lyra-recovery-1] INFO  net.jodah.lyra.internal.ConnectionHandler  - Recovering connection cxn-1 to [localhost:5672]
21004 [AMQP Connection 127.0.0.1:5672] ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler  - Caught an exception during connection recovery!
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:116)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:112)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:360)
    at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:62)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConnection(AutorecoveringConnection.java:531)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:494)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:53)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.recoveryCanBegin(AutorecoveringConnection.java:435)
    at com.rabbitmq.client.impl.AMQConnection.notifyRecoveryCanBeginListeners(AMQConnection.java:700)
    at com.rabbitmq.client.impl.AMQConnection.doFinalShutdown(AMQConnection.java:694)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:398)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:304)
    ... 9 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:578)
    ... 1 more

Platform: Docker for Windows on Windows 10. Docker is running on a Hyper-V Linux instance. Lyra 0.5.4, RabbitMQ Java client 4.1.0.

michaelklishin commented 6 years ago

Answered in https://github.com/jhalterman/lyra/issues/53#issuecomment-278593159. Lyra has a configurable list of exceptions to try. If there's interest in revisiting the default list, please file a separate issue (or submit a PR).