malthe / pq

A PostgreSQL job queueing system

Doesn't work with PgBouncer #30

Open omad opened 6 years ago

omad commented 6 years ago

Unfortunately, pq seems to be non-functional when used with PgBouncer, especially in combination with threads. Sorry, I haven't done any comprehensive testing yet.

1) Prepared statements seem to persist between connections somehow, causing errors such as: psycopg2.ProgrammingError: prepared statement "_count_queue_test" already exists

2) The prepared statements then continue to persist even when a completely new connection is created. This is visible by running select * from pg_prepared_statements;

3) I've also seen the opposite error: psycopg2.OperationalError: prepared statement "_pull_item_queue_test_queue" does not exist

Tested with both the included tests, and a small threaded example, where PgBouncer is running on port 6432:

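import threading
import time
from pq import PQ
from psycopg2.pool import ThreadedConnectionPool

# Connect through PgBouncer (port 6432) rather than directly to PostgreSQL.
pool = ThreadedConnectionPool(2, 5, 'dbname=test host=localhost port=6432')
pq = PQ(pool=pool)

def source(queue):
    """Producer thread: put five jobs on the 'test' queue."""
    queue = queue['test']
    for i in range(5):
        queue.put({'num': i})
        time.sleep(0.5)

def sink(queue):
    """Consumer thread: print jobs until the iterator yields None."""
    queue = queue['test']
    for job in queue:
        # None means no job arrived before the queue's timeout.
        if job is None:
            break

        print('sink got %s' % job.data)

mysource = threading.Thread(target=source, args=(pq,))
mysink = threading.Thread(target=sink, args=(pq,))

mysource.start()
mysink.start()

# Wait for both threads to finish before the script exits.
mysource.join()
mysink.join()

malthe commented 6 years ago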

This has to do with the way you've configured PgBouncer to deal with connections. It keeps a pool of server connections open and assigns one of them to your query, so consecutive queries may not run on the same connection.

I don't know the specifics of how it forwards your queries to actual connections.

As for the prepared statements: they are created on-demand. But it might be interesting to have a way to initialize a new connection and just prepare all statements eagerly. That should fix your problem.
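A minimal sketch of what eager preparation could look like, not pq's actual code. The statement registry, the table and column names, and the `prepare_all` helper are all hypothetical, and a recording stand-in replaces a real psycopg2 cursor so the example runs without a database. Whether this helps at all depends on the pooling mode: if PgBouncer hands later queries to a different server connection, statements prepared here would still be missing there.

```python
class RecordingCursor:
    """Stand-in for a DB-API cursor; it only records the SQL it is given."""

    def __init__(self):
        self.executed = []

    def execute(self, sql, params=None):
        self.executed.append(sql)


# Hypothetical registry: statement name -> (argument types, SQL body).
STATEMENTS = {
    "count_queue": ("text", "SELECT COUNT(*) FROM queue WHERE q_name = $1"),
    "pull_item": ("text", "SELECT id FROM queue WHERE q_name = $1 LIMIT 1"),
}


def prepare_all(cursor, statements):
    """Issue one PREPARE per registered statement; return the names prepared."""
    for name, (argtypes, body) in statements.items():
        cursor.execute("PREPARE %s (%s) AS %s" % (name, argtypes, body))
    return list(statements)


# Would be called once, right after a new connection is opened.
cursor = RecordingCursor()
prepared = prepare_all(cursor, STATEMENTS)
```

With a real connection, `cursor` would come from `connection.cursor()` and the same function would run immediately after connecting.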

omad commented 6 years ago

Unfortunately I don't have direct control over the PgBouncer configuration. Fortunately I can bypass it and connect directly, which will work okay for now.

According to this page in the PgBouncer FAQ, it's necessary to avoid using prepared statements at all when it's configured in transaction pooling mode. It would be great to be able to switch off the use of prepared statements for cases like this.
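To illustrate why transaction pooling and prepared statements clash, here is a toy model in plain Python. It is not PgBouncer's real scheduling: the `Backend` and `TransactionPooler` classes are invented for this sketch, and strict round-robin assignment is an assumption chosen to make the outcome deterministic. The point is only that a prepared statement belongs to one server session, while the pooler may route each transaction to a different session.

```python
import itertools


class Backend:
    """One PostgreSQL server session; prepared statements live here."""

    def __init__(self):
        self.prepared = set()

    def prepare(self, name):
        if name in self.prepared:
            raise RuntimeError('prepared statement "%s" already exists' % name)
        self.prepared.add(name)

    def execute(self, name):
        if name not in self.prepared:
            raise RuntimeError('prepared statement "%s" does not exist' % name)
        return "ok"


class TransactionPooler:
    """Toy pooler: hands each transaction to the next backend in turn."""

    def __init__(self, backends):
        self._cycle = itertools.cycle(backends)

    def backend_for_next_transaction(self):
        return next(self._cycle)


pooler = TransactionPooler([Backend(), Backend()])
errors = []

# Client A prepares a statement; the transaction lands on the first backend.
pooler.backend_for_next_transaction().prepare("_count_queue_test")

# Client A's next transaction lands on the second backend, which never saw it.
try:
    pooler.backend_for_next_transaction().execute("_count_queue_test")
except RuntimeError as exc:
    errors.append(str(exc))

# A brand-new client B prepares the same name and is routed to the first backend.
try:
    pooler.backend_for_next_transaction().prepare("_count_queue_test")
except RuntimeError as exc:
    errors.append(str(exc))

for message in errors:
    print(message)
```

In this model both error messages from the original report appear: "does not exist" when the statement is executed on a session that never prepared it, and "already exists" when a new client reuses a session that did.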

We're looking into using pq in a fairly big way, and if it proves successful we could look into implementing a flag to disable the use of prepared statements when instantiating PQ().
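A rough sketch of how such a flag might behave, under stated assumptions. The `Statement` class, the `use_prepared` parameter, and the `to_plain_sql` helper are hypothetical and are not part of pq. The sketch assumes each statement body uses `$1`, `$2`, ... placeholders once each and in order, and contains no literal `%` characters, so it can be rewritten to psycopg2's `%s` style for direct execution.

```python
import re


def to_plain_sql(body):
    """Rewrite $1, $2, ... as %s (assumes each is used once, in order)."""
    return re.sub(r"\$\d+", "%s", body)


class Statement:
    """A named query that can run via EXECUTE or as plain parametrized SQL."""

    def __init__(self, name, body, use_prepared=True):
        self.name = name
        self.body = body
        self.use_prepared = use_prepared

    def call(self, params):
        """Return the (sql, params) pair to hand to cursor.execute()."""
        if not self.use_prepared:
            # No server-side state needed, so any pooled connection will do.
            return to_plain_sql(self.body), params
        if not params:
            return "EXECUTE %s" % self.name, params
        placeholders = ", ".join(["%s"] * len(params))
        return "EXECUTE %s(%s)" % (self.name, placeholders), params


body = "SELECT COUNT(*) FROM queue WHERE q_name = $1"
with_prepare = Statement("count_queue", body, use_prepared=True)
without_prepare = Statement("count_queue", body, use_prepared=False)

prepared_call = with_prepare.call(("test",))
plain_call = without_prepare.call(("test",))
```

The plain form sends the full query text every time, which gives up whatever benefit preparation provides but does not depend on which server connection the pooler picks.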