minghuaw / fe2o3-amqp

A rust implementation of the AMQP1.0 protocol based on serde and tokio.
MIT License
58 stars 7 forks source link

Messages get stuck after a `MessageDecodeError` #232

Closed Levyks closed 5 months ago

Levyks commented 6 months ago

I've noticed that, when implementing something like the example below:

#[derive(Debug, Deserialize)]
#[serde(tag = "t", content = "c")]
pub enum MyMessage {
    Foo(String),
}
// FromBody and FromEmptyBody impl

let received = receiver.recv::<MyMessage>().await;

If I send something like this in that receiver's queue

{ "t": "Bar", "c": "Hello World!" }

I get a MessageDecodeError, which makes sense, however, the message is not rejected after it, and the error itself does not provide the DeliveryInfo necessary for me to do it myself, making the message be stuck as delivering in the queue until the connection is dropped.

Would it be possible to implement an auto-reject option to deal with this case? Or maybe just include the DeliveryInfo inside the error?

I haven't checked all the variants of the RecvError enum, but maybe there's a chance of that happening in other cases as well?

Levyks commented 6 months ago

As a workaround, I thought about just receiving a generic Body<Value> and dealing with the deserialization myself, but I couldn't figure out how to turn that into a MyMessage enum after that, since apparently serde_amp only implements a from_reader and a from_slice

minghuaw commented 6 months ago

[serde(tag = "t", content = "c")]

Most of these serde tags are targeted at JSON and a lot of them don't make sense for other encoding (including AMQP). There's a high chance that the decoding error is caused by these tags.

Would it be possible to implement an auto-reject option to deal with this case? Or maybe just include the DeliveryInfo inside the error?

Good idea. I prefer the latter option but it would require a breaking release. A better way might be allowing user to set a handler function/closure that has access to the receiver and delivery info. I'll give this a try.

but I couldn't figure out how to turn that into a MyMessage enum after that

serde_amqp has this method from_value https://docs.rs/serde_amqp/latest/serde_amqp/fn.from_value.html

Levyks commented 6 months ago

Most of these serde tags are targeted at JSON and a lot of them don't make sense for other encoding (including AMQP). There's a high chance that the decoding error is caused by these tags.

It works fine if I send it with an enum variant that actually exists, I just left it there as an example as that is how I'm using it in my project, it has some JSON interop.

serde_amqp has this method from_value https://docs.rs/serde_amqp/latest/serde_amqp/fn.from_value.html

I totally missed that somehow, I guess I will be able to use the workaround I mentioned then, thanks

minghuaw commented 6 months ago

The message decode error now carries the delivery info in 0.10.0 https://docs.rs/fe2o3-amqp/0.10.0/fe2o3_amqp/link/enum.RecvError.html#variant.MessageDecode

minghuaw commented 5 months ago

@Levyks Would you consider this issue resolved?

Levyks commented 5 months ago

yeah, sorry for not closing it