minghuaw / fe2o3-amqp

A rust implementation of the AMQP1.0 protocol based on serde and tokio.
MIT License

Method of accepting/disposing messages on a Receiver which doesn't stop the receiving of messages on that Receiver #282

Open lmcnulty4 opened 1 week ago

lmcnulty4 commented 1 week ago

Following the cancel safety example, it seems the generally accepted way of accepting messages from another thread is to use a channel to send the original message back from the other thread to the receiver function, and to tokio::select! over that channel and the Receiver's recv() method. The problem is that once we've got a message back on this channel and we call receiver.accept(msg) on it, we must then await the result of that call, and while awaiting it we are no longer receiving messages.

The usual way we'd do things concurrently would be to use tokio::spawn or tokio::task::spawn_local to spawn another task which can .await the call to receiver.accept or receiver.dispose. If we do that, we'd need to ensure the receiver lives long enough, so we'd need to wrap it in an Arc (if using tokio::spawn) or an Rc (if using tokio::task::spawn_local). However, even that won't be enough, because the receiver's .recv() method takes a mutable reference to the Receiver. I think even using tokio::task::spawn_local with a RefCell for interior mutability wouldn't work, because the RefCell would need to stay mutably borrowed across an .await, so a second borrow would just panic at run time.
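The interior-mutability concern above can be illustrated with a minimal, std-only sketch. It assumes the cell in question is std::cell::RefCell; FakeReceiver and overlapping_borrows are hypothetical names made up for this illustration and are not part of fe2o3-amqp. A long-lived mutable borrow stands in for a recv() that is suspended at an .await.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical stand-in for a link whose `recv` needs `&mut self`.
struct FakeReceiver {
    received: u32,
}

impl FakeReceiver {
    fn recv(&mut self) -> u32 {
        self.received += 1;
        self.received
    }
}

/// Returns (first borrow worked, second borrow worked while the first was alive).
fn overlapping_borrows() -> (bool, bool) {
    let shared = Rc::new(RefCell::new(FakeReceiver { received: 0 }));
    let other = Rc::clone(&shared);

    // Stands in for the receive loop holding the receiver mutably
    // while its `recv()` future is suspended.
    let mut held = shared.borrow_mut();
    let first_ok = held.recv() == 1;

    // Stands in for a second task trying to accept a message meanwhile.
    // `try_borrow_mut` reports the conflict; `borrow_mut` would panic here.
    let second_ok = other.try_borrow_mut().is_ok();

    (first_ok, second_ok)
}
```

The second borrow fails for as long as the first one is alive, which is the situation a spawned "accept" task would hit while the receive loop is parked inside recv().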

Would it be possible then to have some new mechanism for disposing of messages without holding a reference to the original Receiver? Perhaps a new .create_message_disposer() method on Receiver, which returns a new MessageDisposer struct which has dispose/accept/etc. methods?

minghuaw commented 6 days ago

I think there's some confusion. When you dispose (accept, reject, etc.) a message on a receiver, all the receiver does is send a new Disposition frame to the session event loop. It doesn't wait for any Disposition response, because that is handled by the session event loop. The dispose methods return an error because the session or connection might have been closed, or the remote peer might have decided to close the link.

Would it be possible then to have some new mechanism for disposing of messages without holding a reference to the original Receiver? Perhaps a new .create_message_disposer() method on Receiver, which returns a new MessageDisposer struct which has dispose/accept/etc. methods?

Depending on the message settle mode, there are some internal states that need to be modified. Having a separate entity that does the disposal simply means the lock is hidden from you, but it's still there. Plus, I'm not aware of any other AMQP 1.0 client that does this. Please point me to one if you know of one.
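To make the two points above concrete (disposal is only a hand-off to the session event loop, and a detached disposer would still share state behind a lock), here is a rough std-only model. It is not fe2o3-amqp's implementation: Disposition, Unsettled, MessageDisposer and run_model are hypothetical names for this sketch, a thread stands in for the session event loop, and std::sync::mpsc stands in for the internal channel.

```rust
use std::collections::HashSet;
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical frame handed to the session event loop.
#[derive(Debug, PartialEq)]
struct Disposition {
    delivery_id: u32,
    accepted: bool,
}

/// Hypothetical link-side state that a disposal has to update.
type Unsettled = Arc<Mutex<HashSet<u32>>>;

/// Hypothetical detached disposer: it shares the state and the channel.
struct MessageDisposer {
    unsettled: Unsettled,
    to_session: Sender<Disposition>,
}

impl MessageDisposer {
    /// Updates shared state under the lock, then hands the frame over.
    /// It never waits for the peer; it fails only if the session loop is gone.
    fn accept(&self, delivery_id: u32) -> Result<(), String> {
        self.unsettled.lock().unwrap().remove(&delivery_id);
        self.to_session
            .send(Disposition { delivery_id, accepted: true })
            .map_err(|_| "session closed".to_string())
    }
}

/// Returns (frames seen by the loop, deliveries still unsettled, orphan failed).
fn run_model() -> (Vec<Disposition>, usize, bool) {
    let unsettled: Unsettled = Arc::new(Mutex::new((1..=3).collect()));
    let (tx, rx) = mpsc::channel();
    let disposer = MessageDisposer { unsettled: Arc::clone(&unsettled), to_session: tx };

    // Stand-in for the session event loop: it collects outgoing frames.
    let session = thread::spawn(move || rx.iter().collect::<Vec<Disposition>>());

    disposer.accept(1).unwrap();
    disposer.accept(2).unwrap();
    drop(disposer); // closes the channel so the loop ends
    let frames = session.join().unwrap();
    let remaining = unsettled.lock().unwrap().len();

    // A disposer whose session loop has already stopped reports an error.
    let (dead_tx, dead_rx) = mpsc::channel();
    drop(dead_rx);
    let orphan = MessageDisposer { unsettled, to_session: dead_tx };
    let failed = orphan.accept(3).is_err();

    (frames, remaining, failed)
}
```

In this model accept returns as soon as the frame is queued, and the only failure is a closed channel, which mirrors the "session or connection might get closed" case. The Mutex inside MessageDisposer is the lock that would be hidden from the caller.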

minghuaw commented 6 days ago

While I may not fully understand the value behind this request, you are welcome to create your own fork and implement it as you see fit.

lmcnulty4 commented 6 days ago

I think there's some confusion. When you dispose (accept, reject, etc) a message on a receiver, all the receiver does is simply sending a new Disposition frame to the session event loop and it doesn't wait for any Disposition response because that is handled by the session event loop.

Ah, that does improve things then. If no IO is actually happening during the .await while disposing, then I suppose it's not as big a deal as it would otherwise be to .await while receiving.

I've found that I'm having the same problem on the publisher end. Say we have a tokio mpsc Receiver and we pull messages from that receiver in order to publish them. The .send method on a Sender takes a mutable reference to self, so it's not possible to spawn a task (tokio::spawn/tokio::task::spawn_local) which takes a reference-counted instance of that sender and a message, sends the message, and awaits the result. Code sample that fails to compile:

async fn publish(mut tokio_receiver: tokio::sync::mpsc::Receiver<usize>, amqp_sender: fe2o3_amqp::Sender) {
    let rc_sender = std::sync::Arc::new(amqp_sender);

    while let Some(msg) = tokio_receiver.recv().await {
        let body = format!("hello: {}", msg);
        let msg = fe2o3_amqp::types::messaging::Message::builder().value(body).build();
        let sender = rc_sender.clone();
        tokio::spawn(async move {
            // error: cannot borrow data in an `Arc` as mutable
            // help: trait `DerefMut` is required to modify through a dereference,
            //       but it is not implemented for `Arc<fe2o3_amqp::Sender>`
            let _ = sender.send(msg).await;
        });
    }
}

I think in this case the .await of sender.send(msg) waits for the broker to confirm it has received the message, so it is performing IO. This means we have no concurrency for any given fe2o3_amqp::Sender. Throughput is then 1/latency: before a message can be sent, it must wait for the broker to confirm receipt of the previously sent message.
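For illustration, one way to get past the borrow error is to put the sender behind a mutex, sketched here with std only. FakeSender and publish_from_threads are hypothetical stand-ins, and threads replace tokio tasks; in async code a lock held across an .await would normally be tokio::sync::Mutex instead. Note that this only satisfies the borrow checker: the guard is held for the whole send, so the 1/latency bound described above still applies.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a sender whose `send` takes `&mut self`.
struct FakeSender {
    sent: Vec<String>,
}

impl FakeSender {
    fn send(&mut self, body: String) {
        self.sent.push(body);
    }
}

/// Sends `count` messages from separate threads and returns them sorted.
fn publish_from_threads(count: usize) -> Vec<String> {
    let shared = Arc::new(Mutex::new(FakeSender { sent: Vec::new() }));

    let handles: Vec<_> = (0..count)
        .map(|i| {
            let sender = Arc::clone(&shared);
            thread::spawn(move || {
                // The guard yields `&mut FakeSender`. It is held for the whole
                // send, so sends from different threads run one at a time.
                sender.lock().unwrap().send(format!("hello: {}", i));
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Thread scheduling decides the arrival order, so sort before returning.
    let mut sent = shared.lock().unwrap().sent.clone();
    sent.sort();
    sent
}
```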

Is this understanding correct? If so, is there some way to achieve concurrency for a fe2o3_amqp::Sender?

minghuaw commented 5 days ago

the .await of the sender.send(msg) waits for the broker to confirm it has received the message so is performing IO

In this case, you have the Sender::send_batchable method available (this is probably a really bad name; I'd love to give it a more fitting one if you have a suggestion). It doesn't wait for the broker to confirm, and it gives you a future which you can poll to wait for the remote acknowledgement. The .await on send_batchable itself is really just the .await on the internal channel to the session.
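The behaviour described here (hand the message off now, wait for the acknowledgement later) can be modelled with std channels. This is only a sketch of the pattern, not the signature or implementation of send_batchable; PendingAck, PipelinedSender, send_without_waiting and pipeline are hypothetical names, and a thread plays the broker.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

/// Hypothetical handle for an acknowledgement that has not arrived yet.
struct PendingAck {
    rx: Receiver<u32>,
}

impl PendingAck {
    /// Blocks until the simulated broker confirms the delivery.
    fn wait(self) -> u32 {
        self.rx.recv().expect("broker thread stopped")
    }
}

/// Hypothetical sender: handing off a message returns at once with a handle.
struct PipelinedSender {
    next_id: u32,
    to_broker: Sender<(u32, Sender<u32>)>,
}

impl PipelinedSender {
    fn send_without_waiting(&mut self) -> PendingAck {
        let id = self.next_id;
        self.next_id += 1;
        let (ack_tx, ack_rx) = mpsc::channel();
        self.to_broker
            .send((id, ack_tx))
            .expect("broker thread stopped");
        PendingAck { rx: ack_rx }
    }
}

/// Queues `count` messages first, then collects their acknowledgements.
fn pipeline(count: u32) -> Vec<u32> {
    let (tx, rx) = mpsc::channel::<(u32, Sender<u32>)>();

    // Simulated broker: acknowledges each delivery by echoing its id.
    let broker = thread::spawn(move || {
        for (id, ack) in rx {
            let _ = ack.send(id);
        }
    });

    let mut sender = PipelinedSender { next_id: 0, to_broker: tx };

    // Phase 1: queue every message without waiting for confirmations.
    let pending: Vec<PendingAck> = (0..count)
        .map(|_| sender.send_without_waiting())
        .collect();

    // Phase 2: collect the confirmations afterwards.
    let acks: Vec<u32> = pending.into_iter().map(PendingAck::wait).collect();

    drop(sender); // closes the channel so the broker thread exits
    broker.join().unwrap();
    acks
}
```

Because several deliveries are in flight before the first acknowledgement is awaited, throughput in this model is no longer tied to one round trip per message.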

lmcnulty4 commented 1 day ago

Apologies for the delayed response. Thank you, yes the send_batchable method will work! I did actually skip over it because of the name, but I'm not sure what a better name would be.