psycopg / psycopg2

PostgreSQL database adapter for the Python programming language
https://www.psycopg.org/

Relistening to PostgreSQL notifications fails after PostgreSQL restarts #1679

Closed olof-dev closed 8 months ago

olof-dev commented 8 months ago

Thank you so much for this package!

Below is a script that demonstrates the problem. It LISTENs for notifications on the channel test_update and prints them when they arrive. If the PostgreSQL server is restarted, it also reconnects and re-LISTENs after a few seconds. For some reason, NOTIFYs no longer come through after the reconnect and re-LISTEN, although the connection otherwise works.

# test_reader.py
import asyncio
import psycopg2
import signal
import time

db_name = 'dbname'
db_user = 'dbuser'

conn: psycopg2.extensions.connection
loop: asyncio.AbstractEventLoop

def connect_to_db_and_listen():
    global conn
    conn = psycopg2.connect(f"dbname={db_name} user={db_user}")
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    cursor = conn.cursor()
    cursor.execute("LISTEN test_update;")
    loop.add_reader(conn, handle_notify)
    print("Listening for notifications on channel 'test_update'.")

def handle_notify():
    print("handle_notify()")
    try:
        conn.poll()
    except psycopg2.OperationalError as e:
        # Probably the postgresql server is restarting
        print(f"Caught an OperationalError: {e}")
        print("Attempting to reconnect to the database in 5 seconds.")
        time.sleep(5)
        connect_to_db_and_listen()
        return
    while conn.notifies:
        notify = conn.notifies.pop(0)
        print(f"Got NOTIFY: {notify.pid}, {notify.channel}.")

async def main():
    global loop
    loop = asyncio.get_running_loop()
    stop = loop.create_future()
    loop.add_signal_handler(signal.SIGTERM, stop.set_result, None)
    loop.add_signal_handler(signal.SIGINT, stop.set_result, None)
    connect_to_db_and_listen()
    await stop

asyncio.run(main())

To test:

python3 test_reader.py

In psql: NOTIFY test_update; results in the Python script printing:

Listening for notifications on channel 'test_update'.
handle_notify()
Got NOTIFY: 131981, test_update.

To restart PostgreSQL: sudo systemctl restart postgresql

The Python script then outputs:

Caught an OperationalError: server closed the connection unexpectedly
        This probably means the server terminated abnormally
        before or while processing the request.

Attempting to reconnect to the database in 5 seconds.
Listening for notifications on channel 'test_update'.

So far, so good. I can confirm that the reconnection works (e.g. by issuing a SELECT 1+1; and fetching the result). But now issuing NOTIFY test_update; in psql yields no result in the Python script.

What's even stranger is that if I create another connection (creating a copy of conn called conn2 and a corresponding handle_notify2), then:

dvarrazzo commented 8 months ago

Hello,

Your example is very difficult to reason about. It uses asyncio with psycopg2, which is not really a supported combination, and it relies on global variables and signals. I have no idea what goes on there; it is probably a problem at the asyncio level, not something wrong in psycopg.

You would be much better off moving to psycopg 3. A working version of your program is as simple as:

import asyncio
import psycopg

db_name = "piro"
db_user = "piro"

async def connect_to_db_and_listen():
    conn = await psycopg.AsyncConnection.connect(
        f"dbname={db_name} user={db_user}", autocommit=True
    )
    await conn.execute("LISTEN test_update")
    print("Listening for notifications on channel 'test_update'.")
    async for notify in conn.notifies():
        print(f"Got NOTIFY: {notify.pid}, {notify.channel}.")

async def main():
    while True:
        try:
            await connect_to_db_and_listen()
        except psycopg.OperationalError as e:
            print(f"Caught an OperationalError: {e}")
            print("Attempting to reconnect to the database in 5 seconds.")
            await asyncio.sleep(5)

asyncio.run(main())

In shell 1:

piro=# notify test_update;
NOTIFY

piro=# notify test_update;
NOTIFY

piro=# select pg_terminate_backend(pid) from pg_stat_activity where query ~* 'listen' and pid != pg_backend_pid();
t

piro=# notify test_update;
NOTIFY

piro=# notify test_update;
NOTIFY

In shell 2, running python test-1679-pg3.py:

Listening for notifications on channel 'test_update'.
Got NOTIFY: 3030993, test_update.
Got NOTIFY: 3030993, test_update.
Caught an OperationalError: consuming input failed: server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
Attempting to reconnect to the database in 5 seconds.
Listening for notifications on channel 'test_update'.
Got NOTIFY: 3030993, test_update.
Got NOTIFY: 3030993, test_update.

In psycopg2 every connection is isolated, so I don't believe it is a problem in psycopg; I think something probably breaks with loop.add_reader(). Closing this because the combination is not supported. Please use psycopg 3 🙂
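
A plausible mechanism behind the loop.add_reader() suspicion, offered as an assumption rather than a confirmed diagnosis: the original handle_notify() never calls loop.remove_reader() for the dead connection. On Linux, epoll drops a file descriptor from its interest list when that descriptor is closed, but asyncio's selector still holds a key for the descriptor number. If the new connection's socket is given the same number, add_reader() finds the existing key and only swaps the callback, so the new socket may never actually be watched. The sketch below uses socket.socketpair() as a stand-in for the database connections (an assumption so it runs without a server) and shows the safe order: remember the fd number at registration, remove the reader for that number once the connection has died, then register the new socket. It assumes a Unix selector event loop, since add_reader() is not available on the Windows proactor loop.

```python
import asyncio
import socket


async def reregister_demo() -> list:
    loop = asyncio.get_running_loop()
    received: list = []
    got_data = asyncio.Event()

    def on_readable(sock: socket.socket) -> None:
        # Called by the event loop when `sock` has data to read.
        received.append(sock.recv(100))
        got_data.set()

    # "Old connection": register a reader and remember the fd number now,
    # while the socket is still open and can report it.
    old_r, old_w = socket.socketpair()
    old_fd = old_r.fileno()
    loop.add_reader(old_fd, on_readable, old_r)

    # Simulate the server going away: the socket is closed underneath us.
    old_r.close()
    old_w.close()

    # Drop the stale registration using the saved number. The selector
    # tolerates the descriptor already being closed.
    loop.remove_reader(old_fd)

    # "New connection": it may well reuse old_fd's number, which is harmless
    # now that the stale key is gone.
    new_r, new_w = socket.socketpair()
    new_fd = new_r.fileno()
    loop.add_reader(new_fd, on_readable, new_r)

    new_w.send(b"notify")
    await asyncio.wait_for(got_data.wait(), timeout=1.0)

    loop.remove_reader(new_fd)
    new_r.close()
    new_w.close()
    return received


print(asyncio.run(reregister_demo()))  # [b'notify']
```

Applied to the original script, this would mean saving fd = conn.fileno() next to the add_reader() call and calling loop.remove_reader(fd) in the except branch before reconnecting; the number is saved up front because a dead psycopg2 connection may refuse to report its fileno(). Replacing time.sleep(5), which blocks the whole event loop, with loop.call_later(5, connect_to_db_and_listen) would also keep the loop responsive while waiting.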

olof-dev commented 8 months ago

Thank you very much! That works great in psycopg 3 - I am going to try migrating the project over.

Btw, signal is only there to handle quitting with Ctrl-C etc., and the global variables just avoid having to write wrappers around the callbacks in this MWE. In the actual code (where the problem was found), global variables aren't used. But yes, maybe the issue is with asyncio or PostgreSQL rather than psycopg2. In any case, onwards to psycopg 3!

dvarrazzo commented 8 months ago

To handle Ctrl-C and exit gracefully, you can use normal Python exceptions:

import asyncio
import psycopg

db_name = "piro"
db_user = "piro"

async def connect_to_db_and_listen() -> None:
    conn = await psycopg.AsyncConnection.connect(
        f"dbname={db_name} user={db_user}", autocommit=True
    )
    await conn.execute("LISTEN test_update")
    print("Listening for notifications on channel 'test_update'.")
    async for notify in conn.notifies():
        print(f"Got NOTIFY: {notify.pid}, {notify.channel}.")

async def main() -> None:
    while True:
        try:
            await connect_to_db_and_listen()
        except psycopg.OperationalError as e:
            print(f"Caught {type(e)}: {e}")
            print("Attempting to reconnect to the database in 5 seconds.")
            await asyncio.sleep(5)
        except asyncio.CancelledError as e:
            print(f"Caught {type(e)}")
            break

try:
    asyncio.run(main())
except KeyboardInterrupt as e:
    print(f"Caught {type(e)}")

dvarrazzo commented 8 months ago

And instead of writing your own retry loop, you can use a pool with one connection and rely on it to handle reconnection; the pool applies its own backoff while waiting for the server to restart. You can follow the pool's behaviour through the logging messages:

import asyncio
import logging
import psycopg
from psycopg_pool import AsyncConnectionPool

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logging.getLogger("psycopg").setLevel(logging.INFO)
logger = logging.getLogger()

db_name = "piro"
db_user = "piro"

pool = AsyncConnectionPool(
    f"dbname={db_name} user={db_user}",
    kwargs={"autocommit": True},
    min_size=1,
    open=False,
)

async def connect_to_db_and_listen() -> None:
    async with pool.connection() as conn:
        await conn.execute("LISTEN test_update")
        logger.info("Listening for notifications on channel 'test_update'.")
        async for notify in conn.notifies():
            logger.info(f"Got NOTIFY: {notify.pid}, {notify.channel}.")

async def main() -> None:
    await pool.open(wait=True)
    while True:
        try:
            await connect_to_db_and_listen()
        except psycopg.OperationalError as e:
            logger.warning(f"Caught {type(e).__name__}: {e}")

try:
    asyncio.run(main())
except KeyboardInterrupt as e:
    logger.info(f"Caught {type(e).__name__}")

A run may look like this:

# Program startup 
2024-02-25 12:25:21,596 INFO waiting for pool 'pool-1' initialization
2024-02-25 12:25:21,599 INFO adding new connection to the pool
2024-02-25 12:25:21,599 INFO pool 'pool-1' is ready to use
2024-02-25 12:25:21,599 INFO connection requested from 'pool-1'
2024-02-25 12:25:21,599 INFO connection given by 'pool-1'
2024-02-25 12:25:21,600 INFO Listening for notifications on channel 'test_update'.

# Sending some notifications
2024-02-25 12:25:33,577 INFO Got NOTIFY: 3171904, test_update.
2024-02-25 12:25:34,501 INFO Got NOTIFY: 3171904, test_update.

# Terminating the connection; reconnection is immediate
2024-02-25 12:25:36,783 INFO returning connection to 'pool-1'
2024-02-25 12:25:36,784 WARNING discarding closed connection: <psycopg.AsyncConnection [BAD] at 0x7ff981ea9db0>
2024-02-25 12:25:36,784 WARNING Caught OperationalError: consuming input failed: server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
2024-02-25 12:25:36,784 INFO connection requested from 'pool-1'
2024-02-25 12:25:36,789 INFO adding new connection to the pool
2024-02-25 12:25:36,789 INFO connection given by 'pool-1'
2024-02-25 12:25:36,790 INFO Listening for notifications on channel 'test_update'.

2024-02-25 12:25:40,913 INFO Got NOTIFY: 3171904, test_update.

# Restarting the server

2024-02-25 12:25:45,144 INFO returning connection to 'pool-1'
2024-02-25 12:25:45,144 WARNING discarding closed connection: <psycopg.AsyncConnection [BAD] at 0x7ff981ea9e40>
2024-02-25 12:25:45,144 WARNING Caught OperationalError: consuming input failed: server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
2024-02-25 12:25:45,144 INFO connection requested from 'pool-1'
2024-02-25 12:25:45,146 WARNING error connecting in 'pool-1': connection failed: FATAL:  the database system is shutting down
2024-02-25 12:25:46,054 WARNING error connecting in 'pool-1': connection is bad: No such file or directory
    Is the server running locally and accepting connections on that socket?
2024-02-25 12:25:47,868 WARNING error connecting in 'pool-1': connection is bad: No such file or directory
    Is the server running locally and accepting connections on that socket?
2024-02-25 12:25:51,502 INFO adding new connection to the pool
2024-02-25 12:25:51,502 INFO connection given by 'pool-1'
2024-02-25 12:25:51,503 INFO Listening for notifications on channel 'test_update'.

2024-02-25 12:25:54,993 INFO Got NOTIFY: 3173147, test_update.

# Pressing ctrl-c

^C
2024-02-25 12:25:59,809 INFO returning connection to 'pool-1'
2024-02-25 12:25:59,809 INFO Caught KeyboardInterrupt
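
In the log above, the failed connection attempts at 12:25:45.146, 12:25:46.054 and 12:25:47.868, followed by the successful one at 12:25:51.502, are spaced roughly 0.9 s, 1.8 s and 3.6 s apart, which is consistent with a delay that roughly doubles between attempts. If you prefer to keep a hand-written retry loop like the earlier ones, a similar schedule can replace the fixed asyncio.sleep(5). The sketch below is a generic exponential backoff with jitter, not the pool's actual implementation; retry_forever, backoff_delays and their parameters are hypothetical names chosen for illustration.

```python
import asyncio
import itertools
import random
from typing import Awaitable, Callable, Iterator, Optional, Tuple, Type


def backoff_delays(
    initial: float = 1.0,
    factor: float = 2.0,
    max_delay: float = 60.0,
    jitter: float = 0.1,
    rng: Optional[random.Random] = None,
) -> Iterator[float]:
    """Yield endless delays initial, initial*factor, ... capped at max_delay.

    Each delay is randomly scaled by up to +/- `jitter` (a fraction) so that
    many clients restarting together do not retry in lockstep.
    """
    rng = rng or random.Random()
    delay = initial
    while True:
        yield delay * (1 + rng.uniform(-jitter, jitter))
        delay = min(delay * factor, max_delay)


async def retry_forever(
    job: Callable[[], Awaitable[None]],
    retry_on: Tuple[Type[BaseException], ...],
    initial: float = 1.0,
    max_delay: float = 60.0,
) -> None:
    """Run `job` until it returns, retrying after exceptions in `retry_on`."""
    loop = asyncio.get_running_loop()
    delays = backoff_delays(initial=initial, max_delay=max_delay)
    while True:
        started = loop.time()
        try:
            await job()
            return
        except retry_on as e:
            # A job that ran for a long time before failing was presumably
            # connected and working, so start the schedule over.
            if loop.time() - started > max_delay:
                delays = backoff_delays(initial=initial, max_delay=max_delay)
            delay = next(delays)
            print(f"Caught {type(e).__name__}: {e}; retrying in {delay:.2f} s")
            await asyncio.sleep(delay)


# Without jitter the schedule is plain doubling.
print(list(itertools.islice(backoff_delays(jitter=0), 5)))  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

With the hand-rolled psycopg 3 version, main() would then reduce to something like await retry_forever(connect_to_db_and_listen, (psycopg.OperationalError,)). Since retry_on is caught with a plain except clause, asyncio.CancelledError and KeyboardInterrupt still propagate, so the Ctrl-C handling shown earlier keeps working.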

olof-dev commented 8 months ago

Fantastic - using a connection pool like that seems to provide a great separation of concerns. I'll try to set that up in the migration. Thanks!

olof-dev commented 8 months ago

I noticed one functional difference between the original code and the new one, related to 'packets' of notifications. In the original code the handler would sometimes receive several notifications in one go (in 'one packet', e.g. generated in a single transaction), whereas with async for notify in conn.notifies() I think I always get them one at a time.

With the former method I could batch-process the NOTIFYs and perform a single database query to fetch all the updated data. With the new method it looks like I will have to query the database separately for each notification, even if the notifications actually arrived in the same batch, from the same transaction.

I'm loving the async features of psycopg 3. Is there a way to maintain the old behaviour (of processing 'packets' of NOTIFYs all in one go) with it?

To test, one can issue in psql:

BEGIN; NOTIFY test_update, '1'; NOTIFY test_update, '2'; COMMIT;
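
One way to regain batches while keeping the async iterator, sketched here as an assumption rather than a psycopg feature: run a small task that copies each notification from conn.notifies() into an asyncio.Queue, and have the consumer, once the first item of a batch arrives, wait a short window and then drain whatever else is already queued. PostgreSQL delivers notifications only when the sending transaction commits, so NOTIFYs from one transaction will usually land in the same batch, but the window is a heuristic and does not guarantee that a batch corresponds exactly to one transaction. pump() and batches() are hypothetical helpers. A queue is used instead of wrapping the iterator in asyncio.wait_for() because a timeout there would throw CancelledError into the async generator and end it.

```python
from __future__ import annotations

import asyncio
from typing import AsyncIterator, List, Optional, TypeVar

T = TypeVar("T")  # item type; items must never be None (None marks the end)


async def pump(source: AsyncIterator[T], queue: asyncio.Queue[Optional[T]]) -> None:
    """Copy every item from `source` into `queue`; put None when done."""
    try:
        async for item in source:
            queue.put_nowait(item)  # the queue is unbounded, so this never blocks
    finally:
        queue.put_nowait(None)  # end-of-stream marker, also on errors


async def batches(
    queue: asyncio.Queue[Optional[T]], window: float = 0.05
) -> AsyncIterator[List[T]]:
    """Yield lists of items: each batch starts with the next available item
    and includes everything queued within `window` seconds after it."""
    while True:
        first = await queue.get()  # wait as long as needed for a batch to start
        if first is None:
            return
        # Give closely spaced items (e.g. from one transaction) time to arrive.
        await asyncio.sleep(window)
        batch = [first]
        while not queue.empty():
            item = queue.get_nowait()
            if item is None:  # source finished: flush what we have and stop
                yield batch
                return
            batch.append(item)
        yield batch


async def demo() -> List[List[int]]:
    async def fake_notifies():
        # Stand-in for conn.notifies(): two items together, one later.
        yield 1
        yield 2
        await asyncio.sleep(0.2)
        yield 3

    queue: asyncio.Queue[Optional[int]] = asyncio.Queue()
    feeder = asyncio.create_task(pump(fake_notifies(), queue))
    result = [batch async for batch in batches(queue)]
    await feeder  # re-raises any error from the source
    return result


print(asyncio.run(demo()))  # [[1, 2], [3]]
```

With psycopg 3 this would be wired up roughly as feeder = asyncio.create_task(pump(conn.notifies(), queue)) followed by async for batch in batches(queue), issuing one query per batch. If the connection drops, the feeder task fails, batches() ends, and await feeder re-raises the OperationalError so an outer retry loop (or the pool-based version above) can reconnect.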