minghuaw / fe2o3-amqp

A rust implementation of the AMQP1.0 protocol based on serde and tokio.
MIT License
58 stars 7 forks source link

How to listen to messages coming from multiple queues? #198

Closed Obiwan1995 closed 1 year ago

Obiwan1995 commented 1 year ago

Hello,

I have an app which declares multiple message queues so I created one Receiver per queue. Then, I wanted to create a thread per receiver to wait for incoming messages:

// Create connection
let mut connection = Connection::open("my_service", AMQP_BROKER_URL)
    .await
    .map_err(|e| Error(e));

// Create session
let mut session = Session::begin(&mut connection)
    .await
    .map_err(|e| Error(e));

// Create receivers
for queue in queues {
    let mut receiver = Receiver::attach(
        &mut session,
        queue,
        queue
     )
    .await
    .map_err(|e| Error(e));

    // Start listening to incoming messages
    spawn(async move {
        loop {
            match receiver.recv::<Message>().await {
                Err(err) => error!("AMQP Reception error: {}", err),
                Ok(delivery) => {
                     // Do someting with the delivery
                }
            }
        }
    });
}

When I do this, I get an infinite loop of errors: Local error: IllegalSessionState.

I also tried to put the connection and session creation inside the thread and it's working. But I don't like this approach as errors are not sent back to the main thread (and so we can't act on a connection error for instance).

What's the best way to achieve this? Is it because multiple receivers cannot share the same session? And if I really need to instanciate connection and session inside the thread, should I use a different container_id for each receiver?

Thanks in advance for your answer!

minghuaw commented 1 year ago

Thanks for raising the issue. You can definitely have more than one receiver/sender on a single session.

I also tried to put the connection and session creation inside the thread and it's working. But I don't like this approach as errors are not sent back to the main thread (and so we can't act on a connection error for instance).

This sounds like the session is dropped before the receivers are closed. The SessionHandle which is the session variable in your code, manages the lifetime of the underlying session event loop, and when the variable session is dropped, the underlying session event loop is stopped as well. This applies to the connection as well.

I think a good practice is to .await for the spawned tasks to finish before closing the session and connection. For example

// Create connection
let mut connection = Connection::open("my_service", AMQP_BROKER_URL)
    .await
    .map_err(|e| Error(e));

// Create session
let mut session = Session::begin(&mut connection)
    .await
    .map_err(|e| Error(e));

let mut handles = vec![];
// Create receivers
for queue in queues {
    let mut receiver = Receiver::attach(
        &mut session,
        queue,
        queue
     )
    .await
    .map_err(|e| Error(e));

    // Start listening to incoming messages
    let handle = spawn(async move {
        loop {
            match receiver.recv::<Message>().await {
                Err(err) => error!("AMQP Reception error: {}", err),
                Ok(delivery) => {
                     // Do someting with the delivery
                }
            }
        }
    });

    handles.push(handle);
}

for handle in handles {
    handle.await.unwrap();
}

session.close().await.unwrap();
connection.close().await.unwrap();
minghuaw commented 1 year ago

@Obiwan1995 Please note that AMQP sessions employs a window-based flow control, and all links attached to the same session are subject to the same session. So in some occasions it would make sense to have only one link (sender or receiver) on a session. If you dig into azure's event hub sdk, you would find that this is actually what they do.

Obiwan1995 commented 1 year ago

Hello, thanks for your quick reply :)

My receivers are running indefinitely until application closes, so, unless I put everything in a thread, I can't wait for them to stop :/ Is there a way to keep the connection/session alive?

minghuaw commented 1 year ago

My receivers are running indefinitely until application closes,

Well, you then just need to make sure the connection and session live until the end of your application. One option is to store the ConnectionHandle and SessionHandle (ie. the connection and session variables) in your application's states and make sure that they are not closed/dropped until your application closes

Obiwan1995 commented 1 year ago

Hello,

I managed to store the connection and session in my application and now it's working correctly :)

Thanks for your help!

minghuaw commented 1 year ago

I will close this for now. Feel free to re-open the issue if there is more questions/problems.