tembo-io / pgmq

A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.
PostgreSQL License

Transaction Support (Rust) #257

Open haydenflinner opened 2 weeks ago

haydenflinner commented 2 weeks ago

Background

One reason to use Postgres as your message queue, rather than running a separate queueing system, is that you don't have to worry about idempotence. If all of your application's state lives in the same Postgres instance, you can do something like this:

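// Pseudocode: read_and_remove stands for a pgmq-rs call that would accept a transaction
let mut transaction = conn.begin_transaction();
let msg = pgmq.read_and_remove(&mut transaction);
modify_pg_bizlogic_tables(&msg, &mut transaction);
transaction.commit(); // any failure before this point rolls everything back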

If you fail at any point, the transaction is rolled back and you're all good. This is strictly safer than the approach a separate message queue forces on you: modify your business-logic tables, then commit the message read in the queue (in pgmq, this would be pgmq.archive), and cross your fingers that either (1) you don't crash between performing the side effects and committing the read, or (2) your side effects are idempotent, so it won't matter if you read the same message again.
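To make the difference concrete, here is a minimal in-memory sketch. It does not talk to Postgres or pgmq: the `Db` struct and both functions are hypothetical stand-ins, with "commit" modeled as publishing a modified copy and "rollback" as dropping it.

```rust
// Toy model only: no Postgres, no pgmq. All names here are illustrative.

#[derive(Clone, Debug)]
struct Db {
    queue: Vec<String>,     // pending messages
    processed: Vec<String>, // stands in for the business-logic tables
}

/// Separate-queue style: apply the side effect, then acknowledge the message.
/// `crash_before_ack` simulates a crash between the two steps.
fn process_without_tx(db: &mut Db, crash_before_ack: bool) -> Result<(), &'static str> {
    let msg = db.queue.first().cloned().ok_or("queue empty")?;
    db.processed.push(msg); // the side effect is already durable
    if crash_before_ack {
        return Err("crashed before ack"); // the message stays in the queue
    }
    db.queue.remove(0); // ack
    Ok(())
}

/// Single-transaction style: work on a copy and publish it only on success.
fn process_in_tx(db: &mut Db, crash_before_commit: bool) -> Result<(), &'static str> {
    let mut tx = db.clone(); // "BEGIN"
    if tx.queue.is_empty() {
        return Err("queue empty");
    }
    let msg = tx.queue.remove(0); // read and remove
    tx.processed.push(msg); // side effect, visible only inside the copy
    if crash_before_commit {
        return Err("crashed before commit"); // copy is dropped: "ROLLBACK"
    }
    *db = tx; // "COMMIT"
    Ok(())
}

/// Crash once, then retry, in both styles. Returns how many times the
/// side effect ended up applied: (without transaction, with transaction).
fn demo() -> (usize, usize) {
    let fresh = || Db { queue: vec!["order-1".to_string()], processed: Vec::new() };

    let mut a = fresh();
    let _ = process_without_tx(&mut a, true); // crash
    let _ = process_without_tx(&mut a, false); // retry

    let mut b = fresh();
    let _ = process_in_tx(&mut b, true); // crash
    let _ = process_in_tx(&mut b, false); // retry

    (a.processed.len(), b.processed.len())
}
```

In this model, `demo()` returns `(2, 1)`: the non-transactional path applies the side effect twice after the retry, while the transactional path applies it once.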

Solution

If pgmq accepted an ongoing transaction rather than managing its own connection, this would be trivial: I could begin the transaction, pass it in to pgmq, use pgmq as normal, and commit the transaction when I'm done.

ChuckHend commented 2 weeks ago

Thanks @haydenflinner. Just to clarify: you are saying this is a limitation of the Rust client library, not of the extension itself, correct? You should be able to get the desired behavior in Rust by executing SQL directly.

haydenflinner commented 2 weeks ago

Right; this could be done in plain SQL. It's just that the Rust client library doesn't support this usage through its convenient API.

I may give this little refactor a shot today; I found another example. The biggest hurdle will be figuring out how to vendor the code for my own usage 😄 I guess I could fork this repo and give Cargo the URL of my fork, but that would be a pretty slow feedback loop!

v0idpwn commented 2 weeks ago

I agree pgmq-rs should support easily wrapping statements in a transaction. In the Elixir library, one can simply write:

MyApp.Repo.transaction(fn ->
  MyApp.Pgmq.send("q1", message)
  MyApp.Pgmq.send("q2", message)
end)

Seems hard to achieve something similar in pgmq-rs without breaking compatibility significantly :/

v0idpwn commented 1 week ago

I'd love to see the same in the Python client, cc @ChuckHend @tavallaie. IMO this is one of the big selling points of pgmq over queues outside the database.

ChuckHend commented 1 week ago

> IMO this is one of the big selling points of pgmq over queues outside the database

Totally agree.

One approach that might work is to add a parameter that lets users pass their transaction into the call. I haven't tested it yet, but something like this:

    pub async fn send<T: Serialize>(
        &self,
        queue_name: &str,
        message: &T,
        cxn: Option<&mut sqlx::Transaction<'_, sqlx::Postgres>>
    ) -> Result<i64, PgmqError> {
        check_input(queue_name)?;
        let msg = serde_json::json!(&message);
        let prepared = sqlx::query!(
            "SELECT send as msg_id from pgmq.send($1::text, $2::jsonb, 0::integer);",
            queue_name,
            msg
        );
        let sent = match cxn {
            Some(tx) => prepared.fetch_one(&mut **tx).await?,
            None => prepared.fetch_one(&self.connection).await?
        };
        Ok(sent.msg_id.expect("no message id"))
    }

It's a breaking change, but it would be an easy update for users who don't want transactions: they would just pass None as the last parameter.
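As a rough illustration of what call sites might look like under this proposal, here is a std-only mock. `Store`, `Transaction`, and `Queue` are hypothetical stand-ins, not sqlx or pgmq-rs types, and "commit" is modeled by flushing buffered rows into shared storage.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical shared storage standing in for the database.
type Store = Rc<RefCell<Vec<String>>>;

/// Mock transaction: buffers rows until `commit` is called.
/// Dropping it without committing discards the buffer (a rollback).
struct Transaction {
    store: Store,
    pending: Vec<String>,
}

impl Transaction {
    fn commit(self) {
        self.store.borrow_mut().extend(self.pending);
    }
}

/// Mock client that normally writes through its own connection.
struct Queue {
    store: Store,
}

impl Queue {
    fn begin(&self) -> Transaction {
        Transaction { store: Rc::clone(&self.store), pending: Vec::new() }
    }

    /// Mirrors the proposed signature: the last parameter is an optional
    /// caller-owned transaction.
    fn send(&self, queue_name: &str, message: &str, cxn: Option<&mut Transaction>) {
        let row = format!("{queue_name}:{message}");
        match cxn {
            Some(tx) => tx.pending.push(row),          // joins the caller's transaction
            None => self.store.borrow_mut().push(row), // immediate write, as today
        }
    }
}

/// Returns the number of visible rows before and after the commit.
fn demo() -> (usize, usize) {
    let q = Queue { store: Rc::new(RefCell::new(Vec::new())) };

    // Callers that don't want a transaction only add `None`.
    q.send("q1", "hello", None);

    // Two sends that become visible together, like the Elixir example.
    let mut tx = q.begin();
    q.send("q1", "a", Some(&mut tx));
    q.send("q2", "b", Some(&mut tx));
    let before_commit = q.store.borrow().len();
    tx.commit();
    let after_commit = q.store.borrow().len();

    (before_commit, after_commit)
}
```

In this mock, `demo()` returns `(1, 3)`: only the non-transactional send is visible before the commit, and both transactional sends appear together afterwards.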

ChuckHend commented 1 day ago

@haydenflinner, @v0idpwn, @brianpursley -- what do you think about adding an optional parameter (for the transaction) to all the function calls? Draft over here: https://github.com/tembo-io/pgmq/pull/273 (implemented for pgmq.send() only)