minghuaw / fe2o3-amqp

A rust implementation of the AMQP1.0 protocol based on serde and tokio.
MIT License
58 stars 7 forks source link

How to extract a bytes vector from the body, supporting several body formats #204

Closed VLorz closed 1 year ago

VLorz commented 1 year ago

Hi, I need to receive and process data sent by several nodes, which is, at the end, a vector of u8, but must support extracting those data from several different body Value types. For capturing all body formats I use receiver.recv::<Body<Value>>(), then I have a code similar to this:

        let msg = delivery.message();
        match &msg.body {
            Body::Data(ref batch) => {
                for data in batch.iter() {
                    let bytes = data.0.as_ref();
                    self.process_payload(bytes);
                }
            },
            Body::Value(ref value) => {
                use fe2o3_amqp::types::primitives::{Array, UByte};
                match &value.0 {
                    Value::Binary(ref buf) => {
                        let bytes = buf.as_ref();
                        self.process_payload(bytes);
                        return;
                    },
                    data => {
                        error!(r#"topic "{}" -> unsupported message body format: {:?}"#, &self.id, data);
                    },
                }
            },
            body => {
                error!(r#"topic "{}" -> unsupported message body format: {:?}"#, &self.id, body);
            },
        }

When a peer send a Vec<u8> buffer, I receive a List, which then produces: unsupported message body format: List([UByte(72), UByte(111), UByte(108), UByte(97), UByte(32), UByte(97), UByte(104), UByte(195), UByte(173)]).

How do I do for collecting a Vec<u8> that can come in a Value::List, Value::Array? Thanks a lot in advance.

minghuaw commented 1 year ago

Hi @VLorz

There are two ways around this.

  1. Maybe you can ask your peer to wrap the Vec<u8> inside a Binary, which is an alias for serde_bytes::ByteBuf. This is a current limitation of the serde ecosystem caused by the lack of specialization in rust. All Vec<T> including Vec<u8> are serialized as a sequence, which in the case of AMQP 1.0, are mapped to a List.

    Your peer could choose to use either Body::Data variant or Body::Value variant as the body section.

    The code would be similar to something like

use fe2o3_amqp::types::primitives::Binary;

// The default body section will be `Body::Value` if the `Vec<u8>` is not wrapped in `Binary`, and the value will be a `List` if the `Vec<u8>` is not wrapped inside an `Array`.
let val = Binary::from(vec![1u8,  2u8, 3u8, 4u8]);
let result = sender.send(val).await;

// You could also explicitly build a message to use `Body::Value`
let val = Binary::from(vec![1u8,  2u8, 3u8, 4u8]);
let msg = Message::builder()
    .value(val)
    .build();
let result = sender.send(msg).await;

// Or you can choose to use a `Body::Data` body section
let val = Binary::from(vec![1u8,  2u8, 3u8, 4u8]);
let msg = Message::builder()
    .data(val)
    .build();
let result = sender.send(msg).await;
  1. What you need in your code is essentially to match the Value::List and Value::Array variant (note that the value won't be an Value::Array if your peer did not explicitly wrap a vec inside an Array.). I have provided TryFrom/TryInto trait implementations for a bunch of types so that conversion from/to a serde_amqp::Value can be much more pleasant. It would be something like
// Let's say you have a `Value::List` that is simply a 
let value = Value::List(vec![Value::UByte(1), Value::UByte(2), Value::UByte(3)]);
match Vec::<u8>::try_from(value) {
    Ok(v) => {
        // The value is a list of u8
        println!("{:?}", v)
    },
    Err(v) => {
        // The value is not a list of u8
    },
};

A list of types that can be converted from a Value using the method can be found in the doc (https://docs.rs/serde_amqp/0.5.9/serde_amqp/value/enum.Value.html#impl-TryFrom%3CValue%3E-for-Array%3CT%3E)

minghuaw commented 1 year ago

The second method I mentioned above may need to take ownership of the Value. If you would like to inspect the value before acknowledging the delivery, one thing you could do is the store the delivery info before converting the delivery into the message body.

use fe2o3_amqp::link::delivery::DeliveryInfo;

let delivery_info = DeliveryInfo::from(&delivery);
let message = delivery.into_message();

// Process the message body
match message.body {
    Body::Value(value) => match Vec::<u8>::try_from(value) {
        Ok(v) => {
            // The value is a list of u8
            println!("{:?}", v)
        },
        Err(v) => {
            // The value is not a list of u8
        },
    },
    Body::Data => {// If you peer didn't explicitly use `Body::Data`, the message won't use `Body::Data` as the body section},
    _ => {// Other types of body section}
}

// Acknowledge delivery at the end
let result = receiver.accept(delivery_info).await;
VLorz commented 1 year ago

@minghuaw, second method worked, and your tip from your second message was also useful. Thanks a lot for your help and for sharing your work!!