minghuaw / fe2o3-amqp

A rust implementation of the AMQP1.0 protocol based on serde and tokio.
MIT License

How to work with qpid management framework? #196

Closed elsasslegend closed 1 year ago

elsasslegend commented 1 year ago

Hi,

For various reasons we have to use the Qpid broker at work with the AMQP 1.0 protocol. This broker is unfortunately extremely poorly documented. I must be able to create queues and exchanges without using the qpid-config tool (written in Python), so the only way to do that is by using what they call the "Qpid Management Framework", that is, a special topic for sending specific configuration messages. The format is quite awful and is described here and here. The implementation of the qpid-config tool can be found here.

From what I've read, it seems to be mandatory to pass some special keys as properties :

  request = Message(reply_to=reply_to, content=content)
  request.properties["x-amqp-0-10.app-id"] = "qmf2"
  request.properties["qmf.opcode"] = "_method_request"

Can also be seen here : Message(subject='broker', reply_to='qmf.default.topic/direct.8702f596-b112-427d-b93e-7e0ae28f2ae8;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}', correlation_id='1', properties={'qmf.opcode': '_method_request', 'x-amqp-0-10.app-id': 'qmf2', 'method': 'request'}, content={'_object_id': {'_object_name': 'org.apache.qpid.broker:broker:amqp-broker'}, '_method_name': 'create', '_arguments': {'strict': True, 'type': 'queue', 'name': u'test', 'properties': {}}})

The problem I have, is that I cannot add custom properties to the "properties" map (or I didn't find a way to do that). Because of that, the broker seems not to be able to understand any message management commands. I'm already using your great lib in my project today, which is the only one that really fully works for AMQP 1.0 protocol.

Would it be possible to add some kind of "extra" properties, or should I embed another lib for that (maybe one that is compatible with AMQP 0.10, only for this scenario)?

Thanks :)

minghuaw commented 1 year ago

@djflex68 If I understood this correctly, what you are trying to do is to create a node (queue/topic) on demand.

As you mentioned, the particular message format comes from AMQP 0.10 and are not defined in AMQP 1.0 at all. However, it seems like red hat has a vague description on how to achieve this in AMQP 1.0 protocol (https://access.redhat.com/documentation/en-us/red_hat_enterprise_mrg/3/html/messaging_programming_reference/on-demand_create_workaround_for_legacy_applications).

I haven't had time to set up a test environment to test the following, but below is what I think the documentation linked above is trying to do

If the addressed node is to be created on demand - either through use of '#' as the name,

I am not too sure what "the name" really means here, but my guess is the following

let sender = Sender::attach(&mut session, "sender-name", "#").await.unwrap();

or through the create policy - the node properties are sent as dynamic-node-properties on the source or target.

This sounds similar to the dynamic sender example (https://github.com/minghuaw/fe2o3-amqp/blob/main/examples/dynamic_sender/src/main.rs). The linked example is a very simple one. Below is probably closer to what QMF needs.

// This will ask the broker to dynamically create a node, 
// and the sender/receiver should not supply an address
// as it will be determined by the broker

// The `dynamic_node_properties` is a map of key value pairs
// I am not sure if and how all QMF properties values could be mapped into an AMQP 1.0 `Value`
let mut properties = OrderedMap::new();
properties.insert(Symbol::from("key"), Value::from("value"));
let target = Target::builder()
        .dynamic(true)
        .dynamic_node_properties(properties)
        .build();
let sender = Sender::builder()
        .name("dynamic-sender")
        .target(target)
        .attach(&mut session)
        .await
        .unwrap();

// The address can then be obtained by
let address = sender.target().as_ref().and_then(|target| target.address.as_ref()).unwrap();

For receiver, you will be setting the .dynamic() and .dynamic_node_properties() on the source instead of the target.

Hopefully this helps. Feel free to let me know if this works or if you need more help with this.

elsasslegend commented 1 year ago

Many thanks for your help, I'll have a look at office tomorrow and let you know ! You gave me good ideas to try !

elsasslegend commented 1 year ago

Unfortunately I've tried your mentioned suggestions, but although the request seems to be sent, it does not have any effect on the broker. I've tried several things including :

 async fn create_sender(&mut self, mut session: &mut SessionHandle<()>) -> Sender {      
        let mut properties = OrderedMap::new();
        properties.insert(Symbol::from("x-amqp-0-10.app-id"), Value::from("qmf2"));
        properties.insert(Symbol::from("app-id"), Value::from("qmf2"));
        properties.insert(Symbol::from("qmf.opcode"), Value::from("_method_request"));
        properties.insert(Symbol::from("method"), Value::from("request"));

            // , // link name
            // "qmf.default.direct",              // target address

        let target = Target::builder()
        .dynamic_node_properties(properties)
        .build();

        // The exchange we bind on has the same name as the microservice itself (AMQP)
        Sender::builder()
        .name("#")
        .target(target)
        .attach(&mut session)                           // Session
        .await
        .unwrap()

        //return Sender::attach(&mut session, "#", "qmf.default.direct").await.unwrap();

    }

    pub async fn send_configuration(mut self) {
        let mut connection = Connection::open(
            "qpid_configuration_manager_link",
            "amqp://broker-address",
        )
        .await
        .unwrap();

        let mut session = Session::begin(&mut connection).await.unwrap();
        let mut sender = self.create_sender(&mut session).await;

        let config_message_contents = QpidConfigurationMessage::new();

        // Send message
        let message = Message::builder()
            // .properties(
            //     Properties::builder()
            //         .subject("broker")
            //         //.reply_to("qmf.default.topic/direct.8702f596-b112-427d-b93e-7e0ae28f2ae8;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}")
            //         .build(),
            // )
            // .application_properties(
            //     ApplicationProperties::builder()
            //         .insert("x-amqp-0-10.app-id", "qmf2")
            //         .insert("app-id", "qmf2")
            //         .insert("qmf.opcode", "_method_request")
            //         // .insert("reply-to", "reply-queue")
            //         .build(),
            // )
            .value(config_message_contents)
            .build();

        let outcome = sender
            .send(message)
            .await
            .map_err(|err| {
                println!("{}", err);
            })
            .unwrap();

        if outcome.is_accepted() {
            println!("OK!")

        }

        sender.close().await.unwrap();
        session.end().await.unwrap();
        connection.close().await.unwrap();
    }

With QpidConfigurationMessage declared as :

use serde::{Serialize, Deserialize};
use serde_json::json;

#[derive(Debug, Serialize, Deserialize)]
pub struct QpidConfigurationMessage<'a> {
    _object_id: serde_json::Value,
    _method_name: &'a str,
    _arguments: serde_json::Value,
    #[serde(rename(serialize = "x-amqp-0-10.app-id", deserialize = "x_app_id"))]
    x_app_id: &'static str,
    #[serde(rename(serialize = "app-id", deserialize = "app_id"))]
    app_id: &'static str,
    #[serde(rename(serialize = "qmf.opcode", deserialize = "qmf_code"))]
    qmf_code: &'static str,
    method: &'static str
}

impl<'a> QpidConfigurationMessage<'a> {
    pub fn new() -> Self {
        Self {
            _object_id: json!({
                "_object_name": "org.apache.qpid.broker:broker:amqp-broker"
            }),
            _method_name: "create",
            _arguments: json!({
                        "type":"queue",
                        "strict": true,
                        "name":"my-queue",
                        "properties": {
                            "durable": true
                        }
            }),
            x_app_id: "qmf2",
            app_id: "qmf2",
            qmf_code: "_method_request",
            method: "request"
            // reply_to: "reply-queue"
        }
    }
}

Commented lines are things I've tried that do not work. It's also unclear whether I should send the message as a string value or as a JSON-serialized value, since there isn't any useful info on that. Still digging inside the QMF framework in C++ for now...

elsasslegend commented 1 year ago

I've forgotten "dynamic(true)" on target builder :

        let target = Target::builder()
        .dynamic(true)
        .dynamic_node_properties(properties)
        .build();

but now I'm getting another error :

thread 'tokio-runtime-worker' panicked at 'called Result::unwrap() on an Err value: DynamicNodePropertiesIsSomeWhenDynamicIsFalse', (...) qpid_configuration_manager.rs:46:10

minghuaw commented 1 year ago

Is there a docker image that i can use as a test environment? @djflex68 i can probably try something

minghuaw commented 1 year ago

I found a doc on AMQP 1.0 support for qpid-cpp (https://github.com/apache/qpid-cpp/blob/main/docs/amqp-1.0.txt). However, it is still quite vague, and I am still not clear on how to map QMF to AMQP 1.0. I will probably have to take a look at their cpp code.

minghuaw commented 1 year ago

@djflex68 It seems like it uses the ApplicationProperties field in AMQP 1.0 (see here). Though the Python code (which is a wrapper over C) names the field properties, it is actually the application-properties section defined in the AMQP 1.0 spec, which is indicated by its documentation as well as the C code (there is no doc, but all the AMQP 1.0 properties are defined as individual fields, like the ones defined above line 58)
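To make the mapping concrete, here is a minimal std-only sketch of the application-properties entries that the qpid-config request quoted earlier in this thread carries. It does not use fe2o3-amqp at all; `qmf_application_properties` and `is_qmf_method_request` are hypothetical helper names, and whether the broker checks exactly these keys is an assumption taken from the Python snippet, not from the broker source.

```rust
use std::collections::BTreeMap;

/// Application-properties entries seen in the qpid-config request quoted
/// earlier in the thread (assumption: these are what marks a QMF2 request).
pub fn qmf_application_properties() -> BTreeMap<&'static str, &'static str> {
    BTreeMap::from([
        ("x-amqp-0-10.app-id", "qmf2"),
        ("qmf.opcode", "_method_request"),
        ("method", "request"),
    ])
}

/// Hypothetical check: true when the map carries both the QMF2 app id
/// and the method-request opcode.
pub fn is_qmf_method_request(props: &BTreeMap<&str, &str>) -> bool {
    props.get("x-amqp-0-10.app-id") == Some(&"qmf2")
        && props.get("qmf.opcode") == Some(&"_method_request")
}
```

With the real library these entries would go into the message's application properties section rather than into a plain map.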

minghuaw commented 1 year ago

@djflex68 Do you mind sharing a working configuration for qpid broker? I tried a docker image (https://hub.docker.com/r/okoko/qpid-cpp) but couldn't get it to speak AMQP 1.0

elsasslegend commented 1 year ago

Hi @minghuaw, I'll share that with you tomorrow, I'm not at the office right now. We'll get a container ready for you

elsasslegend commented 1 year ago

Hello again, My colleague is preparing a docker container with a working QPID broker on it. It will be available in a couple of hours for testing. Note that you can enable logs by setting some environment variables in the docker file : QPID_LOG_ENABLE=trace+ QPID_LOG_TO_FILE=/var/log/qpidout.log

I've tried your suggestions, by using application properties instead of trying properties. My code looks like that :

async fn create_sender(&mut self, mut session: &mut SessionHandle<()>) -> Sender {
    return Sender::attach(&mut session, "#", "qmf.default.direct")
        .await
        .unwrap();
}

 pub async fn send_configuration(mut self) {
        let mut connection = Connection::open(
            "qpid_configuration_manager_link",
            "amqp://distantip:5672",
        )
        .await
        .unwrap();

        let mut session = Session::begin(&mut connection).await.unwrap();
        let mut sender = self.create_sender(&mut session).await;

        let config_message_contents = QpidConfigurationMessage::new();

        //Send message
        let message = Message::builder()
            .application_properties(
                ApplicationProperties::builder()
                    .insert("app-id", "qmf2")
                    .insert("qmf.opcode", "_method_request")
                    .insert("x-amqp-0-10.app-id", "qmf2")
                    .insert("method", "request")
                    .build(),
            )
            .value(config_message_contents)
            .build();

        let outcome = sender
            .send(message)
            .await
            .map_err(|err| {
                println!("{}", err);
            })
            .unwrap();

        if outcome.is_accepted() {
            println!("OK!")
        }
    }

I see the message coming on the broker in qpid traces, but it's not interpreted as a Management Message, which is the case when using the python CLI tool. Something is missing to make the broker "aware" that it's a management message.

Many thanks for your help until now 🙏

elsasslegend commented 1 year ago

Hi @minghuaw ,

After testing several combinations the whole day, it seems that the only one that produces interesting traces on the broker is by using dynamic properties :

 let mut properties = OrderedMap::new();
        properties.insert(Symbol::from("x-amqp-0-10.app-id"), Value::from("qmf2"));
        properties.insert(Symbol::from("app-id"), Value::from("qmf2"));
        properties.insert(Symbol::from("qmf.opcode"), Value::from("_method_request"));
        properties.insert(Symbol::from("method"), Value::from("request"));
        properties.insert(Symbol::from("subject"), Value::from("broker"));
        properties.insert(Symbol::from("correlation-id"), Value::from(1));
        properties.insert(Symbol::from("content-type"),  Value::from("amqp/map"));
        properties.insert(Symbol::from("routing_key"), Value::from("broker"));

        // properties.insert(Symbol::from("content"), Value::from("{_object_id': {'_object_name': 'org.apache.qpid.broker:broker:amqp-broker'}, '_method_name': 'create', '_arguments': {'strict': True, 'type': 'queue', 'name': u'test', 'properties': {}}}"));
        properties.insert(Symbol::from("reply_to"), Value::from("qmf.default.topic/direct.8702f596-b112-427d-b93e-7e0ae28f2ae8;{node:{type:topic}, link:{x-declare:{exclusive:True}}}"));
        // properties.insert(Symbol::from("reply-to"), Value::from("qmf.default.topic/direct.8702f596-b112-427d-b93e-7e0ae28f2ae8;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}"));

        // //     // , // link name
        // //     // "qmf.default.direct",              // target address

        let mut target = Target::builder()
            //.address("qmf.default.direct")
            .dynamic(true)
            .dynamic_node_properties(properties)
            .build();

        // target.dynamic_node_properties.insert(properties);

        // // The exchange we bind on has the same name as the microservice itself (AMQP)
        return Sender::builder()
            .name("#")
            .target(target)
            .attach(&mut session) // Session
            .await
            .unwrap();

The broker seems to start interpreting those properties :

2023-02-27 15:55:18 [Broker] debug Processing node property x-amqp-0-10.app-id = qmf2
2023-02-27 15:55:18 [Broker] debug Processing node property app-id = qmf2
2023-02-27 15:55:18 [Broker] debug Processing node property qmf.opcode = _method_request
2023-02-27 15:55:18 [Broker] debug Processing node property method = request
2023-02-27 15:55:18 [Broker] debug Processing node property subject = broker
2023-02-27 15:55:18 [Broker] debug Processing node property correlation-id = 1
2023-02-27 15:55:18 [Broker] debug Processing node property content-type = amqp/map
2023-02-27 15:55:18 [Broker] debug Processing node property routing_key = broker
2023-02-27 15:55:18 [Broker] debug Processing node property reply_to = qmf.default.topic/direct.8702f596-b112-427d-b93e-7e0ae28f2ae8;{node:{type:topic}, link:{x-declare:{exclusive:True}}}

But the library seems to fail because the broker then writes :

2023-02-27 15:55:18 [Protocol] trace [qpid.172.25.0.2:5672-10.67.130.146:59013]: FRAME: 0 <- @detach(22) [handle=0x0, closed=true, error=@error(29) [condition=:"amqp:invalid-field", description="DynamicNodePropertiesIsSomeWhenDynamicIsFalse"]]

Which is the error I also get on rust client side :

thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: DynamicNodePropertiesIsSomeWhenDynamicIsFalse'

minghuaw commented 1 year ago

That is caused by a check performed when a link receives the remote attach frame. It only checks for non-compliant behavior.

I think there's a possibility that the qpid broker doesn't follow the spec strictly when it replies to a dynamic link. (I have found that many implementations do not strictly follow the spec and allow non-compliant behavior.)

I think one thing I can do is to allow the user to disable this check so that you can still connect to the broker even if some fields in the remote attach frame don't strictly follow the spec.

elsasslegend commented 1 year ago

That would be nice indeed. I'll provide you with the container ASAP so you can test your library's behavior against it.

elsasslegend commented 1 year ago

Okay @minghuaw, container images are now available at the address mentioned earlier. You can download the entire folder; let me know if it works for you so I can remove it.

minghuaw commented 1 year ago

Thanks @djflex68 I will give it a try in about 1 hour. I will let you know then

minghuaw commented 1 year ago

@djflex68 I have made a new release ("0.8.20" or "0.7.26") that added two new methods (verify_incoming_source() and verify_incoming_target()) for sender/receiver builder. The source/target verification is enabled by default but can be disabled using those two methods.

Sender::builder()
        .verify_incoming_source(false) // this will disable verification of the source field in the incoming attach frame
        .verify_incoming_target(false) // disable the verification of the target field in the incoming attach frame
...

I am going to try the docker image. I have downloaded everything, so you can remove the link if you want

minghuaw commented 1 year ago

@djflex68 I have run a quick test with verify_incoming_source(false) and verify_incoming_target(false), and everything else is the same as the example you presented above. And it seems to work. The number of queues that I read from qpid-config increased from 1 to 2 when the test is running but comes back to 1 once the test program stops. I guess this has something to do with the durability of the queue.

minghuaw commented 1 year ago

The address of the dynamically created queue is actually "<container_id>_<link_name>" where <container_id> is the name you give to the connection and <link_name> is the name you give to the link
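As a tiny illustration of that naming rule, the sketch below joins the two names with a single underscore, as observed above with this broker. `dynamic_queue_address` is a hypothetical helper, not part of fe2o3-amqp, and other brokers (or other qpid versions) may name dynamic nodes differently.

```rust
/// Predicts the address the broker assigned to a dynamically created queue,
/// assuming the "<container_id>_<link_name>" pattern observed in this thread.
pub fn dynamic_queue_address(container_id: &str, link_name: &str) -> String {
    format!("{}_{}", container_id, link_name)
}
```

For the connection and link names used earlier in the thread, `dynamic_queue_address("qpid_configuration_manager_link", "#")` would give `qpid_configuration_manager_link_#`. In real code, prefer reading the address back from the attached link's target rather than predicting it.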

minghuaw commented 1 year ago

The number of queues that I read from qpid-config increased from 1 to 2 when the test is running but comes back to 1 once the test program stops. I guess this has something to do with the durability of the queue.

This is actually controlled by a key-value pair in the dynamic node property. The queue will be persisted if the following entry is inserted into the node properties

properties.insert(Symbol::from("auto-delete"), Value::from(false));

elsasslegend commented 1 year ago

Hi @minghuaw,

Many thanks for your responsiveness! I will try that this morning and let you know 😉

elsasslegend commented 1 year ago

Okay after setting

 .verify_incoming_source(false)
 .verify_incoming_target(false)

and

properties.insert(Symbol::from("auto-delete"), Value::from(false));

I'm able to persist a queue on the broker, which is a big step forward 👍 👍 Now I have to see how to configure the queue name and bindings, because the message contents seem to be fully ignored. It's unclear for now how to pass a queue name and arguments such as durable properties (in fact, the created queue uses defaults derived from the container and sender names).

elsasslegend commented 1 year ago

For now I cannot create anything other than a default queue named after the container id. According to the Red Hat doc, using dynamic_node also seems to be some kind of "legacy" behavior, so it's hard to find info on how to configure properties in this mode. I got some info from (qpid source), but it's still unclear how to create topic exchanges with bindings that way, and even when setting properties like:

.insert("name", "foo")
.insert("supported-dist-modes", "copy")
.insert("exchange-type", "topic")

I still cannot create an exchange, it only creates a queue.

minghuaw commented 1 year ago

@djflex68 It seems like the Python packages required to run their Python examples are not publicly available. Do you have any working Python code whose logs I could use to find out how the messages are formatted?

elsasslegend commented 1 year ago

I do not have working python code unfortunately, but on the given broker image, you have qpid-tools installed, which is written in python. qpid-tools allows you to create/delete queues/exchanges using cli. I hope this helps, I'm still digging into the dynamic nodes mode, this broker is such a pain...

minghuaw commented 1 year ago

@djflex68 I've figured out how to create an exchange using an AMQP 1.0 message. I managed to create a new exchange named "test-fanout" (see the last entry in the pasted shell output below). This was created using the message listed here (https://access.redhat.com/documentation/en-us/red_hat_enterprise_mrg/3/html-single/messaging_programming_reference/index#Creating_Exchanges_from_an_Application)

root@5c9df94db982:/# qpid-config -r exchanges
Exchange '' (direct)
    bind [fe6b00ed-ee81-467f-a884-161b785ac892:0.0] => fe6b00ed-ee81-467f-a884-161b785ac892:0.0
Exchange 'amq.direct' (direct)
Exchange 'amq.fanout' (fanout)
Exchange 'amq.match' (headers)
Exchange 'amq.topic' (topic)
Exchange 'qmf.default.direct' (direct)
Exchange 'qmf.default.topic' (topic)
    bind [direct.648f0730-9741-4497-b289-5aa844eedfa1] => fe6b00ed-ee81-467f-a884-161b785ac892:0.0
Exchange 'qpid.management' (topic)
Exchange 'test-fanout' (fanout)

minghuaw commented 1 year ago

@djflex68 I have added an example (https://github.com/minghuaw/fe2o3-amqp/blob/main/examples/qpid_management_framework/src/main.rs) to show how to work with QMF using AMQP 1.0. I have tried adding as much documentation in the example's code as possible. Hopefully the example is self-explanatory enough.

The example creates a "fanout" exchange, but you can easily create a "topic" exchange by changing the value of the "exchange-type" entry.
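For readers who want to see the shape of such a request body without opening the example, here is a std-only sketch. It is not the code from the linked example: `QmfValue`, `create_exchange_body`, and `get` are hypothetical names, and the layout (`_object_id`, `_method_name`, `_arguments` with `type`, `name`, `strict`, and a `properties` map holding `exchange-type`) is assumed from the `create` request quoted earlier in this thread. With fe2o3-amqp the same structure would be built from the library's own map and value types and sent as the message body.

```rust
use std::collections::BTreeMap;

/// Minimal stand-in for an AMQP value; only what this sketch needs.
#[derive(Debug, Clone, PartialEq)]
pub enum QmfValue {
    Str(String),
    Bool(bool),
    Map(BTreeMap<String, QmfValue>),
}

fn s(v: &str) -> QmfValue {
    QmfValue::Str(v.to_string())
}

fn map(entries: Vec<(&str, QmfValue)>) -> QmfValue {
    QmfValue::Map(entries.into_iter().map(|(k, v)| (k.to_string(), v)).collect())
}

/// Builds the (assumed) body of a QMF `create` request for an exchange.
pub fn create_exchange_body(name: &str, exchange_type: &str) -> QmfValue {
    map(vec![
        (
            "_object_id",
            map(vec![("_object_name", s("org.apache.qpid.broker:broker:amqp-broker"))]),
        ),
        ("_method_name", s("create")),
        (
            "_arguments",
            map(vec![
                ("type", s("exchange")),
                ("name", s(name)),
                ("strict", QmfValue::Bool(true)),
                // Changing this single entry switches "fanout" to "topic".
                ("properties", map(vec![("exchange-type", s(exchange_type))])),
            ]),
        ),
    ])
}

/// Looks up a nested entry by key path, e.g. ["_arguments", "name"].
pub fn get<'a>(value: &'a QmfValue, path: &[&str]) -> Option<&'a QmfValue> {
    let mut current = value;
    for key in path {
        match current {
            QmfValue::Map(m) => current = m.get(*key)?,
            _ => return None,
        }
    }
    Some(current)
}
```

Calling `create_exchange_body("test-fanout", "topic")` yields the same nested map as the fanout case with only the `exchange-type` entry changed.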

elsasslegend commented 1 year ago

Wow @minghuaw, what more can I say than thank you so much for your time and your patience. This is really nice of you! I'm going to try your example right now and let you know. I think the part I was doing wrong is that I was trying to serialize the message content, whereas you are building an ordered map (I didn't know about that).

elsasslegend commented 1 year ago

It works!! 🎉 😄 You've succeeded in 1 hour where I had been failing for 3 days 😃 That was the perfect solution I was looking for; I think I would never have found it without your expertise and knowledge of the AMQP protocol and brokers. I saw you renamed the issue; you're right, it's more about QPID configuration. I hope it nevertheless helped you improve your library, and that it might help someone else. This is awesome work! Thank you again!

minghuaw commented 1 year ago

@djflex68 Is it ok if I close this issue for now?

elsasslegend commented 1 year ago

Yes, it can be closed, thanks for your help :)