Open germainc opened 11 months ago
I'm glad you brought this up. This is a deficiency I haven't found a clean solution for. Without an external action to "wake up" the queue, a message may not be retried immediately after its retry delay has elapsed.
My best idea so far is to rely on a cron task to periodically run a procedure to sweep any of the stragglers. Last I checked, PostgreSQL doesn't have a built-in cron solution, but there are some 3rd party solutions. In the meantime, I think I'll at least write the procedure and document this in the README.
I'm open to other ideas, should you have any.
I was trying to come up with some way to have a function that is always waiting for the earliest future "not_until_time" if one exists. Possibly using an advisory lock, or something similar, to try to limit that function to a single instance. When the "message_waiting" table is changed, the function runs with the earliest future time, sleeps until that time hits, then tries to run anything waiting in the queue.
Whoops, didn't mean to close the ticket.
I came up with a procedure that can be called with a timer:
https://github.com/perfectsquircle/pg_mq/pull/10
This at least provides a workaround. I don't love it though because it flies in the face of the "No Polling" thing.
I'm intrigued by your idea, but I'm not sure how it would be implemented.
When the "message_waiting" table is changed...
Would this be a trigger? If it's a trigger it's execution must eventually end, so I'm not sure how to integrate a sleep. If you have time, could you provide a rough example?
I haven't tried the procedure I wrote below so I don't know if it's sound, but this is roughly what I was thinking:
CREATE OR REPLACE PROCEDURE mq.future_run()
LANGUAGE plpgsql
AS $$
DECLARE
future_not_until_time BIGINT
BEGIN
-- get the earliest future not_until_time from the message_waiting table
SELECT MIN(extract(EPOCH FROM mw.not_until_time)::BIGINT) INTO future_not_until_time
FROM mq.message_waiting mw
WHERE mw.not_until_time > now();
IF future_not_until_time IS NULL THEN
RETURN NULL
END
-- attempt to set an advisory lock on that timestamp
has_lock := SELECT pg_try_advisory_lock(future_not_until_time);
-- return if it's already locked (another instance is already waiting to run)
IF has_lock IS NOT TRUE THEN
RETURN NULL
END
{sleep until future_not_until_time}
{call the function to match waiting messages}
-- unlock that timestamp value
SELECT pg_advisory_unlock(future_not_until_time);
END
$$;
If you nack a message with a delay, it won't retry unless a new message comes in or a new channel is added after the "not_until_time". So a 5 minute retry delay could turn into 5 hours for systems that have long periods of downtime. Any ideas on how to solve this?