retupmoca / P6-Net-AMQP

Consumed incorrect amount of times and from wrong queue #23

Open bbkr opened 4 months ago

bbkr commented 4 months ago

I have the following code consuming from 2 queues:

use Net::AMQP;

my $handle = Net::AMQP.new( ... );

my $connection = $handle.connect( ).result;
my $channel = $handle.open-channel( 1 ).result;

for 'q1', 'q2' -> $queue-name {

    my $queue = $channel.queue( $queue-name ).result;

    $queue.message-supply.tap(
        {
            say 'message ' ~ .body.decode ~ ' in queue ' ~ .queue;
        }
    );

    $queue.consume();
}

await $connection;

When I run it and publish the message foo just once to q1, it prints:

message foo in queue q1
message foo in queue q2

Why is that? I don't understand why a second message was received, nor why the message object points to the wrong queue object.

bbkr commented 4 months ago

I get the same wrong result with the reactive approach:

use Net::AMQP;

my $handle = Net::AMQP.new( ... );

react {
    whenever $handle.connect( ) -> $connection {
        whenever $handle.open-channel( 1 ) -> $channel {
            for 'q1', 'q2' -> $queue-name {
                whenever $channel.queue( $queue-name ) -> $queue {
                    whenever $queue.message-supply -> $message {
                        say 'message ' ~ $message.body.decode ~ ' in queue ' ~ $message.queue;
                    }
                    $queue.consume();
                }
            }
        }
    }
}

bbkr commented 4 months ago

And it gets weirder. I declared more queues: 'q1', 'q2', 'q3', 'q4', 'q5'. A message published to q1 is received twice (as shown above). But a message published to q3 is delivered once and shown as arriving from... q5.

bbkr commented 4 months ago

I checked the source code and I'm not sure whether my $delivery-lock = Lock.new; in Queue::message-supply is correct. It looks like this should be a variable provided by the Channel, because in its current form each Queue has its own lock, which does not protect the order in which the $!header / $!body supplies are tapped.

My conclusion may not be correct; I haven't debugged it deeply. But the fact that each Queue creates its own local Lock, and only during Supplier construction, feels wrong.

jonathanstowe commented 4 months ago

Really you want to use declare-queue rather than just queue, which does work as expected. However, it appears that there is a race condition in consume whereby the queues wind up with the same consumer-tag; you can demonstrate this by putting a sleep 1 before the .consume. The problem arises because the consume-ok method from RabbitMQ doesn't specify the queue name.

I'll find a way of fixing this.

jonathanstowe commented 4 months ago

The part that I overlooked completely was that .consume returns a Promise (like pretty much all of the methods), and really the code should await on that, which does indeed solve the problem.

I'll still make a fix, because mostly people don't need the result of the .consume (it's the consumer-tag), and the behaviour is rather surprising.

bbkr commented 4 months ago

Indeed, await $queue.consume did the trick. Thank you for the quick response (and for the Raku module itself).
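
For anyone landing here later, this is a minimal sketch of the first example with that one change applied. It assumes nothing else in the script needs to change; the constructor arguments are still elided as in the original report, and the $consumer-tag variable name is only illustrative. It needs a reachable broker to run, so treat it as an outline rather than a tested program.

```raku
use Net::AMQP;

my $handle = Net::AMQP.new( ... );

my $connection = $handle.connect( ).result;
my $channel = $handle.open-channel( 1 ).result;

for 'q1', 'q2' -> $queue-name {

    my $queue = $channel.queue( $queue-name ).result;

    $queue.message-supply.tap(
        {
            say 'message ' ~ .body.decode ~ ' in queue ' ~ .queue;
        }
    );

    # .consume returns a Promise. Awaiting it here means the consume request
    # for the next queue is only issued after this one has been confirmed,
    # so two queues should no longer end up sharing a consumer-tag.
    # Per the discussion above, the Promise's result is the consumer-tag.
    my $consumer-tag = await $queue.consume();
}

await $connection;
```

The same idea should apply to the react version: wait for the Promise returned by .consume instead of discarding it.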