DavidBM / rsmq-async-rs

RSMQ port to async rust. RSMQ is a simple redis queue system that works in any redis v2.6+
MIT License

transaction interface #21

Open GopherJ opened 1 month ago

DavidBM commented 1 month ago

Transactions on Redis are kind of tricky. It turns out that Redis doesn't support rollback, so if something fails, you still end up with a bad state.

I assume your ideal case is to be able to consume and send an event transactionally. Or to be able to send more than one event in a transaction.

In the first case, you want the consumption to roll back if there is an error. You can simulate that with the visibility timeout, so the message is only hidden for a couple of seconds and becomes available again if it is not deleted.
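
To illustrate the idea, here is a minimal in-memory sketch of "rollback by timeout". It is not the rsmq-async API: `Queue`, `process`, and the explicit `now_ms` clock are hypothetical names chosen for the example, and a `HashMap` stands in for Redis. A message that fails processing is simply not deleted, so it becomes visible again once the hide period has passed.

```rust
use std::collections::HashMap;

/// In-memory stand-in for a queue with a visibility timeout (hypothetical).
struct Queue {
    // message id -> (body, time in ms from which the message is visible)
    messages: HashMap<u64, (String, u64)>,
    next_id: u64,
}

impl Queue {
    fn new() -> Self {
        Queue { messages: HashMap::new(), next_id: 0 }
    }

    /// Stores a message that is visible immediately.
    fn send(&mut self, body: &str, now_ms: u64) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.messages.insert(id, (body.to_string(), now_ms));
        id
    }

    /// Returns the oldest visible message and hides it for `hide_ms`.
    fn receive(&mut self, now_ms: u64, hide_ms: u64) -> Option<(u64, String)> {
        let id = self
            .messages
            .iter()
            .filter(|(_, msg)| msg.1 <= now_ms)
            .map(|(id, _)| *id)
            .min()?;
        let entry = self.messages.get_mut(&id)?;
        entry.1 = now_ms + hide_ms;
        Some((id, entry.0.clone()))
    }

    /// Removes a message for good; returns false if it was not there.
    fn delete(&mut self, id: u64) -> bool {
        self.messages.remove(&id).is_some()
    }
}

/// Receives one message and deletes it only if `handler` succeeds.
/// On failure nothing is undone explicitly: the message reappears
/// after `hide_ms`, which is the simulated rollback.
fn process<F>(queue: &mut Queue, now_ms: u64, hide_ms: u64, handler: F) -> bool
where
    F: FnOnce(&str) -> Result<(), String>,
{
    match queue.receive(now_ms, hide_ms) {
        Some((id, body)) => match handler(&body) {
            Ok(()) => queue.delete(id),
            Err(_) => false,
        },
        None => false,
    }
}
```

Note that this gives at-least-once delivery rather than a real transaction: any side effects the handler performed before failing are not undone, so the handler should be idempotent.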

In the second case, emitting several events, Redis's lack of guarantees makes it impossible to make it work, because if an error happens on the Redis side, it won't roll back.

What is the use case you want to support? Maybe it is a different use case from the ones I describe?

GopherJ commented 1 month ago

I kind of need send_message to be atomic, so eventually a two-phase thing should work: send_message in a for loop, then commit.
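
A rough sketch of what such a two-phase interface could look like, purely as an illustration: `Batch`, `commit`, and `MAX_SIZE` are hypothetical names, not part of rsmq-async, and a `Vec<String>` stands in for the queue. It only shows the shape of "stage in a loop, then commit all or none" on the client side; atomicity on the Redis side would still have to come from something like a Lua script or MULTI/EXEC.

```rust
/// Assumed per-message size limit in bytes (hypothetical value for the sketch).
const MAX_SIZE: usize = 65536;

/// Hypothetical two-phase batch: messages are staged locally and only
/// written to the queue when `commit` is called.
struct Batch {
    staged: Vec<String>,
}

impl Batch {
    fn new() -> Self {
        Batch { staged: Vec::new() }
    }

    /// Phase 1: stage a message. Nothing is sent yet.
    fn send_message(&mut self, body: &str) -> &mut Self {
        self.staged.push(body.to_string());
        self
    }

    /// Phase 2: validate every staged message, then append them all.
    /// If any message is invalid, the queue is left untouched.
    fn commit(self, queue: &mut Vec<String>) -> Result<usize, String> {
        if let Some(bad) = self.staged.iter().position(|m| m.len() > MAX_SIZE) {
            return Err(format!("message {} exceeds {} bytes", bad, MAX_SIZE));
        }
        let count = self.staged.len();
        queue.extend(self.staged);
        Ok(count)
    }
}
```

Because `commit` takes `self` by value, a batch cannot be committed twice by accident. Validating everything before the first write is what keeps a rejected batch from leaving partial state behind.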

I need this for a financial use case. I created this issue to take a note, but the redis-rs client doesn't have good transaction support, or at least it's not very easy to use.

GopherJ commented 1 month ago

Maybe one way is to write it in Lua (sendMessages.lua etc.), that should be interesting.

Or pipeline many messages and make the pipeline atomic. I'm not sure which is best yet.

GopherJ commented 1 month ago

A working one is at: https://github.com/GopherJ/rsmq-sync/blob/feat/send-multiple-messages-using-lua/src/redis-scripts/sendMessage.lua

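-- All arguments are passed through KEYS in this version; values that are
-- not key names would conventionally be passed in ARGV instead.
local ns = KEYS[1]                        -- namespace prefix
local qname = KEYS[2]                     -- queue name
local ts_delay = tonumber(KEYS[3])        -- sorted-set score used for every message in the batch
local queue_uids = cjson.decode(KEYS[4])  -- JSON array of message ids
local messages = cjson.decode(KEYS[5])    -- JSON array of message bodies, same order as the ids
local realtime = tonumber(KEYS[6])        -- 1 to publish the queue size after sending

local key = ns .. qname
local queue_key = ns .. qname .. ':Q'
local realtime_key = ns .. ':rt:' .. qname

-- Store every message. Redis runs the whole script without interleaving
-- other commands, but it does not roll back if a call fails midway.
for i = 1, #queue_uids do
    redis.call('ZADD', key, ts_delay, queue_uids[i])
    redis.call('HSET', queue_key, queue_uids[i], messages[i])
end

-- Bump the sent counter by the batch size.
local result = redis.call('HINCRBY', queue_key, 'totalsent', #queue_uids)

-- In realtime mode, return and publish the queue size instead of the counter.
if realtime == 1 then
    result = redis.call('ZCARD', key)
    redis.call('PUBLISH', realtime_key, result)
end

return result

DavidBM commented 1 month ago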

Does the pipeline work for you? We could build something on top of that.

For a financial case, you might not want to use a library like this one. Its guarantees are quite low for handling money.