Antti / rust-amqp

AMQP client in pure rust. Corresponds to rabbitmq spec.
MIT License

Silent failure to ack some messages #39

Open Keruspe opened 8 years ago

Keruspe commented 8 years ago

I need to handle several messages in parallel, so I couldn't use a consumer (see #38); I'm thus using the GetIterator approach. I have some code that looks like this:

        loop {
            for get_result in channel.basic_get(&queue, false) {
                /* acquire a semaphore */
                thread::spawn(move || {
                    /* handle the message */
                    /* ack the message */
                    /* release the semaphore */
                });
            }

            println!("No more messages, waiting for 60 seconds");
            thread::sleep(Duration::from_secs(60));
        }
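
For reference, the bounded get-then-thread pattern above can be sketched with the standard library alone. This is a minimal sketch, not the code from my project: `Semaphore` and `drain_with_workers` are hypothetical names, the `deliveries` vector stands in for whatever `basic_get` yields, and sending a tag on an `mpsc` channel stands in for the ack. It does not talk to a broker at all.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// A minimal counting semaphore built from a Mutex and a Condvar.
struct Semaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), available: Condvar::new() }
    }

    /// Block until a permit is free, then take it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Return a permit and wake one waiter.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Hypothetical stand-in for the get-then-thread loop. `deliveries` plays the
/// role of the fetched messages (identified by a tag) and sending the tag on
/// the channel plays the role of the ack. Returns the acked tags, sorted.
fn drain_with_workers(deliveries: Vec<u64>, max_workers: usize) -> Vec<u64> {
    let semaphore = Arc::new(Semaphore::new(max_workers));
    let (ack_tx, ack_rx) = mpsc::channel();
    let mut handles = Vec::new();

    for tag in deliveries {
        // The message is already fetched here; the fetching thread now
        // blocks until one of the workers releases its permit.
        semaphore.acquire();
        let semaphore = Arc::clone(&semaphore);
        let ack_tx = ack_tx.clone();
        handles.push(thread::spawn(move || {
            // Handle the message here, then "ack" it and free the slot.
            ack_tx.send(tag).unwrap();
            semaphore.release();
        }));
    }
    drop(ack_tx); // close the channel so the collection below terminates

    for handle in handles {
        handle.join().unwrap();
    }
    let mut acked: Vec<u64> = ack_rx.iter().collect();
    acked.sort();
    acked
}
```

Calling `drain_with_workers((1..=100).collect(), 5)` mirrors the setup described below: at most 5 messages are being handled at once, plus one that has been fetched and is waiting for a permit.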

My problem is quite simple, let's say that:

While there are messages in the queue, there are 6 unacked messages (the 5 in the active threads and one waiting for a free slot to start a new thread). When a thread finishes its job, its message is ack'ed properly, a new thread is started, and a new message is popped and waits for its ack.

The problem appears when there are no more messages in the queue:

Now, if some messages are published to the queue, we'll have 11 unacked instead of 6, as the 5 from earlier are never actually acked.

Keruspe commented 8 years ago

OK, I slightly reworked this to have one channel per thread. Now I have something that looks like this:

    loop {
        /* acquire a semaphore */

        let mut channel = session.open_channel(chan).expect("Failed to create channel");
        chan += 1;

        thread::spawn(move || {
            for get_result in channel.basic_get(&handler.conf.backup_queue, false) {
                /* handle message */
                /* ack message */
            }
            println!("No more messages, waiting for 60 seconds");
            thread::sleep(Duration::from_secs(60));
            /* release semaphore */
        });
    }

What I'm seeing now is just so awkward I cannot think of any reasonable explanation. Here is what the scenario I wrote above becomes:

While no new messages arrive in the queue, there are 6 unacked messages (the 5 in the active threads and one waiting for a free slot to start a new thread), so I see 94 ready and 6 unacked. Once all 100 have been processed, I still see 94 ready and 6 unacked in the RabbitMQ cluster (if I stop the software it goes back to 100 ready), but no message is consumed anymore, as all 100 have already been handled. Now, if 10 more messages arrive in the queue, once they have all been handled I get 94 ready and 16 unacked.
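
To see which deliveries are stuck, rather than only the broker-side counts, one option is a small client-side ledger of delivery tags. This is only a sketch: `AckLedger` and its methods are hypothetical names, not part of rust-amqp, and the ledger only records what the client believes it fetched and acked. Since AMQP delivery tags are scoped to a channel, the one-channel-per-thread version would need one ledger per channel.

```rust
use std::collections::BTreeSet;
use std::sync::Mutex;

/// Hypothetical ledger of delivery tags that were fetched but not yet acked.
struct AckLedger {
    outstanding: Mutex<BTreeSet<u64>>,
}

impl AckLedger {
    fn new() -> Self {
        AckLedger { outstanding: Mutex::new(BTreeSet::new()) }
    }

    /// Record a tag right after the message has been fetched.
    fn fetched(&self, tag: u64) {
        self.outstanding.lock().unwrap().insert(tag);
    }

    /// Record a tag right after the ack call returned.
    /// Returns false if the tag was not outstanding (never fetched, or acked twice).
    fn acked(&self, tag: u64) -> bool {
        self.outstanding.lock().unwrap().remove(&tag)
    }

    /// Tags the client fetched but never acked, in ascending order.
    fn unacked(&self) -> Vec<u64> {
        self.outstanding.lock().unwrap().iter().cloned().collect()
    }
}
```

If `unacked()` is empty on the client while the broker still reports unacked messages, the acks were issued by the client code but apparently did not take effect on the broker side, which would match the silent failure described here.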

I will try to provide a full code example for the two versions.

Keruspe commented 8 years ago

I just pushed a test project here https://github.com/Keruspe/rust-amqp-bugs

You need to export AMQP_URL, then run cargo run --bin publish-data to post some initial data. I did not really manage to reproduce anything with get-then-thread, but in production there are lots of other factors (for example, message handling can sometimes take up to several hours). You can run cargo run --bin thread-then-get though, and republish some data as above once there aren't any more messages. You'll see that some messages definitely don't get ack'ed. Sample output, after having published data several times and then waited for the process to finish dealing with it:

keruspe@Lou ~/Projects/rust-amqp-bugs (git)-[master] % cargo run --bin get-then-thread    
    Finished debug [unoptimized + debuginfo] target(s) in 0.0 secs                        
     Running `target/debug/get-then-thread`                                               
New message: test 98                                                                      
New message: test 99                                                                      
New message: test 3                                                                       
New message: test 4                                                                       
New message: test 95                                                                      
New message: test 96                                                                      
New message: test 97                                                                      
New message: test 98                                                                      
New message: test 99                                                                      
New message: test 3                                                                       
New message: test 4                                                                       
New message: test 5                                                                       
New message: test 10                                                                      
New message: test 95                                                                      
New message: test 96                                                                      
New message: test 97                                                                      
New message: test 98                                                                      
New message: test 99                                                                      
New message: test 3                                                                       
New message: test 4                                                                       
New message: test 5                                                                       
New message: test 10                                                                      
New message: test 95                                                                      
New message: test 96                                                                      
New message: test 97                                                                      
New message: test 98                                                                      
New message: test 99                                                                      
No more messages, waiting for 6 seconds                                                   
^C                                                                                        
keruspe@Lou ~/Projects/rust-amqp-bugs (git)-[master] % cargo run --bin get-then-thread    
    Finished debug [unoptimized + debuginfo] target(s) in 0.0 secs                        
     Running `target/debug/get-then-thread`                                               
New message: test 3                                                                       
New message: test 4                                                                       
New message: test 96                                                                      
New message: test 97                                                                      
New message: test 98                                                                      
New message: test 99                                                                      
No more messages, waiting for 6 seconds                                                   
^C                                                                                        
keruspe@Lou ~/Projects/rust-amqp-bugs (git)-[master] % cargo run --bin get-then-thread    
    Finished debug [unoptimized + debuginfo] target(s) in 0.0 secs                        
     Running `target/debug/get-then-thread`                                               
New message: test 96                                                                      
New message: test 97                                                                      
New message: test 98                                                                      
New message: test 99                                                                      
No more messages, waiting for 6 seconds                                                   
^C                                                                                        
keruspe@Lou ~/Projects/rust-amqp-bugs (git)-[master] % cargo run --bin get-then-thread    
    Finished debug [unoptimized + debuginfo] target(s) in 0.0 secs                        
     Running `target/debug/get-then-thread`                                               
New message: test 96                                                                      
New message: test 97                                                                      
New message: test 98                                                                      
New message: test 99                                                                      
No more messages, waiting for 6 seconds                                                   
^C                                                                                        
keruspe@Lou ~/Projects/rust-amqp-bugs (git)-[master] % cargo run --bin get-then-thread    
    Finished debug [unoptimized + debuginfo] target(s) in 0.0 secs                        
     Running `target/debug/get-then-thread`                                               
New message: test 96                                                                      
New message: test 97                                                                      
New message: test 98                                                                      
New message: test 99                                                                      
No more messages, waiting for 6 seconds                                                   
^C                                                                                        
keruspe@Lou ~/Projects/rust-amqp-bugs (git)-[master] % cargo run --bin get-then-thread    
    Finished debug [unoptimized + debuginfo] target(s) in 0.0 secs                        
     Running `target/debug/get-then-thread`                                               
New message: test 96                                                                      
New message: test 97                                                                      
New message: test 98                                                                      
New message: test 99                                                                      
No more messages, waiting for 6 seconds                                                   

You can see that some random messages don't get ack'ed, and the last 4 never get ack'ed.