malthe / pq

A PostgreSQL job queueing system

_pull_item query can be executed over non-existent q_names #14

Open 3manuek opened 8 years ago

3manuek commented 8 years ago

When items are pulled for a non-existent q_name, long-running queries with blocking enabled hold unnecessary locks and add noise to LISTEN/NOTIFY.

The LISTEN/NOTIFY mechanism is nicely explained in "Listening connections aren't cheap". The documentation also includes an important note about the notification queue:

Although there is only one queue, notifications are treated as being
 database-local; this is done by including the sender's database OID
 in each notification message.  Listening backends ignore messages
 that don't match their database OID.

This matters because it limits how far queues can scale within a single database: it forces sharding across databases by q_name. That can be treated as a separate issue; however, the mechanism is governed by the async.c internals and, AFAIK, it won't change.

Probably a q_name existence check could save failed attempts when starting _pull_item.
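A minimal sketch of such a check, assuming a DB-API cursor with psycopg2-style `%s` placeholders and a table that has a `q_name` column (as the query plan further down suggests). The helper name `queue_exists` and its parameters are hypothetical and not part of pq's API:

```python
def queue_exists(cursor, table, q_name):
    """Return True if at least one row for q_name is present in the table.

    Hypothetical helper, not part of pq. `table` is interpolated as an
    identifier, so it must come from trusted configuration, never user input.
    """
    cursor.execute(
        "SELECT EXISTS (SELECT 1 FROM {} WHERE q_name = %s)".format(table),
        (q_name,),
    )
    # EXISTS always yields exactly one row with a single boolean column.
    return bool(cursor.fetchone()[0])
```

The check is itself a query, so it only helps if it is cheaper than the full dequeue statement or if its result is reused for a while before the next attempt.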

mhjohnson commented 8 years ago

Important clarification to add to the discussion:

The issue isn't just the length of time it takes to run the single query. It averages approximately 4ms for us - not murderous. The problem is compounded by the number of calls made and its aggregate cost.

The problem occurs when we use pq to call get() in a loop for a q_name that has no rows in the table. This can happen in practice if old records for the q_name are purged from the table. In our experience, a single worker thread makes ~1000 calls per minute in this scenario. _This behavior is not present when there is nothing to dequeue for a q_name that still has previously dequeued rows in the table._
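To put the aggregate cost in numbers, here is a back-of-the-envelope calculation using only the two figures quoted in this comment (about 4 ms per call, about 1000 calls per minute per worker thread). The variable names are illustrative:

```python
calls_per_minute = 1000  # "~1000 calls per minute" per worker thread
avg_query_ms = 4         # "approximately 4ms" per call

# Total time the database spends on empty pulls, per worker, per minute.
busy_seconds_per_minute = calls_per_minute * avg_query_ms / 1000
busy_fraction = busy_seconds_per_minute / 60

print(f"{busy_seconds_per_minute:.1f} s per minute per worker ({busy_fraction:.1%})")
```

That is roughly 4 seconds of query time per minute for every idle worker, and it scales linearly with the number of workers polling purged queues.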

We could address this on our end by sleeping whenever queue.get() returns None, but we would first like to make sure it can't be addressed upstream.
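The client-side workaround could look roughly like the sketch below. It relies only on `queue.get()` returning `None` when nothing was dequeued; the function name `poll_with_backoff`, the `handle` callback, and the delay parameters are assumptions made for illustration:

```python
import time


def poll_with_backoff(queue, handle, max_idle_polls=None,
                      base_delay=0.05, max_delay=5.0, sleep=time.sleep):
    """Call queue.get() in a loop, sleeping with exponential backoff on None.

    Hypothetical helper: `handle` processes a job. `max_idle_polls` stops the
    loop after that many consecutive empty polls (None means run forever).
    """
    delay = base_delay
    idle = 0
    while max_idle_polls is None or idle < max_idle_polls:
        job = queue.get()
        if job is None:
            # Nothing to do: wait, then wait longer next time, up to max_delay.
            idle += 1
            sleep(delay)
            delay = min(delay * 2, max_delay)
            continue
        # Work arrived: reset the backoff so the worker becomes responsive again.
        idle = 0
        delay = base_delay
        handle(job)
```

With a 5 second cap, an idle worker settles at about 12 calls per minute instead of ~1000, at the price of up to 5 seconds of extra latency when work reappears.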

Example EXPLAIN output:

                                                     QUERY PLAN
 Update on tasks  (cost=115.98..124.00 rows=1 width=1122)
   CTE candidates
     ->  Limit  (cost=0.42..115.24 rows=2 width=740)
           ->  Index Scan using priority_idx2 on tasks tasks_1  (cost=0.42..4478.40 rows=78 width=740)
                 Index Cond: (q_name = 'faxer'::text)
   CTE selected
     ->  Limit  (cost=0.07..0.07 rows=1 width=24)
           ->  Sort  (cost=0.07..0.07 rows=1 width=24)
                 Sort Key: candidates.schedule_at NULLS FIRST, candidates.expected_at NULLS FIRST
                 ->  CTE Scan on candidates  (cost=0.00..0.06 rows=1 width=24)
                       Filter: (pg_try_advisory_xact_lock(id) AND ((schedule_at <= now()) OR (schedule_at IS NULL)))
   CTE next_timeout
     ->  Aggregate  (cost=0.06..0.07 rows=1 width=8)
           ->  CTE Scan on candidates candidates_1  (cost=0.00..0.05 rows=1 width=8)
                 Filter: (schedule_at >= now())
   InitPlan 4 (returns $3)
     ->  CTE Scan on next_timeout  (cost=0.00..0.02 rows=1 width=8)
   InitPlan 5 (returns $4)
     ->  CTE Scan on selected  (cost=0.00..0.02 rows=1 width=8)
   ->  Index Scan using tasks_pkey on tasks  (cost=0.56..8.58 rows=1 width=1122)
         Index Cond: (id = $4)
         Filter: (dequeued_at IS NULL)
(22 rows)