tokahuke / yaque

Yaque is yet another disk-backed persistent queue for Rust.

Stream or Iterator? #26

Open MTRNord opened 1 year ago

MTRNord commented 1 year ago

Hi,

At first sight, I am wondering whether it would make sense to have an iterator/stream to receive the data. Currently I need to call recv() in a loop, but a stream might be better optimized. I don't have any metrics to justify this assumption; it's a gut feeling that it would be a nice addition.

MTRNord commented 1 year ago

Basically my scenario is that I do this:

    // Start listening for tasks
    let mut receiver = Receiver::open(config.task_folder.clone())?;

    loop {
        let data = receiver.recv().await;

        match data {
            Ok(data) => {
                let email_bytes = &*data;
                let email_json = serde_json::from_slice::<EmailPayload>(email_bytes)?;

                if let Err(e) = send_email_job(data, email_json).await {
                    tracing::error!("Error while sending email: {:?}", e);
                }
            }
            Err(e) => {
                tracing::error!("Error while receiving data from receiver: {:?}", e);
            }
        }
    }

tokahuke commented 1 year ago

That looks like a reasonable request. I think that I did not include this at first because of the RecvGuard type (I remember some borrow check complaints back then). Let me see if I can give this a try today. Just to be sure, you would like to have:

use futures::StreamExt;

let mut receiver = Receiver::open(config.task_folder.clone())?;

while let Some(data) = receiver.next().await {
    match data {
        Ok(data) => {
            let email_bytes = &*data;
            let email_json = serde_json::from_slice::<EmailPayload>(email_bytes)?;

            if let Err(e) = send_email_job(data, email_json).await {
                tracing::error!("Error while sending email: {:?}", e);
            }
        }
        Err(e) => {
            tracing::error!("Error while receiving data from receiver: {:?}", e);
        }
    }
}

Like so? Or even, perhaps use the adaptors of TryStreamExt, right?

MTRNord commented 1 year ago

yeah something along those lines would be great to have :)

tokahuke commented 1 year ago

So... nothing is ever as simple as it seems. So, the idea behind yaque is that you open a transaction every time you take elements from the queue. Then, you need to call a commit operation to actually mark the elements as read (btw., that is missing in your example. Just check your code, otherwise you will always read the same element over and over again).

So, supposing you couldn't care less about what happens on failure in your service, this should solve your problem:

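use std::io;

use futures::Stream;
use yaque::Receiver;

/// Turns a `Receiver` into a stream of items, committing each read as it goes.
pub fn stream(receiver: Receiver) -> impl Stream<Item = io::Result<Vec<u8>>> {
    futures::stream::try_unfold(receiver, |mut receiver| async move {
        let guard = receiver.recv().await?;
        let item = guard.try_into_inner()?; // commits the read
        Ok(Some((item, receiver)))
    })
}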

This will read and commit the read. You will not be able to go back. I could try to stream the RecvGuards, but it will require some unsafe shenanigans.

TL;DR: what is your use case in terms of failure?

(Edit: no, the RecvGuard idea is just unsound. RecvGuard has a mutable reference to the Receiver)
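
To make the transaction idea above concrete, here is a minimal in-memory sketch. It does not use yaque at all: `ToyReceiver`, `ToyGuard` and `demo_commit` are hypothetical names invented for illustration, and the real `Receiver`/`RecvGuard` are async and disk-backed. The sketch only shows the two properties discussed here: an item that is read but not committed is seen again on the next read, and the guard holds a mutable borrow of the receiver, so only one guard can exist at a time.

```rust
use std::collections::VecDeque;

/// Toy in-memory stand-in for a queue receiver (NOT yaque's real type).
pub struct ToyReceiver {
    items: VecDeque<Vec<u8>>,
}

/// Guard over the front item. It borrows the receiver mutably, so the
/// borrow checker rejects a second guard while this one is alive.
pub struct ToyGuard<'a> {
    receiver: &'a mut ToyReceiver,
}

impl ToyReceiver {
    pub fn new(items: Vec<Vec<u8>>) -> Self {
        ToyReceiver { items: items.into() }
    }

    /// Opens a "transaction" on the front item, if there is one.
    pub fn recv(&mut self) -> Option<ToyGuard<'_>> {
        if self.items.is_empty() {
            None
        } else {
            Some(ToyGuard { receiver: self })
        }
    }
}

impl<'a> ToyGuard<'a> {
    /// Looks at the item without removing it from the queue.
    pub fn data(&self) -> &[u8] {
        &self.receiver.items[0]
    }

    /// Marks the item as read by removing it from the queue.
    pub fn commit(self) {
        self.receiver.items.pop_front();
    }
}

/// Reads the front item once without committing, once with a commit,
/// then reads again, and returns what each read saw.
pub fn demo_commit() -> Vec<Vec<u8>> {
    let mut rx = ToyReceiver::new(vec![b"a".to_vec(), b"b".to_vec()]);
    let mut seen = Vec::new();

    // Guard dropped without commit: the item stays at the front.
    let guard = rx.recv().unwrap();
    seen.push(guard.data().to_vec());
    drop(guard);

    // Same item again; this time the read is committed.
    let guard = rx.recv().unwrap();
    seen.push(guard.data().to_vec());
    guard.commit();

    // Only now does the next item become visible.
    let guard = rx.recv().unwrap();
    seen.push(guard.data().to_vec());
    seen
}
```

In this toy model, `demo_commit()` sees the first item twice before moving on, which is the "read the same element over and over again" behaviour that a missing commit would cause.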

MTRNord commented 1 year ago

> btw., that is missing in your example.

It's hidden in the send_email_job method :) That one calls it if it was successful. That's intentional, as it does network ops, so I want it to basically treat it as a rollback in case of failure. (Unless I didn't get that right here?)

> TL;DR: what is your use case in terms of failure?

So the use I have here is an SMTP sender queue. The idea is essentially "try again until it works". That's a bit of a simplification of what SMTP expects in reality, but it does the job. To add some context, I am using this inside a Rust mail server I am writing. :)

A failure of send_email_job can mean either that the server didn't accept the message or that the connection failed at the time. I should probably differentiate these, but for now I don't.

However, now that I read this, I wonder if I should instead requeue things at the end rather than retrying directly and possibly locking up the code on a failure.

MTRNord commented 1 year ago

For context my full code of where I use this is currently at https://github.com/erooster-mail/erooster/blob/main/crates/erooster_smtp/src/servers/mod.rs#L68-L87 and https://github.com/erooster-mail/erooster/blob/main/crates/erooster_smtp/src/servers/sending.rs#L276

The commit call is at https://github.com/erooster-mail/erooster/blob/main/crates/erooster_smtp/src/servers/sending.rs#L402

Be warned, it's not the prettiest currently, as it didn't get much love yet :D (It's also migrated from sqlxmq, which sadly uses features pgbouncer didn't support)

tokahuke commented 1 year ago

I see. So... I'm afraid it can't be done the Stream way, because RecvGuard takes a &mut Receiver internally (and it needs to: access to the recv side of the queue must be exclusive). The Stream and Iterator traits don't allow for this kind of pattern: calling next() twice would yield two RecvGuards pointing to the same Receiver.

However, I agree that we could try and think about ergonomic solutions for your use case. Perhaps a Receiver::for_each?

About your implementation, what happens if your server can never send the e-mail, for some reason? Since the access to the queue is exclusive, that would clog the queue, right?
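
One way to picture why a for_each-style method avoids the borrow problem described above: the item is only lent to a closure for the duration of one call, so nothing that borrows the queue ever escapes. The following is a synchronous, in-memory sketch under that assumption. `ToyQueue`, its `for_each` and `demo_for_each` are hypothetical names for illustration; none of this is an existing yaque API.

```rust
use std::collections::VecDeque;

/// Toy in-memory queue; a stand-in for illustration, NOT yaque's `Receiver`.
pub struct ToyQueue {
    items: VecDeque<String>,
}

impl ToyQueue {
    pub fn new(items: &[&str]) -> Self {
        ToyQueue {
            items: items.iter().map(|s| s.to_string()).collect(),
        }
    }

    pub fn len(&self) -> usize {
        self.items.len()
    }

    /// Hypothetical `for_each`: lends each front item to `handler`.
    /// `Ok(())` commits (removes) the item; `Err` leaves it in place and
    /// stops, so the caller decides how and when to retry.
    pub fn for_each<E>(
        &mut self,
        mut handler: impl FnMut(&str) -> Result<(), E>,
    ) -> Result<(), E> {
        while let Some(front) = self.items.front() {
            handler(front)?; // on error: nothing was removed ("rollback")
            self.items.pop_front(); // on success: "commit"
        }
        Ok(())
    }
}

/// Handles "a" and "b", fails on "c", and reports what happened.
pub fn demo_for_each() -> (Vec<String>, usize, bool) {
    let mut queue = ToyQueue::new(&["a", "b", "c"]);
    let mut handled = Vec::new();

    let result = queue.for_each(|item| {
        if item == "c" {
            return Err("cannot send");
        }
        handled.push(item.to_string());
        Ok(())
    });

    (handled, queue.len(), result.is_err())
}
```

Because the failing item stays at the front, this sketch has the same clogging behaviour raised in the question above: a permanently failing item blocks everything behind it unless the caller moves it elsewhere.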

MTRNord commented 1 year ago

> However, I agree that we could try and think about ergonomic solutions for your use case. Perhaps a Receiver::for_each?

hm yeah that might work here as well.

> About your implementation, what happens if your server can never send the e-mail, for some reason? Since the access to the queue is exclusive, that would clog the queue, right?

Right. That's something I didn't consider. I wonder if I should requeue failing messages at the end, or maybe even in a secondary queue that acts as a "slow lane". I think at the very least I should requeue instead of not committing on failure, and then probably go through the various failure cases I have and possibly use multiple queues. Gradual backoff would be nice for my use case, so that's probably what I will spend my time on tomorrow :) Thanks for the input!
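
For the gradual backoff mentioned above, a common choice is capped exponential backoff. The helper below is a small sketch of that idea only; `backoff_delay` and its parameters are hypothetical names, and the doubling factor and cap are assumptions rather than anything SMTP or yaque prescribes.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): `base * 2^attempt`,
/// capped at `max`.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // checked_pow / checked_mul guard against overflow for large attempt counts;
    // any overflow simply falls back to the cap.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max).min(max)
}
```

With a base of one second and a cap of one minute, the delays grow as 1 s, 2 s, 4 s, 8 s and so on, and stay at 60 s once the cap is reached.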

tokahuke commented 1 year ago

So, an idea for you is to implement a dead letter queue. Create a secondary queue and dump all faulty e-mails there. Then select from both queues to run the service.

< rant > This is actually a very common pattern in this kind of system. When I created yaque, I had the idea of creating something a bit more low-level, which one could use to build other stuff on top (write-ahead logs and other DB stuff, some Kafkaesque systems and, of course, service queues). Think of how axum is built on top of hyper. Same idea!

So, perhaps, the time has come to write a proper service queue on top of yaque. I have some other fish to fry atm, so you may take the lead (if you are interested), or we could do it "with four hands", if you like! < /rant >
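
A rough sketch of the dead letter queue pattern described above, using two in-memory `VecDeque`s in place of two yaque queues. Everything here is an assumption made for illustration: the `Job` type, the `drain_with_dlq` and `demo_dlq` names, the attempt counter and the `max_attempts` policy. It is also synchronous, whereas a real service would select over both queues asynchronously.

```rust
use std::collections::VecDeque;

/// A job plus the number of failed delivery attempts so far.
#[derive(Debug, Clone, PartialEq)]
pub struct Job {
    pub payload: String,
    pub attempts: u32,
}

/// Drains `main`, calling `send` on each job's payload.
/// Failed jobs are requeued at the back of `main` until they have failed
/// `max_attempts` times, at which point they move to `dead_letters`.
/// Returns the payloads that were delivered, in delivery order.
pub fn drain_with_dlq(
    main: &mut VecDeque<Job>,
    dead_letters: &mut VecDeque<Job>,
    max_attempts: u32,
    mut send: impl FnMut(&str) -> bool,
) -> Vec<String> {
    let mut delivered = Vec::new();
    while let Some(mut job) = main.pop_front() {
        if send(&job.payload) {
            delivered.push(job.payload);
        } else {
            job.attempts += 1;
            if job.attempts >= max_attempts {
                // Give up: park the job so it no longer clogs the main queue.
                dead_letters.push_back(job);
            } else {
                // Retry later, behind the jobs that are still waiting.
                main.push_back(job);
            }
        }
    }
    delivered
}

/// Runs the drain with one job that always fails.
pub fn demo_dlq() -> (Vec<String>, Vec<Job>, usize) {
    let mut main: VecDeque<Job> = ["ok1", "bad", "ok2"]
        .iter()
        .map(|p| Job { payload: p.to_string(), attempts: 0 })
        .collect();
    let mut dead = VecDeque::new();

    let delivered = drain_with_dlq(&mut main, &mut dead, 3, |p| p != "bad");
    (delivered, dead.into_iter().collect(), main.len())
}
```

The point of the design is that a job that can never be delivered ends up in the secondary queue after a bounded number of attempts instead of blocking the jobs behind it.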

MTRNord commented 1 year ago

Yeah, seems like I probably want that, or maybe even different kinds of queues, since mail has cases where you just have to wait for a while. So it might make sense to have a few. I also realised I didn't recover from failures/exits yet :D so that's fixed now as well. Though I have now added a possible sender lockfile race 🙈. So yeah, I will take tomorrow to actually think about it some more and try a few things :)

VanuPhantom commented 1 year ago

> So, an idea for you is to implement a dead letter queue. Create a secondary queue and dump all faulty e-mails there. Then select from both queues to run the service.
>
> < rant > This is actually a very common pattern in this kind of system. When I created yaque, I had the idea of creating something a bit more low-level, which one could use to build other stuff on top (write-ahead logs and other DB stuff, some Kafkaesque systems and, of course, service queues). Think of how axum is built on top of hyper. Same idea!
>
> So, perhaps, the time has come to write a proper service queue on top of yaque. I have some other fish to fry atm, so you may take the lead (if you are interested), or we could do it "with four hands", if you like! < /rant >

Although your rant about Yaque being lower-level does make sense, your library does actually have some amazing high-level applications on its own! For example, I'm currently developing something slightly akin to a DLQ for an IoT gateway. Yaque seems interesting specifically because of its transaction system: I only want changes to the queue to be committed if the delivery of a message taken from the queue succeeds. (I'm forwarding messages coming in through a named pipe from other local services to a server.)