craigpastro / pgmq-go

A Go (Golang) client for Postgres Message Queue (PGMQ)
MIT License
42 stars 2 forks source link

Add transactional enqueuing #53

Open nathankawalec opened 2 months ago

nathankawalec commented 2 months ago

It would be very beneficial to be able to queue messages in a transaction, similar to what riverqueue offers (https://pkg.go.dev/github.com/riverqueue/river#Client.InsertTx), this would completely mitigate errors that could arise if data is added to a db and a crash occurs before the message could be queued, or vice versa. This can be done either by returning a pointer to a tx or by accepting a tx as an input parameter when queuing a message. If this is a feature you could add it would be very appreciated :)

craigpastro commented 2 months ago

Hi @nathankawalec! There is SendBatch, but perhaps this doesn't work for your use case? If so, then perhaps a SendTx makes sense. I'm happy to review a PR, otherwise, I will attempt an implementation when I have some spare time. Thanks!

nathankawalec commented 1 month ago

Sorry for the late response. Upon further review, I think all that is required is something like: NewFromTx() *PGMQ { ... }

Only issue I can think of is pgxpool transaction doesn't implement Close or Ping methods, which would require end user to wrap the transaction object in order to implement those methods (with no op)

I believe Close method called on rows in SendBatchWithDelay, ReadBatch, ArchiveBatch, and DeleteBatch can be safely called in a transaction with more work to be done, but I may be mistaken.

I think ideally it would be possible to use each method inside a transaction.

nathankawalec commented 1 month ago

Oops, didn't mean to close.

SoftExpert commented 1 month ago

My 2 cents here: with commit 1e953ef it became clear that the transaction is fully supported by pgmq. Perhaps the internal code of the GO client will be a bit more complicated, in order to call the correct APIs of pgmq ...