mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.18k stars 186 forks source link

Routing key for direct exchange queue seems to get ignored #633

Closed kimdre closed 6 days ago

kimdre commented 1 week ago

Hi, I'm not quite sure if I'm doing something wrong or this is a bug.

I have a publisher that publishes messages to a queue via an exchange to type direct with a specifc ID as the routing key. My consumer also uses this ID as the routing key to get messages of that queue. For some reason this routing key however gets ignored and instead of the consumer that uses this routing key, the message gets delivered to the next available consumer that consumes from this queue regardless of its routing key.

My consumer connects like this:

async def connect(self) -> None:
    """
    Establish connection with RabbitMQ

    :return: None
    """
    logger.info('Connecting Event Publisher to RabbitMQ...')
    try:
        self.connection = await connect_robust(rabbitmq.url)
        self.channel = await self.connection.channel(publisher_confirms=True)
        self.queue = await self.channel.declare_queue(name=self.queue_name, durable=True)
        self.exchange = await self.channel.declare_exchange(name=self.exchange_name, type=ExchangeType.DIRECT, durable=True)
        logger.debug('Successfully connected Event Publisher!')
    except AMQPConnectionError as e:
        logger.error("Connection to RabbitMQ failed for Event Publisher.")
        raise e

async def bind_queue(self, routing_key: str) -> None:
    await self.queue.bind(self.exchange, routing_key=routing_key)

This is how the publisher writes new messages to the queue

...
message = Message(
    body=json.dumps(message).encode(),
    delivery_mode=DeliveryMode.PERSISTENT
)

await self.exchange.publish(
    message,
    routing_key=str(self.studio_id)
)

And the consumer connects and consumes like this:

async def consume_messages(self):
    """
    Consume messages from RabbitMQ

    :return: None
    """

    self.channel = await self.connection.channel()
    await self.channel.set_qos(prefetch_count=self.prefetch_count)

    self.exchange = await self.channel.declare_exchange(self.exchange_name, ExchangeType.DIRECT, durable=True)

    self.queue = await self.channel.declare_queue(name=self.queue_name, durable=True)
    await self.queue.bind(self.exchange, routing_key=str(self.studio_id))

    logger.info(f"[{self.studio_id}] Consumer successfully started. Ready to consume and process messages.")

    try:
        await self.queue.consume(self.on_message)
    except (CancelledError, KeyboardInterrupt):
        logger.debug('Stopped consumer routine.')
        await self.queue.cancel(consumer_tag=str(self.studio_id))
        await self.channel.close()
        logger.debug("Canceled queue and stopped channel.")

By checking a message in the queue manually via the Admin UI, I can see that the messages have the correct routing key, however the consumer does not care about the routing key. So even if I change the routing key on the conumer to something different like "test" the message still gets consumes by it.

Am I doing something wrong here?

I use aio-pika v9.4.1 with python 3.12.4

Update

I just tested it with a Topic exchange instead of Direct exchange and get the same result.

mosquito commented 1 week ago

Try capturing traffic dump via tcpdump from the publisher and the consumer. Check if the routing key is present in the packet for sending. If it is, the library correctly sends information to the server. If not, attach your dump to this issue.

kimdre commented 1 week ago

Try capturing traffic dump via tcpdump from the publisher and the consumer. Check if the routing key is present in the packet for sending. If it is, the library correctly sends information to the server. If not, attach your dump to this issue.

This is my tcpdump from the publisher and consumer with direct exchange. The publisher routing key in this case was set to 1210003760, the consumer routing key was set to test. The exchange and queue are called events.

Publisher:

tcpdump: verbose output suppressed, use -v[v]... for full protocol decode
listening on eth0, link-type EN10MB (Ethernet), snapshot length 262144 bytes
21:53:29.247274 IP 172.23.0.7.41220 > 172.23.0.4.5672: Flags [P.], seq 504595676:504595948, ack 716923193, win 249, options [nop,nop,TS val 2314515233 ecr 2392701331], length 272
E..D.)@.@..Q...........(....*.a9....Yp.....
...!..........#.<.(...events
1210003760........5.<..........8....... 96be1fbcfcc8414183b455a82d1cb259........{"timestamp": 1719081076374, "uuid": "3fa85f64-5717-4562-b3fc-2c963f66afa6", "entity_id": 7893459, "type": "CONTRACT_UPDATED", "content": {"contractId": 12345}}.
21:53:29.247308 IP 172.23.0.4.5672 > 172.23.0.7.41220: Flags [.], ack 272, win 249, options [nop,nop,TS val 2392717859 ecr 2314515233], length 0
E..4..@.@............(..*.a9........X`.....
...#...!
21:53:29.271022 IP 172.23.0.4.5672 > 172.23.0.7.41220: Flags [P.], seq 1:22, ack 272, win 249, options [nop,nop,TS val 2392717883 ecr 2314515233], length 21
E..I..@.@..p.........(..*.a9........Xu.....
...;...!........<.P..........
21:53:29.271028 IP 172.23.0.7.41220 > 172.23.0.4.5672: Flags [.], ack 22, win 249, options [nop,nop,TS val 2314515257 ecr 2392717883], length 0
E..4.*@.@..`...........(....*.aN....X`.....
...9...;
^C
4 packets captured
4 packets received by filter
0 packets dropped by kernel

Consumer

tcpdump: verbose output suppressed, use -v[v]... for full protocol decode
listening on eth0, link-type EN10MB (Ethernet), snapshot length 262144 bytes
21:53:29.248778 IP 172.23.0.4.5672 > 172.23.0.6.40744: Flags [P.], seq 3647223052:3647223369, ack 1815325837, win 249, options [nop,nop,TS val 1664289984 ecr 3479127517], length 317
E..qU.@.@............(.(.d5.l3......Y......
c3..._E.......P.<.<&ctag1.2815812f188e46e38cd44b70d4a8dd30..........events
1210003760.......5.<..........8....... 96be1fbcfcc8414183b455a82d1cb259........{"timestamp": 1719081076374, "uuid": "3fa85f64-5717-4562-b3fc-2c963f66afa6", "entity_id": 7893459, "type": "CONTRACT_UPDATED", "content": {"contractId": 12345}}.
21:53:29.248798 IP 172.23.0.6.40744 > 172.23.0.4.5672: Flags [.], ack 317, win 249, options [nop,nop,TS val 3479148094 ecr 1664289984], length 0
E..4j.@.@.w..........(.(l3...d6I....X_.....
._.>c3..
21:53:29.425046 IP 172.23.0.6.40744 > 172.23.0.4.5672: Flags [P.], seq 1:22, ack 317, win 249, options [nop,nop,TS val 3479148270 ecr 1664289984], length 21
E..Ij.@.@.w..........(.(l3...d6I....Xt.....
._..c3..........<.Z..........
21:53:29.425066 IP 172.23.0.4.5672 > 172.23.0.6.40744: Flags [.], ack 22, win 249, options [nop,nop,TS val 1664290160 ecr 3479148270], length 0
E..4U.@.@............(.(.d6Il3......X_.....
c3.p._..
^C
4 packets captured
4 packets received by filter
0 packets dropped by kernel

As you can see, the consumer gets the message even though it has a different routing key.

kimdre commented 1 week ago

@mosquito Do you know a solution to this problem? I am willing to pay for help in this matter. 🙂

kimdre commented 6 days ago

Sorry I think I misunderstood the concept of routing keys. I thought they work in both directions but it seems they exist to publish messages to a specifc queue and not to also retrieve messages with a specifc routing key from a queue.