janbjorge / PgQueuer

PgQueuer is a Python library leveraging PostgreSQL for efficient job queuing.
https://pgqueuer.readthedocs.io/en/latest/index.html
MIT License

Retry Timer for Job Dequeue #26

Closed janbjorge closed 1 month ago

janbjorge commented 1 month ago

Adds a retry_timer parameter to the dequeue method. If specified, dequeue also selects jobs that have been in 'picked' status for longer than the given duration, so that stalled jobs can be picked up again. If None, the check for timed-out jobs is skipped.
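To illustrate the behaviour described above, here is a minimal sketch of how an optional retry timer could switch the timeout branch of the dequeue query on or off. It is not PgQueuer's actual implementation: the helper name build_dequeue_query, the constant names, and the `$n` placeholder style (as used by drivers such as asyncpg) are assumptions made for this example. The SQL fragments mirror the query benchmarked in this PR.

```python
from __future__ import annotations

from datetime import timedelta

# $1 is bound to the batch size.
QUEUED_CTE = """next_job_queued AS (
    SELECT id
    FROM pgqueuer
    WHERE status = 'queued'
    ORDER BY priority DESC, id ASC
    FOR UPDATE SKIP LOCKED
    LIMIT $1
)"""

# $2 is bound to the retry timer (an interval).
TIMEOUT_CTE = """next_job_timeout AS (
    SELECT id
    FROM pgqueuer
    WHERE status = 'picked' AND updated < NOW() - $2::interval
    ORDER BY id ASC
    FOR UPDATE SKIP LOCKED
    LIMIT $1
)"""

UPDATED_CTE = """updated AS (
    UPDATE pgqueuer
    SET status = 'picked', updated = NOW()
    WHERE id = ANY(SELECT id FROM combined_jobs)
    RETURNING *
)"""


def build_dequeue_query(
    batch_size: int,
    retry_timer: timedelta | None,
) -> tuple[str, list[object]]:
    """Hypothetical helper: return (sql, params) for one dequeue call."""
    ctes = [QUEUED_CTE]
    sources = ["SELECT id FROM next_job_queued"]
    params: list[object] = [batch_size]

    if retry_timer is not None:
        # Only include the timed-out 'picked' jobs when a retry timer is given.
        ctes.append(TIMEOUT_CTE)
        sources.append("SELECT id FROM next_job_timeout")
        params.append(retry_timer)

    combined = (
        "combined_jobs AS (\n"
        "    SELECT DISTINCT id\n"
        "    FROM (\n        "
        + "\n        UNION ALL\n        ".join(sources)
        + "\n    ) AS combined\n)"
    )
    ctes += [combined, UPDATED_CTE]

    sql = (
        "WITH "
        + ",\n".join(ctes)
        + "\nSELECT * FROM updated ORDER BY priority DESC, id ASC"
    )
    return sql, params
```

Calling build_dequeue_query(10, timedelta(seconds=30)) yields a query with both branches and two parameters, while build_dequeue_query(10, None) omits the next_job_timeout CTE entirely, so no work is spent on the timeout check.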


The new query performs on par with the current implementation, with a slight improvement of a few percentage points. In the run below, against a table of 472,150 rows, it selects and updates a batch of 10 jobs in 0.825 ms of execution time, touching 139 shared buffers, all of them cache hits.

ANALYZE (VERBOSE) pgqueuer;

EXPLAIN (ANALYZE, BUFFERS)
WITH next_job_queued AS (
    SELECT id
    FROM pgqueuer
    WHERE status = 'queued'
    ORDER BY priority DESC, id ASC
    FOR UPDATE SKIP LOCKED
    LIMIT 10
),
next_job_timeout AS (
    SELECT id
    FROM pgqueuer
    WHERE status = 'picked' AND updated < NOW() - INTERVAL '30 seconds'
    ORDER BY id ASC
    FOR UPDATE SKIP LOCKED
    LIMIT 10
),
combined_jobs AS (
    SELECT DISTINCT id
    FROM (
        SELECT id FROM next_job_queued
        UNION ALL
        SELECT id FROM next_job_timeout
    ) AS combined
),
updated AS (
    UPDATE pgqueuer
    SET status = 'picked', updated = NOW()
    WHERE id = ANY(SELECT id FROM combined_jobs)
    RETURNING *
)
SELECT * FROM updated ORDER BY priority DESC, id ASC;

time PGUSER=testuser PGPASSWORD=testpassword PGDATABASE=testdb psql -h 127.0.0.1 -f foo.sql
psql:foo.sql:1: INFO:  analyzing "public.pgqueuer"
psql:foo.sql:1: INFO:  "pgqueuer": scanned 4413 of 4413 pages, containing 472150 live rows and 0 dead rows; 30000 rows in sample, 472150 estimated total rows
ANALYZE
                                                                                  QUERY PLAN                                                                                   
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Sort  (cost=100.90..100.93 rows=11 width=92) (actual time=0.151..0.153 rows=10 loops=1)
   Sort Key: updated.priority DESC, updated.id
   Sort Method: quicksort  Memory: 25kB
   Buffers: shared hit=139 dirtied=2 written=1
   CTE next_job_queued
     ->  Limit  (cost=0.42..1.19 rows=10 width=14) (actual time=0.016..0.021 rows=10 loops=1)
           Buffers: shared hit=14
           ->  LockRows  (cost=0.42..36199.30 rows=472150 width=14) (actual time=0.015..0.019 rows=10 loops=1)
                 Buffers: shared hit=14
                 ->  Index Scan Backward using pgqueuer_priority_id_id1_idx on pgqueuer  (cost=0.42..31477.80 rows=472150 width=14) (actual time=0.008..0.009 rows=10 loops=1)
                       Filter: (status = 'queued'::pgqueuer_status)
                       Buffers: shared hit=4
   CTE next_job_timeout
     ->  Limit  (cost=0.13..5.91 rows=1 width=18) (actual time=0.003..0.004 rows=0 loops=1)
           Buffers: shared hit=2
           ->  LockRows  (cost=0.13..5.91 rows=1 width=18) (actual time=0.003..0.003 rows=0 loops=1)
                 Buffers: shared hit=2
                 ->  Index Scan Backward using pgqueuer_updated_id_id1_idx on pgqueuer pgqueuer_1  (cost=0.13..5.90 rows=1 width=18) (actual time=0.003..0.003 rows=0 loops=1)
                       Index Cond: (updated < (now() - '00:00:30'::interval))
                       Filter: (status = 'picked'::pgqueuer_status)
                       Buffers: shared hit=2
   CTE updated
     ->  Update on pgqueuer pgqueuer_2  (cost=0.73..93.39 rows=11 width=46) (actual time=0.102..0.141 rows=10 loops=1)
           Buffers: shared hit=139 dirtied=2 written=1
           ->  Nested Loop  (cost=0.73..93.39 rows=11 width=46) (actual time=0.045..0.057 rows=10 loops=1)
                 Buffers: shared hit=56
                 ->  Subquery Scan on combined_jobs  (cost=0.30..0.52 rows=11 width=32) (actual time=0.036..0.039 rows=10 loops=1)
                       Buffers: shared hit=16
                       ->  HashAggregate  (cost=0.30..0.41 rows=11 width=4) (actual time=0.031..0.033 rows=10 loops=1)
                             Group Key: next_job_queued.id
                             Batches: 1  Memory Usage: 24kB
                             Buffers: shared hit=16
                             ->  Append  (cost=0.00..0.28 rows=11 width=4) (actual time=0.018..0.028 rows=10 loops=1)
                                   Buffers: shared hit=16
                                   ->  CTE Scan on next_job_queued  (cost=0.00..0.20 rows=10 width=4) (actual time=0.017..0.023 rows=10 loops=1)
                                         Buffers: shared hit=14
                                   ->  CTE Scan on next_job_timeout  (cost=0.00..0.02 rows=1 width=4) (actual time=0.004..0.004 rows=0 loops=1)
                                         Buffers: shared hit=2
                 ->  Index Scan using pgqueuer_pkey on pgqueuer pgqueuer_2  (cost=0.42..8.44 rows=1 width=10) (actual time=0.001..0.001 rows=1 loops=10)
                       Index Cond: (id = combined_jobs.id)
                       Buffers: shared hit=40
   ->  CTE Scan on updated  (cost=0.00..0.22 rows=11 width=92) (actual time=0.103..0.144 rows=10 loops=1)
         Buffers: shared hit=139 dirtied=2 written=1
 Planning:
   Buffers: shared hit=100
 Planning Time: 0.283 ms
 Trigger tg_pgqueuer_changed: time=0.608 calls=1
 Execution Time: 0.825 ms
(48 rows)
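As a rough reading aid for the timings above, the sketch below splits the reported execution time into the trigger, the plan itself, and the remainder. It assumes the trigger time is not already counted inside the plan nodes, which is how PostgreSQL reports AFTER triggers; the function name time_shares is made up for this example, and the figures are copied from the output above.

```python
def time_shares(execution_ms: float, trigger_ms: float, top_node_ms: float) -> dict[str, float]:
    """Return each component's share of the total execution time.

    Assumes trigger_ms is reported separately from the plan nodes.
    """
    other_ms = execution_ms - trigger_ms - top_node_ms
    return {
        "trigger": trigger_ms / execution_ms,
        "plan": top_node_ms / execution_ms,
        "other": other_ms / execution_ms,
    }


if __name__ == "__main__":
    shares = time_shares(
        execution_ms=0.825,  # Execution Time
        trigger_ms=0.608,    # Trigger tg_pgqueuer_changed
        top_node_ms=0.153,   # upper "actual time" of the top-level Sort node
    )
    for name, share in shares.items():
        print(f"{name}: {share:.1%}")
```

Under that assumption, the trigger accounts for roughly 74% of the 0.825 ms, and the dequeue query itself for under a fifth, so the change to the query has little room to affect overall latency in either direction.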