tembo-io / pgmq

A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.
PostgreSQL License
2.52k stars 55 forks source link

Consider supporting FIFO + message keys #294

Open fggrtech opened 1 month ago

fggrtech commented 1 month ago

Firstly, this project is real neat.

Any thoughts on supporting FIFO queues with message key values, similar to SQS FIFO + MessageGroupId ?

Riffing off your existing work, here are the prototype functions which illustrate the idea:

-- Row shape returned by dequeue(): one visible, locked queue message.
CREATE TYPE message_record AS (
    msg_id BIGINT,                       -- monotonically increasing identity; defines FIFO order
    read_ct INTEGER,                     -- number of times the message has been read (delivery attempts)
    enqueued_at TIMESTAMP WITH TIME ZONE,-- insertion time
    vt TIMESTAMP WITH TIME ZONE,         -- visibility timeout: hidden from consumers until this instant
    group_id UUID,                       -- FIFO group key (analogous to SQS MessageGroupId)
    message JSONB                        -- message payload
);

-- Creates a FIFO queue table plus its supporting indexes. Idempotent
-- (IF NOT EXISTS everywhere). queue_name is an identifier supplied by the
-- caller, so it is interpolated with %I (quote_ident) rather than %s to
-- prevent SQL injection through the queue name.
CREATE FUNCTION create_queue(queue_name TEXT)
RETURNS void AS $$
BEGIN

  EXECUTE FORMAT(
    $QUERY$
    CREATE TABLE IF NOT EXISTS %I (
        msg_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
        read_ct INT DEFAULT 0 NOT NULL,
        enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
        vt TIMESTAMP WITH TIME ZONE NOT NULL,
        group_id UUID NOT NULL,
        message JSONB NOT NULL
    )
    $QUERY$,
    queue_name
  );

  -- Supports the visibility-timeout scan in dequeue (vt <= clock_timestamp()).
  EXECUTE FORMAT(
    $QUERY$CREATE INDEX IF NOT EXISTS %I ON %I (vt ASC)$QUERY$,
    queue_name || '_vt_idx', queue_name
  );

  -- Supports the per-group MIN(msg_id) head lookup (FIFO ordering).
  EXECUTE FORMAT(
    $QUERY$CREATE INDEX IF NOT EXISTS %I ON %I (group_id, msg_id)$QUERY$,
    queue_name || '_group_id_order', queue_name
  );

END;
$$ LANGUAGE plpgsql;

-- Inserts one message into the named queue and returns its msg_id.
--   queue_name : target queue table (quoted with %I — injection-safe)
--   group_id   : FIFO group key; messages sharing a group_id are delivered in order
--   msg        : JSONB payload
--   delay      : seconds before the message becomes visible (default 0)
-- Values are bound with USING rather than spliced into the SQL string,
-- so no payload content can break out of the statement.
CREATE FUNCTION enqueue(
    queue_name TEXT,
    group_id UUID,
    msg JSONB,
    delay INTEGER DEFAULT 0
) RETURNS SETOF BIGINT AS $$
DECLARE
    sql TEXT;
BEGIN
    sql := FORMAT(
        $QUERY$
        INSERT INTO %I (vt, group_id, message)
        VALUES (clock_timestamp() + make_interval(secs => $2), $3, $1)
        RETURNING msg_id;
        $QUERY$,
        queue_name
    );
    RETURN QUERY EXECUTE sql USING msg, delay, group_id;
END;
$$ LANGUAGE plpgsql;

-- Reads up to qty messages, respecting FIFO per group_id: only the head
-- (lowest msg_id) of each group is eligible, and only when visible
-- (vt <= now). Claimed rows get vt pushed out by `vt` seconds and
-- read_ct incremented; the updated rows are returned.
--   queue_name : queue table (quoted with %I — injection-safe)
--   vt         : visibility timeout, in seconds, applied to claimed rows
--   qty        : maximum number of messages to claim
CREATE FUNCTION dequeue(
    queue_name TEXT,
    vt INTEGER,
    qty INTEGER
)
RETURNS SETOF message_record AS $$
DECLARE
    sql TEXT;
BEGIN
    sql := FORMAT(
        $QUERY$

        -- head: oldest message of every group, visible or not. If a group's
        -- head is in flight (vt in the future), the whole group waits — that
        -- is the FIFO guarantee.
        WITH head AS
        (
          SELECT group_id, MIN(msg_id) AS msg_id
          FROM %I
          GROUP BY group_id
        ),
        -- eligible: visible heads, oldest first, locked for this consumer.
        -- FOR UPDATE must name the base table (OF t1): Postgres rejects
        -- locking a CTE ("FOR UPDATE cannot be applied to a WITH query").
        eligible AS
        (
          SELECT t1.msg_id AS msg_id
          FROM %I AS t1
          JOIN head AS h
            ON t1.group_id = h.group_id
            AND t1.msg_id = h.msg_id
          WHERE t1.vt <= clock_timestamp()
          ORDER BY t1.msg_id ASC
          LIMIT $1
          FOR UPDATE OF t1 SKIP LOCKED
        )

        UPDATE %I m
        SET
            vt = clock_timestamp() + make_interval(secs => $2),
            read_ct = read_ct + 1
        FROM eligible
        WHERE m.msg_id = eligible.msg_id
        RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.group_id, m.message;
        $QUERY$,
        queue_name, queue_name, queue_name
    );
    RETURN QUERY EXECUTE sql USING qty, vt;
END;
$$ LANGUAGE plpgsql;
-- Smoke test: create a queue, enqueue three messages, dequeue.
SELECT * FROM create_queue('test');

-- Two enqueued messages with the same group id. These should respect FIFO rules.
SELECT * FROM enqueue('test', '0470da73-c6a4-4273-aa31-f48b05555539', '{"email-address": "john.homeowner-1@yopmail.com"}');
SELECT * FROM enqueue('test', '0470da73-c6a4-4273-aa31-f48b05555539', '{"email-address": "john.homeowner-1@yopmail.com", "phone-number": 5551112222}');

-- Third enqueued message with different group id.
-- (Fixed broken quoting: the JSON string was missing its closing double
-- quote and the statement's closing quote/paren were swapped.)
SELECT * FROM enqueue('test', gen_random_uuid(), '{"email-address": "john.homeowner-2@yopmail.com"}');

-- This should return message id 1 and 3. Message id 2 will NOT be available until 1 has been deleted.
SELECT * FROM dequeue('test', 5, 5);

Thoughts?

ChuckHend commented 1 month ago

Hi @fggrtech , thank you. I'd like to see features like this make it into the project too.

I'll have to think a bit about how this would impact the existing queue APIs and tables. I don't think we'd want to break the simple queue structure that currently exists. Ideally we could find a way to add this functionality without adding overhead or requiring the usage of the new keys; worst case, we add a new API specifically for FIFO.