Open tekumara opened 1 month ago
To mimic SQS's reservation behaviour I've limited the concurrency:
action_options:
ReceiveSQS:
max_concurrent_actions: 1
Although in practice the concurrency in my system is higher, I don't have interactions between consumers so this accurately models the system's state I think ... but open to other ideas!
process
has been left as non-atomic, so it can interleave with other actions.
The fundamental issue is, after processing a message, sqs_fifo.pop(0) removes the first message in the queue. But, you should actually remove the processed message from the queue.
That's you'll need a message id to indicate the message that just be removed. (Sqs requires the receipthandle you got from the receive request. For simplify, you can just use the message id here)
This will ensure, even if the same message was sent to the receiver twice (probably due to the visibility timeout) you delete only that message.
-- In send, sqs_fifo.append(record(msg_id=next_id, score=5, message='Hello')) next_id += 1
In receive, , msg=sqs_fifo[0]
sqs_fifo.remove(msg)
I see... that would be a model that's closer to how SQS works, but I still have the same problem that if ReceiveSQS
runs concurrently, and process
is non-atomic, then two threads can get the same message and one of them will delete it before the other, eg:
---
options:
max_actions: 3
---
action Init:
sqs_fifo = []
msg_id = 0
db = []
atomic action Send:
sqs_fifo.append(record(msg_id=msg_id, score=5, message='Hello'))
msg_id +=1
atomic action ReceiveSQS:
if sqs_fifo:
msg = sqs_fifo[0]
process(msg)
oneof:
sqs_fifo.remove(msg)
pass # Do nothing i.e, in this case, don't remove the message
fair action Increment:
v = db
v.append("i")
db = v
func process(message):
v = db
v.append("p")
db = v
now errors with:
E1011 20:33:27.726531 87310 starlark.go:55] Error executing stmt: remove: element not found
panic: Line 21: Error executing statement: sqs_fifo.remove(msg, )
Sorry, I just got back from vacation.
Actually, you might need to atomically check if the first entry is still the msg that was processed.
oneof:
atomic:
if sqs_fifo and sqs_fifo[0] == msg:
sqs_fifo.remove(msg)
pass # Do nothing i.e, in this case, don't remove the message
Alternatively, that's could be expressed in a single line
oneof:
sqs_fifo.remove(msg) if sqs_fifo and sqs_fifo[0] == msg else None
pass # Do nothing i.e, in this case, don't remove the message
Does this answer your question?
I see.. so that will avoid the error during deletion, but would that still allow two concurrent actions to receive the same message? To avoid that I guess I need max_concurrent_actions: 1
.
The following:
Will crash with:
If I use:
It won't crash, because
process
will no longer interleave withIncrement
.Is there a way to capture the reservation behaviour of SQS (ie: a message can only be received by a single consumer at a time) whilst also having a non-atomic
process
(because in practice, my system does not atomically performprocess
)?Thank you!