tembo-io / pgmq

A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.
PostgreSQL License

DelaySeconds #283

Closed rgarcia closed 1 month ago

rgarcia commented 1 month ago

DelaySeconds is a useful SQS feature that allows you to send a message but only let consumers see it after a delay period. This is useful for cases where you want to requeue some work, e.g. you have a consumer that checks something every 10 minutes, so when the consumer has finished processing and deleting a message, it sends a new message with a 10-minute DelaySeconds. I believe SQS limits it to a maximum of 900 seconds (15 mins).

jason810496 commented 1 month ago

Hi @rgarcia,

Though I am not the official maintainer of PGMQ, here is my workaround solution for an exponential backoff retry mechanism. The solution utilizes read with the vt option and set_vt with msg_id.

Here is an example using the pgmq-sqlalchemy Python client package.

Reference: https://pgmq-sqlalchemy.readthedocs.io/en/latest/api-reference.html#pgmq_sqlalchemy.PGMQueue.set_vt

from pgmq_sqlalchemy import PGMQueue
from pgmq_sqlalchemy.schema import Message

def _exp_backoff_retry(msg: Message) -> int:
    # Exponential backoff retry
    if msg.read_ct < 5:
        return 2 ** msg.read_ct
    return 2 ** 5

def consume_with_backoff_retry(pgmq_client: PGMQueue, queue_name: str):
    msg = pgmq_client.read(
        queue_name=queue_name,
        vt=1000,  # Set vt to 1000 seconds temporarily
    )
    if msg is None:
        return

    # Set exponential backoff retry
    pgmq_client.set_vt(
        queue_name=queue_name,
        msg_id=msg.msg_id,
        vt_offset=_exp_backoff_retry(msg)
    )
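
To make the resulting schedule concrete, here is a minimal standalone sketch of the same backoff rule. `FakeMessage` is a hypothetical stand-in for `pgmq_sqlalchemy.schema.Message` that models only `read_ct`, and the sketch assumes `read_ct` is already 1 the first time a message is read.

```python
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Hypothetical stand-in for Message; only read_ct is modeled."""
    read_ct: int


def exp_backoff_seconds(msg: FakeMessage, cap_exponent: int = 5) -> int:
    # Same rule as _exp_backoff_retry above: 2 ** read_ct, capped at 2 ** 5.
    return 2 ** min(msg.read_ct, cap_exponent)


# Delay (in seconds) that would be passed as vt_offset on reads 1 through 7.
schedule = [exp_backoff_seconds(FakeMessage(read_ct=n)) for n in range(1, 8)]
print(schedule)  # [2, 4, 8, 16, 32, 32, 32]
```

So under these assumptions a message that keeps failing becomes visible again after 2, 4, 8, 16 and then 32 seconds, and stays at 32 seconds from the fifth read onward.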

ChuckHend commented 1 month ago

Hello!

There is a delay parameter on send(). The example below makes the message unreadable for 10 seconds after it reaches the queue. The default value (https://github.com/tembo-io/pgmq/blob/0964892d71f5cb637c2cfefb9ade4e7799ca9cf8/pgmq-extension/sql/pgmq.sql#L246) for delay is zero, so if we don't set it, the message is immediately consumable.

select pgmq.send(
  queue_name => 'myqueue',
  msg => '{"hello": "world"}',
  delay => 10
);
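
For the requeue use case from the original question, the same call could be issued from a Python consumer. This is only a sketch: `build_delayed_send` and `requeue_with_delay` are hypothetical helper names, and the `%s` placeholders assume a DB-API driver such as psycopg. The only pgmq call used is the `pgmq.send(queue_name, msg, delay)` form shown above.

```python
import json
from typing import Any

# Mirrors the named-argument call above; %s placeholders assume a DB-API
# driver such as psycopg (an assumption, not part of pgmq itself).
SEND_SQL = "select pgmq.send(queue_name => %s, msg => %s::jsonb, delay => %s)"


def build_delayed_send(
    queue_name: str, payload: dict[str, Any], delay_seconds: int = 0
) -> tuple[str, tuple[str, str, int]]:
    """Return the SQL text and parameters for a delayed pgmq.send call."""
    return SEND_SQL, (queue_name, json.dumps(payload), delay_seconds)


def requeue_with_delay(
    cursor: Any, queue_name: str, payload: dict[str, Any], delay_seconds: int = 600
) -> None:
    """Hypothetical helper: send a follow-up message that stays hidden for
    delay_seconds (600 s = the 10-minute recheck from the question)."""
    sql, params = build_delayed_send(queue_name, payload, delay_seconds)
    cursor.execute(sql, params)


# Building the call does not need a database connection.
sql, params = build_delayed_send("myqueue", {"hello": "world"}, delay_seconds=10)
print(params)  # ('myqueue', '{"hello": "world"}', 10)
```

A consumer would call `requeue_with_delay(cur, "myqueue", payload)` after it has finished processing and deleted the current message, inside the same connection or transaction if that fits the application.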

rgarcia commented 1 month ago

Oh nice didn't see that! That is exactly what I am looking for. Thanks
