minghuaw / fe2o3-amqp

A rust implementation of the AMQP1.0 protocol based on serde and tokio.
MIT License
58 stars 7 forks source link

Way to accept/reject/release in another thread while still receiving new messages? #225

Closed Levyks closed 6 months ago

Levyks commented 6 months ago

Is there some way to process messages in another tokio thread, so that long running tasks don't block my consumer from processing more messages?

My goal is to have something like the example below (doesn't work):

loop {
    let message: Delivery<Value> = receiver.recv().await.unwrap();
    println!("Received message: {:?}", message);
    tokio::spawn(async move {
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
        println!("Message processed");
        // somehow accept?
        // this doesn't work as receiver is used as mutable in the .recv() call
        receiver.accept(&message).await.unwrap();
        println!("Message accepted");
    });
}

But since Receiver does not implement Copy/Clone, I cannot get something like this to work

I've seen some crates deal with a similar issue in websockets by splitting the incoming/outgoing channels.

Would this approach be possible here? I've seen you mention in #125, that the Receiver struct itself can never implement Copy/Clone as tokio::sync::mpsc::Receiver does not as well, but the tokio::sync::mpsc::Sender used in outgoing does, but for that to work the link property would need to be Copy/Clone, is that possible?

EDIT: Or maybe there is already a workaround that lets me do that without implementing anything new at the crate level?

Levyks commented 6 months ago

nevermind, I was reading through #22 and found the cancel_safety example and that approach of using select! with a separate channel appears to be exactly what I needed.

but maybe that example could be renamed to something different to make it easier to find when searching for something like my issue? Or maybe add a reference to it in the README.md, since processing messages in a async way is such a common use-case for this kind of crate

minghuaw commented 6 months ago

Thank you for the information. That example was really to show and allow users to test that the APIs are cancellation safe which is kinda implied if you want to use the select! macro. That kind of use pattern, to me, is more like something enabled by the tokio or async rust ecosystem, and this is why I didn't make a separate example.

minghuaw commented 6 months ago

@Levyks #229 is published in version 0.9.6

Levyks commented 6 months ago

thanks for the quick merging/publishing