minghuaw / fe2o3-amqp

A Rust implementation of the AMQP 1.0 protocol based on serde and tokio.
MIT License

How to exchange AMQP 1.0 messages using fe2o3-amqp and RabbitMQ with Topic Exchange #195

Closed VLorz closed 1 year ago

VLorz commented 1 year ago

I'm trying to use the fe2o3-amqp crate (Rust) for receiving and sending messages through a RabbitMQ broker configured with the AMQP 1.0 protocol.

How do I configure the Connection, Session and Sender/Receiver for publishing/subscribing to a topic exchange?

This is what I've done so far. For topic exchange in RabbitMQ, I've configured an Exchange (named MyExchange, Durable, Type: Topic), a Queue (named MyQueue, type: Classic, Durable), and a Binding from MyExchange to MyQueue with routing key my.topic.
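For reference, a topic exchange compares each message's routing key against the binding key (here the literal my.topic). The sketch below illustrates the usual topic-matching rules as I understand them: words are separated by dots, `*` stands for exactly one word, and `#` stands for zero or more words. The function names `topic_matches` and `match_words` are made up for this illustration and are not part of RabbitMQ or fe2o3-amqp.

```rust
/// Illustrative only: returns true if `routing_key` would match `binding_key`
/// under the usual topic-exchange rules ('.'-separated words, '*' = exactly
/// one word, '#' = zero or more words).
fn topic_matches(binding_key: &str, routing_key: &str) -> bool {
    let pattern: Vec<&str> = binding_key.split('.').collect();
    let words: Vec<&str> = routing_key.split('.').collect();
    match_words(&pattern, &words)
}

/// Hypothetical helper: recursive word-by-word comparison.
fn match_words(pattern: &[&str], words: &[&str]) -> bool {
    match pattern.split_first() {
        // Pattern exhausted: match only if no words are left over.
        None => words.is_empty(),
        Some((first, rest)) => match *first {
            // '#' may absorb zero or more words, so try every split point.
            "#" => (0..=words.len()).any(|i| match_words(rest, &words[i..])),
            // '*' must consume exactly one word.
            "*" => !words.is_empty() && match_words(rest, &words[1..]),
            // A literal word must be equal to the next routing-key word.
            literal => words.first() == Some(&literal) && match_words(rest, &words[1..]),
        },
    }
}
```

With the binding described above, `topic_matches("my.topic", "my.topic")` holds, while a binding key such as `my.*` would also accept `my.other`.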

Using a Python script with pika, on Python 3.9.2, I can send messages to my.topic and see them enqueued in MyQueue. I'm not sure whether pika is using AMQP version 1.0 here, but at least it confirms that the exchange-routing-queue setup is working. The relevant part in Python is as simple as the following; the rest is just glue (simply creating the channel and connection):

channel.basic_publish(exchange='MyExchange', routing_key='my.topic', body='Hello topic!')

This is what most examples in the documentation do, except that the examples don't use Sender::builder():

    let mut connection = create_connection(
        "a-sender",
        "blah-blah",
    ).await?;

    // `session` must be mutable because the link is attached via `&mut session`.
    let mut session = Session::begin(&mut connection).await?;

    let sender = Sender::builder()
        .name("rust-sender-link-1")
        .target("some-queue-name")
        .attach(&mut session)
        .await
        .unwrap();

If I use "my.topic" in place of the queue name, attaching fails with a SenderAttachError. Any other queue name just creates a new queue.

So, how do I specify the exchange and route?

BR.

P.S. I've posted this same question on Stack Overflow: https://stackoverflow.com/questions/75430056/how-to-exchange-amqp-1-0-messages-using-fe2o3-amqp-and-rabbitmq-with-topic-excha

minghuaw commented 1 year ago

@VLorz I think RabbitMQ's AMQP 1.0 plugin uses a routing and addressing scheme that is slightly different from the 0.9.1 version (see https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbitmq_amqp1_0/README.md).

There is another relevant question on Stack Overflow that uses a C# client (https://stackoverflow.com/questions/69800475/unable-to-get-topic-exchange-to-work-in-rabbitmq-amqp-1-0). Below is a copy of the accepted answer:

I think you mix two ways of doing it. Either you publish to address "/topic/test", where test is your routing key, OR you publish to "/exchange/test.exchange" and set the Subject property to "test".

Both work. If you use the "/topic/" prefix in your address, you are going through the default "amq.topic" exchange and not your own "test-exchange".

Made sense? More information in the "Routing and Addressing" section here: https://github.com/rabbitmq/rabbitmq-amqp1.0

So I think you should try using "/topic/test" as your target address, where "test" is your routing key (i.e. .target("/topic/test") for the sender and .source("/topic/test") for the receiver).
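As a rough sketch of the two address forms described in the quoted answer, the helpers below only build the address strings. They are hypothetical (the names `topic_address` and `exchange_address` are invented here and are not part of fe2o3-amqp or RabbitMQ), and they assume the plugin interprets the prefixes exactly as the answer describes.

```rust
/// Hypothetical helper: address that goes through the default "amq.topic"
/// exchange, with `routing_key` embedded in the address itself.
fn topic_address(routing_key: &str) -> String {
    format!("/topic/{}", routing_key)
}

/// Hypothetical helper: address that names a specific exchange. Per the
/// quoted answer, the routing key is then taken from the message's
/// Subject property rather than from the address.
fn exchange_address(exchange: &str) -> String {
    format!("/exchange/{}", exchange)
}
```

The resulting string is what would be passed as the target address for the sender or the source address for the receiver, for example `topic_address("test")` in place of the literal "/topic/test".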

minghuaw commented 1 year ago

There are actually more Rust implementations of the AMQP 0.9.1 protocol than of the 1.0 protocol. Just to name two:

Because the 1.0 protocol is actually very different from 0.9.1, I think you may get better support for RabbitMQ if you use one of those AMQP 0.9.1 clients.

VLorz commented 1 year ago

@minghuaw I'm aware that the 0.9.1 and 1.0 protocol versions are more like two different protocols than two different versions, but I'm still a newbie with these protocol families. The final application protocol must be version 1.0. RabbitMQ just seemed convenient for development during the early stages. I'll try what you suggested above and give you feedback on how it goes. If you can recommend any other broker compliant with protocol version 1.0 that I can use for development, I'd very much appreciate it. BR

minghuaw commented 1 year ago

@VLorz There's a curated list of AMQP 1.0 related resources (including AMQP 1.0 compliant brokers) maintained by someone working at Microsoft (https://github.com/xinchen10/awesome-amqp).

I think there are Docker images for ActiveMQ Classic and ActiveMQ Artemis ("docker.io/vromero/activemq-artemis") that you can use to quickly spin up an AMQP 1.0 compliant broker.

minghuaw commented 1 year ago

@VLorz I have just tested the routing and addressing scheme mentioned above on my local machine and it seems to work. I have tested the default topic exchange (i.e. "amq.topic") and two other custom topic exchanges ("test.topic" and "test"), and the following addresses worked:

    // "amq.topic"
    let sender = Sender::attach(&mut session, "amq_topic_sender", "/topic/amq").await.unwrap();
    let receiver = Receiver::attach(&mut session, "amq_topic_receiver", "/topic/amq").await.unwrap();

    // "test.topic"
    let sender = Sender::attach(&mut session, "amq_topic_sender", "/topic/test").await.unwrap();
    let receiver = Receiver::attach(&mut session, "amq_topic_receiver", "/topic/test").await.unwrap();

    // "test2"
    let sender = Sender::attach(&mut session, "amq_topic_sender", "/topic/test2").await.unwrap();
    let receiver = Receiver::attach(&mut session, "amq_topic_receiver", "/topic/test2").await.unwrap();

VLorz commented 1 year ago

@minghuaw, thanks a lot for your time and comments. It worked following your tips. One configuration step I was missing was setting user permissions, terrible fault indeed. Now I'll set everything up in my project. Thanks again!