MagicStack / asyncpg

A fast PostgreSQL Database Client Library for Python/asyncio.
Apache License 2.0

backpressure on LISTEN? #463

Open jrozentur opened 5 years ago

jrozentur commented 5 years ago

My application subscribes to postgres notifications and fans them out to subscribers using streaming http.

I see that notifications emerge in connection._process_notification, where they are dispatched via call_soon(), each time calling _call_listener(), which synchronously calls the callback I registered with conn.add_listener(). In the callback, I insert a task into a queue for asynchronous processing...

I see a lot of tasks inserted this way, e.g. ~5000 before my application has a chance to process them. As a result, application memory has big, unpredictable spikes, like +300k.

Question: how do you apply back-pressure in this setup? Is there a way to limit the number of postgres notifications being converted to tasks, to control the overall number of in-flight tasks? Thanks!

elprans commented 5 years ago

In short, there's no way to apply backpressure on notifications sent by the server, since these are completely asynchronous at the protocol level. That said, it may be possible to optimize the high-volume case somewhat by buffering the notifications and using call_later on a wrapper that loops over the buffer instead of call_soon on every notification.
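The buffering idea above can be sketched with plain asyncio. Note that `NotificationBuffer` and its method names are hypothetical, not part of asyncpg; only the callback signature (connection, pid, channel, payload) matches what asyncpg passes to an add_listener callback:

```python
import asyncio

class NotificationBuffer:
    """Hypothetical sketch: collect notifications in a buffer and drain
    them in one batch on a timer, instead of scheduling work per event."""

    def __init__(self, process_batch, interval=0.1):
        self._buffer = []
        self._process_batch = process_batch
        self._interval = interval
        self._scheduled = False

    def on_notification(self, conn, pid, channel, payload):
        # signature matches what asyncpg passes to add_listener callbacks
        self._buffer.append((channel, payload))
        if not self._scheduled:
            # schedule a single deferred drain instead of one call per event
            self._scheduled = True
            asyncio.get_running_loop().call_later(self._interval, self._drain)

    def _drain(self):
        self._scheduled = False
        batch, self._buffer = self._buffer, []
        self._process_batch(batch)
```

This caps scheduling overhead at one callback per interval regardless of notification volume, though it bounds latency rather than memory: the buffer itself can still grow under sustained load.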

shamrin commented 3 years ago

@elprans Is it possible to apply backpressure at the socket level somehow? For example, through the .pause_reading() and .resume_reading() methods? Are they available through the asyncpg API?

For context, I'm trying to come up with a good API for LISTEN functionality in triopg library. The backpressure issue is the main problem. Since triopg is a thin wrapper around asyncpg, we are limited by its functionality.

shamrin commented 3 years ago

For comparison, psycopg2 does support applying LISTEN backpressure when used in asynchronous mode. It exposes the socket file descriptor via conn.fileno(), which can then be used with "is there data on this socket?" APIs, like Trio's wait_readable function:

import psycopg2
import psycopg2.extensions
import trio

async def get_events(channel, **kwargs):
    conn = psycopg2.connect(**kwargs)
    # LISTEN requires autocommit, otherwise it only takes effect on commit
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    try:
        with conn.cursor() as cur:
            cur.execute(f"LISTEN {channel};")

        while True:
            # block until the socket is readable -- this is where the
            # backpressure happens: we only read when we are ready to
            await trio.lowlevel.wait_readable(conn)
            conn.poll()
            while conn.notifies:
                event = conn.notifies.pop(0)
                yield event
    finally:
        conn.close()

via https://gitter.im/python-trio/general?at=5edf845b7c64f31f114cf02b

elprans commented 3 years ago

There's no public API, but you can get to the socket like this: conn._transport.get_extra_info('socket')

ThibTrip commented 2 years ago

Hello, I am developing a queue system for PostgreSQL (with a customizable job model, job history, sync/async support, bulk get/insert, optional LISTEN/NOTIFY support and more) that should hopefully go open-source soon, and I think I ran into the same problem.

I use LISTEN/NOTIFY to tell me as soon as possible when something is available. I then need a complex query (the payload serves no purpose here) to dequeue jobs, similarly to the library pq (in particular, the query uses FOR UPDATE and SKIP LOCKED and does not delete jobs, just marks them), and yield them to the user as soon as possible.

In my case I don't think I can use trio.lowlevel.wait_readable. It has no timeout, and when I tried to wrap it with asyncio.wait_for, e.g. await asyncio.wait_for(trio.lowlevel.wait_readable(socket), timeout=1), Python raised the following error: RuntimeError: must be called from async context.

Below is what I came up with. In my case I also have to deal with historic jobs, i.e. things that happened before listening! It looks a little silly, but it works well even with many listeners, and it does not use any additional library such as trio. I would be very thankful for any feedback or suggestions. I am a bit in over my head here :woozy_face:.
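That RuntimeError comes from mixing trio primitives into an asyncio program, which is not supported. A pure-asyncio equivalent with a timeout can be built from loop.add_reader; the `wait_readable` helper below is a hypothetical sketch, not part of any library:

```python
import asyncio
import socket

async def wait_readable(sock, timeout=None):
    """Wait until `sock` is readable; return True, or False on timeout.

    A pure-asyncio stand-in for trio.lowlevel.wait_readable that can be
    combined with a timeout, built on loop.add_reader.
    """
    loop = asyncio.get_running_loop()
    event = asyncio.Event()
    loop.add_reader(sock.fileno(), event.set)
    try:
        await asyncio.wait_for(event.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False
    finally:
        # always unregister, or the callback keeps firing
        loop.remove_reader(sock.fileno())
```

Note that loop.add_reader requires a selector-based event loop (the default on Unix); it is not available on the Windows proactor loop.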

class PostgresQueue:
    # many methods that I did not copy here
    # ...

    # ASYNCHRONOUS LISTEN
    # callback for asyncpg.connection.Connection.add_listener(channel, callback)
    # since callbacks cannot be async we will have to wrap that in a task (see `_on_notify` below)
    async def _async_on_notify(self, *args, **kwargs):
        # the value we put does not matter, we will use `get` to block until we get something
        await self._async_listen_queue.put(1)  # asyncio.Queue.put

    def _on_notify(self, *args, **kwargs):
        asyncio.create_task(self._async_on_notify())

    async def _await_for_more_jobs(self, driver_connection, timeout=1) -> bool:
        await driver_connection.add_listener(channel_name, callback=self._on_notify)
        try:
            await asyncio.wait_for(self._async_listen_queue.get(), timeout=timeout)
            self._async_listen_queue.task_done()
            return True
        # the time lost by catching this should be negligible compared to the rest (0.4s slower for 100,000
        # rounds where a timeout exception systematically occurred vs no exceptions - 9.1 vs 9.5s)
        except asyncio.TimeoutError:
            return False
        finally:
            await driver_connection.remove_listener(channel_name, self._on_notify)

    async def aiterqueue(self, timeout=1):
        # I actually use sqlalchemy in my library so I am going to have to get the
        # lowest-level connection for listening
        raw_connection = await self.connection.get_raw_connection()
        driver_connection: asyncpg.connection.Connection = raw_connection.connection.driver_connection
        while True:
            # FETCH HISTORICAL DATA (before we started to listen using method `_await_for_more_jobs`)
            # Postgres won't notify us about previous events
            jobs = await self.aget()
            if len(jobs) == 0:
                # if there are no more jobs, wait for the queue to notify us
                has_notifications = await self._await_for_more_jobs(driver_connection=driver_connection, timeout=timeout)
                if not has_notifications:
                    # this is a sort of heartbeat to the user so they can make decisions when no jobs are available
                    yield []
                    # control is returned to the user so we will have to use `aget`
                    # to fetch historical data again since we are not listening anymore
            else:
                yield jobs
For reference, here is how I achieved the same functionality in synchronous mode.

Synchronous listen

```python
class PostgresQueue:
    # SYNCHRONOUS LISTEN
    def _wait_for_more_jobs(self, timeout=1) -> bool:
        with self.connection.begin():
            self.connection.execute(text(f'LISTEN {channel_name};'))
        # wait for resource to receive some I/O
        select.select([self.connection.connection], [], [], timeout)
        self.connection.connection.poll()
        # we don't care about the notifications and their payloads,
        # just that jobs are available
        has_notifications = len(self.connection.connection.notifies) > 0
        self.connection.connection.notifies = []
        return has_notifications

    def iterqueue(self, timeout=1):
        while True:
            # FETCH HISTORICAL DATA (before we started to listen using method `_wait_for_more_jobs`)
            # Postgres won't notify us about previous events
            jobs = self.get()
            if len(jobs) == 0:
                # if there are no more jobs, wait for the queue to notify us
                has_notifications = self._wait_for_more_jobs(timeout=timeout)
                # case of no notifications after n seconds
                if not has_notifications:
                    # this is a sort of heartbeat to the user so they can make decisions when no jobs are available
                    yield []
                    # control is returned to the user so we will have to use `get`
                    # to fetch historical data again since we are not listening anymore
            else:
                yield jobs
```

EDIT: I worked with a mock and did not test this well enough; I have updated the code. Sorry about that.