jhalterman / lyra

High availability RabbitMQ client
Apache License 2.0

Consumers never recover if withRequestedHeartbeat() is used #44

Closed Gyllsdorff closed 7 years ago

Gyllsdorff commented 9 years ago

If we set a requested heartbeat greater than 0 and the connection goes down, Lyra only tries to recover for the duration of the heartbeat interval.

In the example below, handleShutdownSignal is called 10 seconds after the network goes down. Once that has happened, Lyra seems to stop trying to recover.

If the network comes back up after 7 seconds, the consumer recovers without any problem. If we remove the heartbeat, the consumer is still able to recover after 30 seconds.

Config config = new Config()
        .withConnectionRecoveryPolicy(RecoveryPolicies.recoverAlways())
        .withConsumerRecovery(true)
        .withChannelRetryPolicy(RetryPolicies.retryAlways())
        .withRecoveryPolicy(RecoveryPolicies.recoverAlways());

final ConnectionOptions connectionOptions = new ConnectionOptions()
        .withUsername(username)
        .withPassword(password)
        .withHost(url)
        .withRequestedHeartbeat(Duration.seconds(10))
        .withVirtualHost(virtualHost);

try {
    final ConfigurableConnection connection = Connections.create(connectionOptions, config);
    final Channel channel = connection.createChannel();
    channel.basicConsume("foo", new DefaultConsumer(channel) {
        @Override
        public void handleCancel(final String consumerTag) throws IOException {
            super.handleCancel(consumerTag);
            log.info("handleCancel");
        }

        @Override
        public void handleCancelOk(final String consumerTag) {
            super.handleCancelOk(consumerTag);
            log.info("handleCancelOk");
        }

        @Override
        public void handleConsumeOk(final String consumerTag) {
            super.handleConsumeOk(consumerTag);
            log.info("handleConsumeOk");
        }

        @Override
        public void handleDelivery(final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties, final byte[] body) throws IOException {
            final String message = new String(body);
            onCloudMessageHandler.onCloudMessage(message);
            log.info("Got a message");
        }

        @Override
        public void handleRecoverOk(final String consumerTag) {
            super.handleRecoverOk(consumerTag);
            log.info("handleRecoverOk");
        }

        @Override
        public void handleShutdownSignal(final String consumerTag, final ShutdownSignalException sig) {
            super.handleShutdownSignal(consumerTag, sig);
            log.info("handleShutdownSignal");
        }
    });
} catch (IOException e) {
    e.printStackTrace();
}

jhalterman commented 9 years ago

I have not been able to reproduce this so far using a fresh RabbitMQ 3.4.1 container. Do you have any more details about how you're reproducing this?
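
One way to gather the details requested here would be to timestamp each consumer callback relative to the moment the network is cut, so the 7-second, 10-second and 30-second observations can be reported precisely. The following is only a minimal sketch of such a helper. `EventTimeline` and its methods are hypothetical names introduced for illustration and are not part of Lyra or the RabbitMQ Java client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/**
 * Hypothetical diagnostic helper (not part of Lyra or the RabbitMQ client).
 * Records named events together with the time elapsed since the timeline
 * was created.
 */
final class EventTimeline {
    private final LongSupplier clockMillis;
    private final long startMillis;
    private final List<String> entries = new ArrayList<>();

    /** Uses a monotonic clock so wall-clock adjustments do not skew results. */
    EventTimeline() {
        this(() -> System.nanoTime() / 1_000_000L);
    }

    /** Accepts a custom millisecond clock, which is mainly useful for testing. */
    EventTimeline(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
        this.startMillis = clockMillis.getAsLong();
    }

    /**
     * Records an event and returns the formatted entry, e.g. "+10000ms name".
     * Synchronized because consumer callbacks run on client threads.
     */
    synchronized String mark(String event) {
        long elapsed = clockMillis.getAsLong() - startMillis;
        String entry = "+" + elapsed + "ms " + event;
        entries.add(entry);
        return entry;
    }

    /** Returns a copy of all recorded entries in insertion order. */
    synchronized List<String> entries() {
        return new ArrayList<>(entries);
    }
}
```

Assuming a timeline is created at the moment the network is disconnected, each callback in the reproduction code could log `timeline.mark("handleShutdownSignal")` (and likewise for the other callbacks) instead of a bare message. The resulting entries would show how long after the outage each callback fired, and whether any callback fires at all once the network returns.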

michaelklishin commented 7 years ago

Not enough information and @jhalterman could not reproduce => closing.