duckdb / postgres_scanner

https://duckdb.org/docs/extensions/postgres
MIT License

Can't call a procedure with postgres_execute after retrieving an attached table in a transaction #222

Closed: c-meier closed this issue 3 weeks ago

c-meier commented 5 months ago

What happens?

In a transaction, calling a procedure (via postgres_execute) after reading from an attached table fails with the following error:

duckdb.duckdb.Error: Failed to execute query "CALL test()": another command is already in progress

To Reproduce

Create a procedure and table on the Postgres instance (database: postgresscanner):

create table log ( comment text );
create procedure test() language sql as $$ SELECT 'hello'; $$;

Launch the following Python script:

import duckdb

con = duckdb.connect(database=':memory:')
tr = con.begin()  # open a transaction
tr.execute("""ATTACH 'dbname=postgresscanner' AS s (TYPE postgres);""")
tr.sql("""SELECT * FROM s.public.log;""").fetchall()  # read from the attached table
tr.execute("CALL postgres_execute('s', 'CALL test()')")  # raises: another command is already in progress
tr.commit()

OS:

Archlinux

PostgreSQL Version:

16.2

DuckDB Version:

0.10.2

DuckDB Client:

Python

Full Name:

Christopher Meier

Affiliation:

HEIG-VD


c-meier commented 5 months ago

After further testing, any call to postgres_execute after a SELECT inside a transaction throws the same error:

import duckdb

con = duckdb.connect(database=':memory:')
con.execute("""ATTACH 'dbname=postgresscanner' AS s (TYPE postgres);""")  # attach outside the transaction this time
tr = con.begin()
tr.sql("""SELECT * FROM s.public.log;""").fetchall()
tr.execute("CALL postgres_execute('s', 'SELECT 1;')")  # any statement fails, not just CALL
# tr.execute("CALL postgres_execute('s', 'CALL test()')")
# tr.execute("CALL postgres_execute('s', 'CREATE TABLE log2(id INT)')")
tr.commit()

Throws duckdb.duckdb.Error: Failed to execute query "SELECT 1;": another command is already in progress

Mytherin commented 5 months ago

Thanks for the report! Could you try running SET pg_connection_cache = false to see if that fixes the issue?

c-meier commented 5 months ago

The same issue occurs if I add the following at the start:

con = duckdb.connect(database=':memory:')
con.execute("SET pg_connection_cache = false")  # disable the connection cache before attaching
# ...rest of the repro unchanged

clayton-cc commented 3 months ago

I am also seeing the another command is already in progress error when trying to use transactions.

Use case: I have an attached Postgres database with a queue table, and multiple workers running at once that read from, write to, update, and delete from the queue. I would like to interface directly between Postgres and DuckDB without popping out to the Python process that executes the DuckDB commands.

My Postgres table has been created with:

CREATE TABLE IF NOT EXISTS queues.entity_queue (
    id SERIAL PRIMARY KEY,
    entity_type TEXT NOT NULL,
    entity_id TEXT NOT NULL,
    processing BOOLEAN DEFAULT FALSE NOT NULL,
    locked_until TIMESTAMP WITH TIME ZONE DEFAULT NULL,
    enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

I attach the remote Postgres instance with ATTACH as warehouse.

My overall query flow looks like:

Open a transaction on the attached Postgres database:

USE warehouse;
BEGIN;

Fetch num_entities rows of an entity_type from the queue, locking them with FOR UPDATE. The entity_ids are pulled out into a Python list of strings for use in the next query:

SELECT
    *
FROM POSTGRES_QUERY('warehouse', $$
    SELECT
        DISTINCT entity_id
    FROM
        queues.entity_queue
    WHERE
        entity_type = '{entity_type}'
        AND (
            (processing = FALSE AND ((locked_until < CURRENT_TIMESTAMP) OR locked_until IS NULL))
            OR (processing = TRUE AND locked_until < CURRENT_TIMESTAMP)
        )
    LIMIT {num_entities}
    FOR UPDATE
$$)

Then use POSTGRES_EXECUTE to mark the claimed rows as processing:

CALL POSTGRES_EXECUTE('warehouse', $$
    UPDATE queues.entity_queue
    SET
        processing = TRUE,
        locked_until = NOW() + INTERVAL '2 minutes'
    WHERE
        entity_id in ({','.join(queue_entity_ids)})
$$)

This fails with another command is already in progress errors, with or without pg_connection_cache = false set. Attempting to do the update directly, rather than through a POSTGRES_ function, with or without pg_experimental_filter_pushdown enabled, appears to copy the whole table to DuckDB as far as I can tell:

UPDATE warehouse.queues.entity_queue
SET
    processing = TRUE,
    locked_until = NOW() + INTERVAL '2 minutes'
WHERE
    entity_id in ({','.join(queue_entity_ids)})

which causes the scanner to issue:

COPY (SELECT "id", ctid FROM "queues"."entity_queue" WHERE ctid BETWEEN '(0,0)'::tid AND '(4294967295,0)'::tid) TO STDOUT (FORMAT binary);

I would be happy to hear that I am approaching this incorrectly, but this behavior seems like it should be supported.
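
Pulled together, here is a minimal Python sketch of the claim-and-lock flow described above, assuming the warehouse attachment from earlier. The function name claim_entities and its parameters are hypothetical, and DISTINCT is dropped from the inner SELECT (with deduplication done in Python instead) because Postgres does not allow DISTINCT together with FOR UPDATE:

import duckdb

def claim_entities(con, entity_type, num_entities):
    # Claim up to num_entities queued rows of one entity_type and lock
    # them for two minutes. Sketch only: on the affected versions the
    # postgres_execute call below still raises "another command is
    # already in progress".
    tr = con.begin()  # transaction spanning the attached database
    rows = tr.execute(f"""
        SELECT * FROM postgres_query('warehouse', $$
            SELECT entity_id
            FROM queues.entity_queue
            WHERE entity_type = '{entity_type}'
              AND ((processing = FALSE AND (locked_until < CURRENT_TIMESTAMP OR locked_until IS NULL))
                   OR (processing = TRUE AND locked_until < CURRENT_TIMESTAMP))
            LIMIT {num_entities}
            FOR UPDATE
        $$)
    """).fetchall()
    entity_ids = sorted({r[0] for r in rows})  # dedupe here since DISTINCT was dropped
    if entity_ids:
        id_list = ",".join(f"'{eid}'" for eid in entity_ids)
        tr.execute(f"""
            CALL postgres_execute('warehouse', $$
                UPDATE queues.entity_queue
                SET processing = TRUE,
                    locked_until = NOW() + INTERVAL '2 minutes'
                WHERE entity_id IN ({id_list})
            $$)
        """)
    tr.commit()
    return entity_ids

Note that the FOR UPDATE row locks only help if the SELECT and the UPDATE run on the same Postgres connection inside one transaction, which is exactly what this flow relies on.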

Mytherin commented 3 weeks ago

This should now be fixed by https://github.com/duckdb/postgres_scanner/pull/258. Essentially, there is some odd internal state in libpq, which is not entirely clear to me, that leads to this error message; switching from PQsendQuery to PQexec fixes the issue.
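
For anyone verifying the fix, the original repro should now run cleanly. A minimal re-check, assuming an extension build that includes the linked PR and the same local postgresscanner database:

import duckdb

con = duckdb.connect(database=':memory:')
tr = con.begin()
tr.execute("ATTACH 'dbname=postgresscanner' AS s (TYPE postgres);")
tr.sql("SELECT * FROM s.public.log;").fetchall()
tr.execute("CALL postgres_execute('s', 'CALL test()')")  # previously: another command is already in progress
tr.commit()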