tokahuke / yaque

Yaque is yet another disk-backed persistent queue for Rust.

Handling of ctrl+c with save is unclear #29

Closed: MTRNord closed this 11 months ago

MTRNord commented 11 months ago

Hi, maybe I am just doing something wrong, but I tried to handle ctrl_c and ran into this:

error[E0499]: cannot borrow `receiver` as mutable more than once at a time
   --> crates/erooster_smtp/src/servers/mod.rs:86:25
    |
84  | /                 tokio::select! {
85  | |                     _ = signal::ctrl_c() => {
86  | |                         receiver.save().expect("Unable to save queue");
    | |                         ^^^^^^^^ second mutable borrow occurs here
87  | |                     },
88  | |                     data = receiver.recv() => {
    | |                            -------- first mutable borrow occurs here
...   |
112 | |                     }
113 | |                 }
    | |_________________- first borrow might be used here, when `output` is dropped and runs the destructor for type `servers::start::{closure#0}::{closure#0}::__tokio_select_util::Out<std::result::Result<(), std::io::Error>, std::result::Result<erooster_deps::yaque::queue::RecvGuard<'_, std::vec::Vec<u8>>, std::io::Error>>`

with this code:

    // Start listening for tasks
    let mut receiver = ReceiverBuilder::default()
        .save_every_nth(None)
        .open(config.task_folder.clone());
    if let Err(e) = receiver {
        warn!("Unable to open receiver: {:?}. Trying to recover.", e);
        recover(&config.task_folder)?;
        receiver = ReceiverBuilder::default()
            .save_every_nth(None)
            .open(config.task_folder.clone());
        info!("Recovered queue successfully");
    }

    match receiver {
        Ok(mut receiver) => {
            loop {
                tokio::select! {
                    _ = signal::ctrl_c() => {
                        receiver.save().expect("Unable to save queue");
                    },
                    data = receiver.recv() => {
                        match data {
                            Ok(data) => {
                                let email_bytes = &*data;
                                let email_json = serde_json::from_slice::<EmailPayload>(email_bytes).expect("Unable to parse email payload json");

                                if let Err(e) = send_email_job(&email_json).await {
                                    tracing::error!(
                                        "Error while sending email: {:?}. Adding it to the queue again",
                                        e
                                    );
                                    // FIXME: This can race the lock leading to an error. We should
                                    //        probably handle this better.
                                    let mut sender = Sender::open(config.task_folder.clone()).expect("Unable to open queue sender");
                                    let json_bytes = serde_json::to_vec(&email_json).expect("Unable to convert email to bytes");
                                    sender.send(json_bytes).await.expect("Unable to add email to queue");
                                }
                                // Mark the job as complete
                                data.commit().expect("Unable to commit data");
                            }
                            Err(e) => {
                                tracing::error!("Error while receiving data from receiver: {:?}", e);
                            }
                        }
                    }
                }
            }
        }
        Err(e) => {
            error!("Unable to open receiver: {:?}. Giving up.", e);
        }
    }

The error is obvious and makes sense, but I wonder what a better way to handle this would be, so that the queue still gets saved on crash :)

tokahuke commented 11 months ago

Hum... I think that, in your case, it would be best if receiver were behind a Mutex and you spawned a separate tokio task specifically to handle ctrl+c. This separates the concerns ("handle signal" vs. "receive message") a bit better.
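A minimal std-only sketch of that split, for illustration. It assumes a hypothetical `FakeQueue` type standing in for yaque's `Receiver`, `std::thread` and `std::sync::Mutex` standing in for `tokio::spawn` and tokio's async `Mutex`, and a channel message standing in for the ctrl+c signal. The key point is that the worker holds the lock only for one receive at a time, so the shutdown handler can always acquire it and save.

```rust
use std::collections::VecDeque;
use std::io;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for yaque's `Receiver`: an in-memory queue
/// whose `save()` only records that it was called.
#[derive(Default)]
pub struct FakeQueue {
    items: VecDeque<Vec<u8>>,
    saved: bool,
}

impl FakeQueue {
    pub fn push(&mut self, item: Vec<u8>) {
        self.items.push_back(item);
    }

    /// Non-blocking receive: `None` when the queue is empty.
    pub fn try_recv(&mut self) -> Option<Vec<u8>> {
        self.items.pop_front()
    }

    pub fn save(&mut self) -> io::Result<()> {
        self.saved = true;
        Ok(())
    }

    pub fn is_saved(&self) -> bool {
        self.saved
    }
}

/// Dedicated "signal handler": waits for the shutdown message (standing in
/// for ctrl+c), then locks the shared queue and saves it.
pub fn spawn_shutdown_handler(
    queue: Arc<Mutex<FakeQueue>>,
    shutdown_rx: mpsc::Receiver<()>,
) -> thread::JoinHandle<io::Result<()>> {
    thread::spawn(move || {
        // Returns when a message arrives or the sender is dropped.
        let _ = shutdown_rx.recv();
        let mut guard = queue.lock().unwrap();
        guard.save()
    })
}

/// Drains the queue, holding the lock only for each individual receive.
pub fn drain(queue: &Mutex<FakeQueue>) -> Vec<Vec<u8>> {
    let mut out = Vec::new();
    loop {
        // The guard is a temporary dropped at the end of this `let`.
        // Writing `match queue.lock().unwrap().try_recv() { .. }` instead
        // would keep the guard alive for the whole `match`.
        let next = queue.lock().unwrap().try_recv();
        match next {
            Some(item) => out.push(item),
            None => break,
        }
    }
    out
}
```

In the real async code the same rule applies: the lock should not be held across an `.await` that can wait indefinitely.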

However, you can also make this work with the current code by putting receiver behind a RefCell. We know that no two mutable references will exist at runtime, so just move the borrow checking to runtime.
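A small sketch of what "moving the borrow checking to runtime" means, using a hypothetical `FakeReceiver` in place of yaque's `Receiver`. Both handlers take only a shared `&RefCell<_>`, so the compiler accepts them side by side; exclusivity is checked when `borrow_mut` runs, and an overlapping borrow would panic instead of failing to compile. `RefCell` is not `Sync`, so this only applies when both branches run in the same task, as they do inside one `select!`. This illustrates the mechanism only; it has not been checked against the exact `select!` code above.

```rust
use std::cell::RefCell;

/// Hypothetical stand-in for the queue receiver.
#[derive(Default)]
pub struct FakeReceiver {
    pub pending: Vec<u32>,
    pub saves: u32,
}

impl FakeReceiver {
    pub fn recv(&mut self) -> Option<u32> {
        self.pending.pop()
    }

    pub fn save(&mut self) {
        self.saves += 1;
    }
}

/// "Receive" branch: borrows mutably only for the duration of the call.
pub fn handle_recv(receiver: &RefCell<FakeReceiver>) -> Option<u32> {
    receiver.borrow_mut().recv()
}

/// "ctrl+c" branch: same, so the two never conflict unless they overlap.
pub fn handle_signal(receiver: &RefCell<FakeReceiver>) {
    receiver.borrow_mut().save();
}

/// True if no other `RefMut` is currently alive.
pub fn can_borrow_mut(receiver: &RefCell<FakeReceiver>) -> bool {
    receiver.try_borrow_mut().is_ok()
}
```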

Tell me which approach happens to solve it for you!

MTRNord commented 11 months ago

I haven't tested this yet, but it compiles. I'm also not entirely sure yet which way around I want the tokio::spawn and the loop :D Either one could be the part that runs in the spawned tokio task. In the end it probably doesn't matter too much here.

    match receiver {
        Ok(receiver) => {
            let receiver = Arc::new(Mutex::new(receiver));
            let receiver_clone = Arc::clone(&receiver);

            tokio::spawn(async move {
                tokio::signal::ctrl_c()
                    .await
                    .expect("failed to listen for ctrl-c event");
                info!("Received ctrl-c. Cleaning up");
                receiver_clone
                    .lock()
                    .await
                    .save()
                    .expect("Unable to save queue");
                exit(0);
            });

            loop {
                let mut receiver_lock = receiver.lock().await;
                let data = receiver_lock.recv().await;

                match data {
                    Ok(data) => {
                        let email_bytes = &*data;
                        let email_json = serde_json::from_slice::<EmailPayload>(email_bytes)?;

                        if let Err(e) = send_email_job(&email_json).await {
                            tracing::error!(
                                "Error while sending email: {:?}. Adding it to the queue again",
                                e
                            );
                            // FIXME: This can race the lock leading to an error. We should
                            //        probably handle this better.
                            let mut sender = Sender::open(config.task_folder.clone())?;
                            let json_bytes = serde_json::to_vec(&email_json)?;
                            sender.send(json_bytes).await?;
                        }
                        // Mark the job as complete
                        data.commit()?;
                    }
                    Err(e) => {
                        tracing::error!("Error while receiving data from receiver: {:?}", e);
                    }
                }
            }
        }
        Err(e) => {
            error!("Unable to open receiver: {:?}. Giving up.", e);
            Ok(())
        }
    }

I will close this once I have confirmed that this actually runs too :) Thanks for the help!

MTRNord commented 11 months ago

Hm, it turns out that, apart from systemd actually sending SIGTERM rather than SIGINT, the lock() never actually gets released. So the cleanup never happens, and the process just hangs until systemd sends SIGKILL :D

Edit: working on fixing this in the above code
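Why the cleanup hangs can be reproduced without tokio. In the code above, the loop takes `receiver_lock` and then awaits `recv()` while still holding it, so the ctrl-c task's `lock().await` waits until that guard is dropped, which does not happen while no message arrives. The sketch below uses `std::sync::Mutex::try_lock` and a hypothetical `FakeReceiver` to show the same effect synchronously: while the worker's guard is alive the save cannot proceed, and as soon as the guard is dropped it can.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for the queue receiver.
#[derive(Default)]
pub struct FakeReceiver {
    pub saved: bool,
}

/// Mimics the shutdown task: tries to take the lock and save, and reports
/// whether the lock was available. A poisoned lock also counts as unavailable.
pub fn try_save(receiver: &Mutex<FakeReceiver>) -> bool {
    if let Ok(mut guard) = receiver.try_lock() {
        guard.saved = true;
        true
    } else {
        false
    }
}
```

Usage: while a guard obtained with `receiver.lock()` is held (the worker "waiting in recv"), `try_save` returns `false`; after `drop(guard)` it returns `true`.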

MTRNord commented 11 months ago

Oops, I just realised that I was blocking on receiver.recv() "forever" while holding the lock, so that will obviously cause issues.

I went with this solution now:

https://github.com/erooster-mail/erooster/blob/bb0133ec579cc0074ded6d6f1d794db1c01382d1/crates/erooster_smtp/src/servers/mod.rs#L92-L168

That seems to work nicely :)

tokahuke commented 11 months ago

You are welcome!

tokahuke commented 11 months ago

Oh, BTW... https://github.com/erooster-mail/erooster/blob/bb0133ec579cc0074ded6d6f1d794db1c01382d1/crates/erooster_smtp/src/servers/mod.rs#L103 you can use Receiver::recv_timeout here instead.
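The general shape of a receive loop with a timeout can be sketched with `std::sync::mpsc::Receiver::recv_timeout` as a stand-in. yaque's own `Receiver::recv_timeout` is async and its exact signature should be checked in the crate docs; the `AtomicBool` shutdown flag here is also an assumption (something like a signal-handling task would set it). Waking up every `tick` lets the loop notice the flag even when the queue is idle, which is exactly what blocking on `recv()` forever prevents.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Handles messages until `shutdown` is set or the channel disconnects,
/// returning the messages it handled.
pub fn run_worker(rx: &Receiver<u32>, shutdown: &AtomicBool, tick: Duration) -> Vec<u32> {
    let mut handled = Vec::new();
    while !shutdown.load(Ordering::SeqCst) {
        match rx.recv_timeout(tick) {
            Ok(msg) => handled.push(msg),
            // Nothing arrived within `tick`: loop around and re-check the flag.
            Err(RecvTimeoutError::Timeout) => {}
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    // In the real service, this is the point where the queue would be saved.
    handled
}
```

Usage: with `Duration::from_millis(5)` as the tick, another thread setting the flag stops the loop within roughly one tick, even if no message ever arrives.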