psycopg / psycopg2

PostgreSQL database adapter for the Python programming language
https://www.psycopg.org/

Notification is not added to notifies list when received during the commit #1727

Closed romank0 closed 3 weeks ago

romank0 commented 1 month ago

The issue

I'm using the recommended way to receive asynchronous notifications, that is:

  1. invoke select.select to wait for data to become available on the connection
  2. invoke connection.poll
  3. process the connection.notifies list; each notification is processed in a transaction

Basically the code is almost a copy and paste from the documentation, except that I do not use autocommit because I manage transactions manually:

import select
import psycopg2

# Connection left in the default (non-autocommit) mode
conn = psycopg2.connect("postgres://postgres:123@localhost:5432/postgres")

curs = conn.cursor()
curs.execute("LISTEN test;")
conn.commit()

while True:
    if select.select([conn],[],[],1) == ([],[],[]):
        print("Timeout")
    else:
        conn.poll()
        while conn.notifies:
            notify = conn.notifies.pop(0)
            print("Got NOTIFY:", notify.pid, notify.channel, notify.payload)
            # some processing that updates the DB based on the notify content
            conn.commit()

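The problem is that sometimes (specifically when a notification is received during the commit) this code gets into a state where select.select reports nothing to read on the connection and conn.notifies is empty, even though a notification has already been received.

Because of this the code keeps looping on select.select, and processing of that notification is delayed until a new notification comes in later.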

My interpretation of what happens

I've tracked this down to a problem in pq_commit and/or pq_execute_command_locked. Namely, according to the libpq documentation:

You should, however, remember to check PQnotifies after each PQgetResult or PQexec, to see if any notifications came in during the processing of the command.

pq_execute_command_locked does invoke PQexec, which as far as I understand reads all the available data from the socket, but pq_commit then does not check PQnotifies. As a result, the subsequent call to select reports that there is no more data to read, and the listen loop makes no progress.

Most probably this is not the only place where this happens. For example, in pq_get_result_async I can see that notifications are checked before PQgetResult is executed, but according to the libpq docs quoted above that should happen after.
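
To illustrate the mechanism described above without a database, here is a toy model in plain Python. It is only a sketch under my own assumptions: ToyConnection, its internal _pending queue and the run helper are hypothetical names, and nothing here is psycopg2 or libpq code. commit() stands in for PQexec consuming everything on the socket, and the check_after_command flag stands in for calling PQnotifies afterwards.

```python
import select
import socket


class ToyConnection:
    """Toy stand-in for a database connection (hypothetical, not psycopg2)."""

    def __init__(self, sock, check_after_command):
        self.sock = sock
        self.check_after_command = check_after_command
        self._pending = []  # received but not yet exposed to the caller
        self.notifies = []  # what the application sees

    def fileno(self):
        # Lets select.select() wait on this object directly.
        return self.sock.fileno()

    def _consume_input(self):
        # Read whatever is currently available on the socket.
        self.sock.setblocking(False)
        try:
            data = self.sock.recv(4096)
        except BlockingIOError:
            data = b""
        self._pending.extend(data.decode().split())

    def _surface(self):
        # Move received notifications to the public list.
        self.notifies.extend(self._pending)
        self._pending.clear()

    def commit(self):
        # Running a command drains the socket as a side effect.
        self._consume_input()
        if self.check_after_command:
            self._surface()

    def poll(self):
        self._consume_input()
        self._surface()


def run(check_after_command):
    server, client = socket.socketpair()
    conn = ToyConnection(client, check_after_command)
    server.sendall(b"n1\n")  # notification arrives while the commit is in flight
    select.select([client], [], [], 1)  # make sure the bytes have arrived
    conn.commit()
    readable, _, _ = select.select([conn], [], [], 0)
    result = (bool(readable), list(conn.notifies))
    server.close()
    client.close()
    return result


print(run(check_after_command=False))  # socket drained, notification not visible
print(run(check_after_command=True))   # socket drained, notification visible
```

In the first case the socket is no longer readable and notifies is empty, so a loop that waits on select never gets to call poll(); that is the stuck state described in this report. In the second case the notification is already in notifies when commit() returns.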

The script reproducing the issue.

import threading
import select
import psycopg2
import psycopg2.extensions
import time

listen_conn = psycopg2.connect("postgres://postgres:123@localhost:5432/postgres")
notify_conn = psycopg2.connect("postgres://postgres:123@localhost:5432/postgres")

stop_event = threading.Event()
batch_processed_event = threading.Event()  # Event to signal batch completion

def setup_db():
    """Setup the table and trigger with a sleep to simulate slow commit."""
    with listen_conn.cursor() as curs:
        # Create a test table
        curs.execute("DROP TABLE IF EXISTS test_table;")
        curs.execute(
            """
        CREATE TABLE test_table (
            id SERIAL PRIMARY KEY,
            value TEXT
        );
        """
        )

        # Create the trigger function that will induce a sleep
        curs.execute(
            """
        CREATE OR REPLACE FUNCTION slow_commit_trigger()
        RETURNS TRIGGER AS $$
        BEGIN
            PERFORM pg_sleep(0.5);  -- 500ms sleep to simulate slow commit
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;
        """
        )

        # Create the trigger on update
        curs.execute(
            """
        CREATE TRIGGER slow_commit
        AFTER UPDATE ON test_table
        FOR EACH STATEMENT
        EXECUTE FUNCTION slow_commit_trigger();
        """
        )

        # Insert an initial row to update during the notification processing
        curs.execute(
            "INSERT INTO test_table (id, value) VALUES (1, 'initial') ON CONFLICT DO NOTHING;"
        )

    listen_conn.commit()

# Function to listen for notifications
def listen(num_notifications):
    with listen_conn.cursor() as curs:
        curs.execute("LISTEN test;")

    listen_conn.commit()

    print(f"listener: Waiting for {num_notifications} notifications on channel 'test'")

    while True:
        notifications_received = 0
        timeout_occurred = False

        start_time = time.time()
        while (
            notifications_received < num_notifications and time.time() - start_time < 10
        ):
            print("listener: select on the connection socket ....")
            if select.select([listen_conn], [], [], 1) == ([], [], []):
                print("listener:  nothing to read from socket")
                continue

            print("listener: poll after select")
            listen_conn.poll()
            while listen_conn.notifies:
                notify = listen_conn.notifies.pop(0)
                print(
                    f"listener: Got NOTIFY: pid={notify.pid}, channel={notify.channel}, payload={notify.payload}"
                )
                notifications_received += 1

                # Simulate processing by updating the table, which will trigger the delay
                with listen_conn.cursor() as curs:
                    curs.execute(
                        "UPDATE test_table SET value = 'processed' WHERE id = 1;"
                    )
                print("listener: commit (with delay due to trigger)")
                listen_conn.commit()

        # Signal the notify thread that the batch has been processed
        print("listener: Batch processed, signaling notifier to send next batch")
        batch_processed_event.set()  # Signal that the listener has processed the batch

        # If we had a timeout, we explicitly poll and check if a delayed notification was received
        if notifications_received < num_notifications:
            if listen_conn.notifies:
                assert False, "we have notifications after the timeout"
            else:
                print("listener: we don't have notifications after the timeout")
            print("listener: polling..")
            listen_conn.poll()
            if listen_conn.notifies:
                print("listener: we have notifications after the poll")
                stop_event.set()
                return
            else:
                print("listener: we don't have notifications after the poll")

# Function to send notifications
def notify(num_notifications):
    while not stop_event.is_set():
        for i in range(1, num_notifications + 1):
            print(f"notifier: Sending notification {i}/{num_notifications}")
            with notify_conn.cursor() as curs:
                curs.execute(f"NOTIFY test, 'notification {i}';")
            notify_conn.commit()
            time.sleep(0.1)

        print("notifier: waiting for listener to process notifications")

        batch_processed_event.wait()
        batch_processed_event.clear()  # Reset the event for the next batch

# Main section to run both threads
def main(num_notifications):
    setup_db()

    listener_thread = threading.Thread(
        target=listen, args=(num_notifications,), daemon=True
    )
    notifier_thread = threading.Thread(
        target=notify, args=(num_notifications,), daemon=True
    )

    listener_thread.start()
    notifier_thread.start()

    listener_thread.join()
    notifier_thread.join()

if __name__ == "__main__":
    # Specify how many notifications you want to send and process in each loop
    num_notifications = 3
    main(num_notifications)

dvarrazzo commented 1 month ago

Out of curiosity, does the same happen with psycopg 3 too?

romank0 commented 1 month ago

In the real-life scenarios where this was spotted we still use psycopg2. I haven't tried the above script on psycopg 3, but from the source code it looks like it has the same issue: notifications are not fetched after libpq.PQgetResult. I can't say I follow the code completely, though. I'll try to test that.

romank0 commented 1 month ago

Re psycopg 3: it is not an apples-to-apples comparison because the API is quite different. More importantly, it seems that psycopg 3 does not allow using the connection to execute queries while iterating over notifications or from the notification processing callback (if I try to do that, the execute blocks indefinitely).

Because of this I can't do a proper comparison, but for the commit operation specifically I can see from the stack trace that it does process notifications properly.

romank0 commented 1 month ago

The workaround that works is:

while True:
    if select.select([conn],[],[],1) == ([],[],[]):
        print("Timeout")
    else:
        need_to_poll = True
        while need_to_poll:
            need_to_poll = False
            conn.poll()
            while conn.notifies:
                notify = conn.notifies.pop(0)
                print("Got NOTIFY:", notify.pid, notify.channel, notify.payload)
                # some processing that updates the DB based on the notify content
                conn.commit()
                need_to_poll = True
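
The same workaround can be factored into a small helper, which makes it easier to reuse and to test against a fake connection. This is only a sketch: drain_notifications and its handle callback are names I made up for illustration, and the helper assumes nothing about conn beyond the poll(), notifies and commit() members used in the loop above.

```python
def drain_notifications(conn, handle):
    """Poll and process notifications until a poll brings in nothing new.

    Re-polling after each batch picks up notifications that were received
    while a commit was in progress. Returns the number processed.
    """
    processed = 0
    need_to_poll = True
    while need_to_poll:
        need_to_poll = False
        conn.poll()
        while conn.notifies:
            notify = conn.notifies.pop(0)
            handle(notify)  # processing that updates the DB
            conn.commit()
            processed += 1
            # A commit happened, so poll once more before going back to select.
            need_to_poll = True
    return processed
```

In the loop above, the else branch would then become a single call such as drain_notifications(conn, handle_notify), where handle_notify is whatever function does the per-notification processing.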

dvarrazzo commented 3 weeks ago

Fixed by merging #1728. Thank you very much for reporting the problem and providing a solution!

I will wrap a release in the next few days.