Antti / rust-amqp

AMQP client in pure rust. Corresponds to rabbitmq spec.
MIT License
247 stars 45 forks source link

Consumer stops every <30 consumed messages #37

Open kopiczko opened 8 years ago

kopiczko commented 8 years ago

I'm running this code on cloudamqp with configuration: Cluster: cheerful-squirrel (change) RabbitMQ 3.5.7, Erlang 18.2

The program is able to consume up to 30 messages and I see that the queue disappears in the RabbitMQ dashboard, which probably means the connection is lost. After that the consumer hangs. When I set auto_delete to false in queue_declare I observe that Ready messages count grows while consumer hangs.

Here's the code, it mostly follows the provided example:

    let mut session: Session = match Session::open_url(url) {
        Ok(s) => s,
        Err(e) => panic!("Session::open_url: {}", e),
    };
    let mut channel: Channel = match session.open_channel(1) {
        Ok(s) => s,
        Err(e) => panic!("Session.open_channel: {}", e),
    };

    channel.exchange_declare(exchange,
                          "fanout",
                          false, // passive
                          false, // durable
                          false, // auto_delete
                          false, // internal
                          false, // nowait - hangs when set to true
                          Table::new())
        .err()
        .map(|e| panic!("Channel.exchange_declare: {}", e));
    channel.queue_declare(queue,
                       false, // passive
                       false, // durable
                       true, // exclusive
                       true, // auto_delete
                       false, // nowait - hangs when set to true
                       Table::new())
        .err()
        .map(|e| panic!("Channel.queue_declare: {}", e));
    channel.queue_bind(queue,
                    exchange,
                    "",
                    false, // nowait - hangs when set to true
                    Table::new())
        .err()
        .map(|e| panic!("Channel.queue_bind: {}", e));

    let consumer = Consumer { cnt: 0 };
    let consumer_name = match channel.basic_consume(consumer,
                                                    queue,
                                                    "",
                                                    false, // no_local
                                                    false, // no_ack
                                                    true, // exlusive
                                                    false, // nowait - hangs when set to true
                                                    Table::new()) {
        Ok(s) => s,
        Err(e) => panic!("Channel.basic_consume: {}", e),
    };

    println!("{} Starting consumer: {}", date_str(), consumer_name);
    channel.start_consuming();

    channel.close(200, "Bye")
        .err()
        .map(|e| panic!("Channel.close: {}", e));
    session.close(200, "Good Bye");

Consumer:

struct Consumer {
    cnt: u64
}

impl amqp::Consumer for Consumer {
    fn handle_delivery(&mut self,
                       channel: &mut Channel,
                       deliver: protocol::basic::Deliver,
                       _: protocol::basic::BasicProperties,
                       body: Vec<u8>) {
        self.cnt += 1;
        let s = str::from_utf8(&body).unwrap();
        println!("{} Consumed #{}: {}", date_str(), self.cnt, s);
        channel.basic_ack(deliver.delivery_tag, false)
            .err()
            .map(|e| panic!("Consumer.handle_delivery basic_ack: {}", e));
    }
}
kopiczko commented 8 years ago

I should have mentioned this is HA cluster.

Antti commented 8 years ago

Can you run the consumer with the logger initialized: env_logger::init().unwrap(); and RUST_LOG=debug. This will give you some idea of what's going on.