minghuaw / fe2o3-amqp

A rust implementation of the AMQP1.0 protocol based on serde and tokio.
MIT License

Send to Event Hub hangs if idle for 30+ minute #200

Open ciaran-conditionalai opened 1 year ago

ciaran-conditionalai commented 1 year ago

Hi,

I've been running some tests based on the Event Hubs simple sender example. If the sender sits idle for 30+ minutes (a 20-minute idle worked fine), the second send call in the test pasted below hangs. Before that call, both the connection and the session report that they are not closed or ended. Ideally, the send should return an error instead of hanging. Details follow.

Dependencies

[dependencies]
fe2o3-amqp = { version = "0.8.20", features = ["rustls"] }
tokio = { version = "1.27.0", features = ["net", "rt", "rt-multi-thread", "macros"] }

Test to Reproduce

use std::env;

use dotenv::dotenv;
use fe2o3_amqp::{
    connection::ConnectionHandle,
    sasl_profile::SaslProfile,
    session::SessionHandle,
    types::{
        messaging::{Message, Properties},
        primitives::Binary,
    },
    Connection, Sender, Session,
};
use tokio::time::{sleep, Duration};

// https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-troubleshoot
// cargo test test_send_after_idle_for_30_mins -- --nocapture
#[tokio::test]
pub async fn test_send_after_idle_for_30_mins() {
    println!("Start");

    dotenv().ok();

    let hostname = env::var("HOST_NAME").unwrap();
    let sa_key_name = env::var("SHARED_ACCESS_KEY_NAME").unwrap();
    let sa_key_value = env::var("SHARED_ACCESS_KEY_VALUE").unwrap();
    let event_hub_name = env::var("EVENT_HUB_NAME").unwrap();

    let url = format!("amqps://{}", hostname);
    let mut connection = Connection::builder()
        .container_id("rust-connection-1")
        .alt_tls_establishment(true) // EventHubs uses alternative TLS establishment
        .sasl_profile(SaslProfile::Plain {
            username: sa_key_name,
            password: sa_key_value,
        })
        .open(&url[..])
        .await
        .unwrap();
    let mut session = Session::begin(&mut connection).await.unwrap();
    let mut sender = Sender::attach(&mut session, "rust-simple-sender", event_hub_name)
        .await
        .unwrap();

    println!("Before first send message");
    send(&connection, &session, &mut sender, "test message 1").await;

    println!("Before simulate idling for 30mins");
    sleep(Duration::from_secs(60 * 30)).await;

    println!("Before second send message");
    send(&connection, &session, &mut sender, "test message 2").await;

    println!("Before close, end, close");
    sender.close().await.unwrap();
    session.end().await.unwrap();
    connection.close().await.unwrap();

    println!("Finished");
}

async fn send(
    connection: &ConnectionHandle<()>,
    session: &SessionHandle<()>,
    sender: &mut Sender,
    data: &str,
) {
    println!(
        "connection.closed={}, session.ended={}",
        connection.is_closed(),
        session.is_ended()
    );

    let message = Message::builder()
        .properties(
            Properties::builder()
                .group_id(String::from("send_to_event_hub"))
                .build(),
        )
        .data(Binary::from(data))
        .build();
    let outcome = sender.send(message).await.unwrap();
    outcome.accepted_or_else(|outcome| outcome).unwrap();
}

minghuaw commented 1 year ago

I will look into it

minghuaw commented 1 year ago

My initial test actually ended up giving me an IllegalState error after sleeping for 40 minutes. I will investigate further.

ciaran-conditionalai commented 1 year ago

Sorry, hit the wrong button. I'm running the test again with trace output enabled to see if I can get any additional information for you. One thing I have noticed is that just before the idle timeout ends, 4 requests are successfully processed by the event hub. What they are I don't know yet.

minghuaw commented 1 year ago

@ciaran-conditionalai I found the problem. Event Hubs force-detaches a link that has been idle for 30 minutes. Below is the detach frame received from Event Hubs:

frame=Detach { handle: Handle(0), closed: true, error: Some(Error { condition: LinkError(DetachForced), description: Some("The link 'G29:140866372:rust-simple-sender' is force detached. Code: ServerError. Details: AmqpEventHubPublisher.IdleTimerExpired: Idle timeout: 00:30:00. TrackingId:af1ffc0b0000a4ba0064c6a364339590_G29_B53, SystemTracker:fe2o3-amqp-event-hubs-example:eventhub:test-example~10922, Timestamp:2023-04-10T05:30:24"), info: None }) }

minghuaw commented 1 year ago

I guess one thing you can do is to detach and then re-attach the link if there is a LinkStateError. This method may be helpful https://docs.rs/fe2o3-amqp/0.8.20/fe2o3_amqp/link/sender/struct.Sender.html#method.detach_then_resume_on_session
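A minimal sketch of that retry shape, using stand-in types rather than the real fe2o3-amqp API. `Link`, `SendError`, `FakeLink`, and `send_with_reattach` are hypothetical names introduced only for illustration. In real code, the `reattach` step would be whatever the crate offers for resuming the link (for example the `detach_then_resume_on_session` method linked above), and the error would be the crate's own LinkStateError.

```rust
/// Hypothetical error type standing in for the crate's send error.
#[derive(Debug, PartialEq)]
pub enum SendError {
    /// The remote detached or closed the link (e.g. after an idle timeout).
    LinkState(String),
    /// Any other failure; not retried in this sketch.
    Other(String),
}

/// Hypothetical abstraction over a sender link.
pub trait Link {
    fn send(&mut self, payload: &str) -> Result<(), SendError>;
    /// Detach and then attach again on the same session.
    fn reattach(&mut self) -> Result<(), SendError>;
}

/// Send once; on a link-state error, re-attach and retry a single time.
pub fn send_with_reattach<L: Link>(link: &mut L, payload: &str) -> Result<(), SendError> {
    match link.send(payload) {
        Err(SendError::LinkState(_)) => {
            link.reattach()?;
            link.send(payload)
        }
        other => other,
    }
}

/// Test double: sends fail while detached and succeed after `reattach`.
pub struct FakeLink {
    pub attached: bool,
    pub sent: Vec<String>,
}

impl Link for FakeLink {
    fn send(&mut self, payload: &str) -> Result<(), SendError> {
        if !self.attached {
            return Err(SendError::LinkState("detach-forced".to_string()));
        }
        self.sent.push(payload.to_string());
        Ok(())
    }

    fn reattach(&mut self) -> Result<(), SendError> {
        self.attached = true;
        Ok(())
    }
}
```

Retrying only once keeps a persistently failing link from looping forever; errors other than the link-state case are passed straight back to the caller.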

ciaran-conditionalai commented 1 year ago

Interesting, when logging trace output the test does generate an expected error for this scenario and the test bombs out:

thread 'test_send_after_idle_for_30_mins' panicked at 'called `Result::unwrap()` on an `Err` value: LinkStateError(RemoteClosedWithError(Error { condition: LinkError(DetachForced), description: Some("Idle link tracker, link rust-simple-sender has been idle for 1800000ms TrackingId:ccae0df3-9e98-4969-8cec-ef6c8c5707fa_G14, SystemTracker:coreilly-dev-eventhub-ns:EventHub:dev_coreilly, Timestamp:2023-04-10T05:30:07"), info: None }))'

However, I do see the test hang when no logging output is being generated. So although adding recovery code around the LinkStateError looks like the right thing to do, it likely won't get called because of the hang.
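One way to make sure recovery code gets a chance to run even when a call never returns is to put a deadline on the call itself. The sketch below shows the idea with the standard library only; `run_with_timeout` and `GuardError` are hypothetical names, not part of fe2o3-amqp. In the async test above, the closer equivalent would presumably be wrapping `sender.send(message)` in `tokio::time::timeout`, which this sketch does not exercise.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical error for the guard below.
#[derive(Debug, PartialEq)]
pub enum GuardError {
    /// The operation did not finish within the limit.
    TimedOut,
    /// The worker thread ended (e.g. panicked) without producing a result.
    WorkerFailed,
}

/// Run `op` on a worker thread and wait at most `limit` for its result.
pub fn run_with_timeout<T, F>(limit: Duration, op: F) -> Result<T, GuardError>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The receiver may already be gone if the caller timed out; ignore that.
        let _ = tx.send(op());
    });
    match rx.recv_timeout(limit) {
        Ok(value) => Ok(value),
        Err(mpsc::RecvTimeoutError::Timeout) => Err(GuardError::TimedOut),
        Err(mpsc::RecvTimeoutError::Disconnected) => Err(GuardError::WorkerFailed),
    }
}
```

Note that the worker thread is not cancelled on timeout; it keeps running until `op` returns. The guard only guarantees that the caller gets an error back and can start recovery.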

minghuaw commented 1 year ago

That is interesting. My test cases never had the link hang, whether or not logging (with either tracing or log) was enabled.

Another thing I found in the log is that the session will be forced to close after the link is forced to close. And then the AMQP connection will be considered inactive after all its sessions/links are closed, and the connection will be closed after the connection is inactive for 300000 milliseconds.

So you may need to recover all the way from the connection if there is only one link on that connection.
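The cascade described above can be sketched as a small decision helper. Everything here is illustrative: `Layer`, `layers_to_rebuild`, and `earliest_connection_close` are hypothetical names, the two flags correspond to what `connection.is_closed()` and `session.is_ended()` report in the test, and the timeline assumes the session is ended immediately after the link is detached, which the log does not strictly confirm.

```rust
use std::time::Duration;

/// The layers of an AMQP client stack, outermost first.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Layer {
    Connection,
    Session,
    Link,
}

/// Decide what has to be re-created, in order, before sending again.
/// A closed connection implies its session and link are gone as well.
pub fn layers_to_rebuild(connection_closed: bool, session_ended: bool) -> Vec<Layer> {
    if connection_closed {
        vec![Layer::Connection, Layer::Session, Layer::Link]
    } else if session_ended {
        vec![Layer::Session, Layer::Link]
    } else {
        vec![Layer::Link]
    }
}

/// Link idle timeout reported in the detach frame (00:30:00).
pub const LINK_IDLE_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// Connection inactivity window seen in the log (300000 ms).
pub const CONNECTION_INACTIVITY_TIMEOUT: Duration = Duration::from_millis(300_000);

/// Rough time from the last send until the connection is closed, assuming
/// the session ends right after the link is force-detached.
pub fn earliest_connection_close() -> Duration {
    LINK_IDLE_TIMEOUT + CONNECTION_INACTIVITY_TIMEOUT
}
```

Under those assumptions, a client that has been idle for about 35 minutes should expect to rebuild all three layers, while one caught between 30 and 35 minutes may only need a new session and link.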

minghuaw commented 1 year ago

@ciaran-conditionalai FYI, I am currently working on an AMQP 1.0 based Event Hubs SDK for rust (Azure/azure-sdk-for-rust#1260). Though the producer client API already works, I haven't implemented auto-recovery for the producer client unfortunately. I was planning to work on recovery after implementing the consumer client, but now I may prioritize auto-recovery

ciaran-conditionalai commented 1 year ago

@minghuaw thanks for the link, I'd just started on trying to write similar event hub producer/consumer clients myself based on the Java SDK, which I've used previously.

minghuaw commented 1 year ago

@ciaran-conditionalai I was wondering if you have any suggestions for this issue #40 ? The sender is kind of lazy, in that when it is forced to close after inactivity, it won't automatically reply to the remote Detach unless the sender tries to send something or detaches/closes itself.

ciaran-conditionalai commented 1 year ago

I am not overly familiar with the underlying protocol, but my thought would be that the sender should be actively listening (blocking on a separate thread) for protocol control signals sent from the remote. It might be worth plumbing the depths of the Java AMQP library, as it is most likely addressing this (I have used it in production and it handles these idle scenarios with little issue, i.e. the setup is bursts of streamed data).
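As a rough illustration of that suggestion (not how fe2o3-amqp is actually structured), the sketch below has a listener thread block on incoming control frames and record a remote Detach, so that the next send fails fast instead of finding out late. `ControlFrame`, `WatchedLink`, and the channel standing in for the network are all hypothetical; in an async crate the listener would more likely be a spawned task than an OS thread.

```rust
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical control frames the remote may push at any time.
pub enum ControlFrame {
    Detach { reason: String },
}

/// A link whose listener thread tracks whether the remote has detached it.
pub struct WatchedLink {
    detach_reason: Arc<Mutex<Option<String>>>,
    listener: Option<JoinHandle<()>>,
}

impl WatchedLink {
    /// Spawn a listener that blocks on `frames` until the channel closes.
    pub fn attach(frames: Receiver<ControlFrame>) -> Self {
        let detach_reason = Arc::new(Mutex::new(None));
        let shared = Arc::clone(&detach_reason);
        let listener = thread::spawn(move || {
            for frame in frames {
                match frame {
                    ControlFrame::Detach { reason } => {
                        *shared.lock().unwrap() = Some(reason);
                    }
                }
            }
        });
        WatchedLink { detach_reason, listener: Some(listener) }
    }

    /// Fail fast if a remote Detach has already been observed.
    pub fn send(&self, _payload: &str) -> Result<(), String> {
        let guard = self.detach_reason.lock().unwrap();
        match guard.as_ref() {
            Some(reason) => Err(format!("link detached by remote: {}", reason)),
            None => Ok(()),
        }
    }

    /// Wait until the listener has drained the channel (useful in tests).
    pub fn join_listener(&mut self) {
        if let Some(handle) = self.listener.take() {
            let _ = handle.join();
        }
    }
}
```

The same listener is also the natural place to reply to the remote Detach right away, rather than waiting for the next send or close.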

minghuaw commented 1 year ago

Thanks for your feedback. I have discovered another behavior of Event Hubs. It doesn't allow detaching then re-attaching the same link. So upon closed link/session/connection due to inactivity, you would actually need to create entirely new links.

minghuaw commented 1 year ago

It doesn't allow detaching then re-attaching the same link. So upon closed link/session/connection due to inactivity, you would actually need to create entirely new links.

This is probably my fault. I didn't do CBS auth before re-attaching

ciaran-conditionalai commented 1 year ago

Good catch.

minghuaw commented 1 year ago

@ciaran-conditionalai I have briefly tested auto recovery on the recent commit https://github.com/Azure/azure-sdk-for-rust/pull/1260/commits/7e881fa11c6d84cdc76ea78aff6462f855bfef96 (in this branch https://github.com/minghuaw/azure-sdk-for-rust/tree/eventhubs_over_amqp), which you could probably give a try (I haven't added any documentation yet). I have tested both inactivity and manually turning off my router. It seems to work fine so far.

The receiver client has not been implemented yet. However, it doesn't seem like Event Hubs enforces the same inactivity rule for receivers anyway.

ciaran-conditionalai commented 1 year ago

Good to hear, it will be a little while before I can swing back and look at this in detail. It'll be nice to see this get incorporated into the azure rust offering.

minghuaw commented 1 year ago

@ciaran-conditionalai I have published the initial release of the event hub sdk on crates.io (https://crates.io/crates/azeventhubs). Both EventHubProducerClient and EventHubConsumerClient are implemented. Processor APIs have not been implemented yet.

ciaran-conditionalai commented 1 year ago

Sounds good, thanks for the update.
