MagicStack / asyncpg

A fast PostgreSQL Database Client Library for Python/asyncio.
Apache License 2.0

cannot stream cursor on very large table #462

Open · AlJohri opened this issue 5 years ago

AlJohri commented 5 years ago

I'm running into an issue using cursors on a very large Postgres table. The table has approximately 60 million rows but only 8 columns, and each row is very small. I'm trying to use a server-side cursor to stream the results in order, but it completely freezes while establishing the cursor via con.cursor. This is my code:

import asyncio
import asyncpg

dsn = 'postgresql://...:...@....us-east-1.rds.amazonaws.com/...'

async def run():
    con = await asyncpg.connect(dsn)
    print(con)
    async with con.transaction():
        async for record in con.cursor('SELECT * FROM pageviews ORDER BY user_id', prefetch=10, timeout=2):
            print(record)
    await con.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(run())

I tried using both the iterable cursor (with explicit prefetch) and the regular fetch-based cursor, and neither worked (the fetch-based variant is sketched after the psycopg2 example below). The following psycopg2 code using a "named cursor" (server-side cursor) works properly:

import datetime
import psycopg2
from psycopg2.extras import RealDictCursor

cursor_name = datetime.datetime.now().isoformat()

dsn = 'postgresql://...:...@....us-east-1.rds.amazonaws.com/...'
conn = psycopg2.connect(dsn, cursor_factory=RealDictCursor)

with conn.cursor(cursor_name) as cursor:
    sql = "SELECT * FROM pageviews ORDER BY user_id;"
    cursor.execute(sql)
    for i, row in enumerate(cursor):
        print(i, row)
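
For reference, the fetch-based asyncpg cursor variant mentioned above looks roughly like this (a sketch of the API shape only; it has to run inside the transaction block from the first snippet):

cur = await con.cursor('SELECT * FROM pageviews ORDER BY user_id')
row = await cur.fetchrow()       # fetch a single record
batch = await cur.fetch(1000)    # fetch up to 1000 records
await cur.forward(1000)          # or skip ahead without fetching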

I need to stream the entire ordered table, perform a streaming groupby, and then run some computation on each grouped object. I'm looking to asyncpg to decrease the IO time.
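
Such a consecutive groupby over the cursor stream can be a small async generator; a minimal sketch, assuming the rows arrive ordered by user_id as in the query above (groupby_consecutive is an illustrative helper, not an asyncpg API):

import asyncio
import asyncpg

dsn = 'postgresql://...:...@....us-east-1.rds.amazonaws.com/...'

async def groupby_consecutive(records, key):
    # Group consecutive records that share key(record); like itertools.groupby,
    # this assumes the stream is already ordered by that key.
    group, current = [], object()
    async for record in records:
        k = key(record)
        if group and k != current:
            yield current, group
            group = []
        current = k
        group.append(record)
    if group:
        yield current, group

async def run():
    con = await asyncpg.connect(dsn)
    async with con.transaction():
        cursor = con.cursor('SELECT * FROM pageviews ORDER BY user_id', prefetch=1000)
        async for user_id, rows in groupby_consecutive(cursor, key=lambda r: r['user_id']):
            ...  # run the per-group computation here
    await con.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(run())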

AlJohri commented 5 years ago

I tried using the databases library (which also uses asyncpg) just in case I got the API wrong. Same result:

import asyncio
from databases import Database

dsn = 'postgresql://...:...@....us-east-1.rds.amazonaws.com/...'

async def run():
    database = Database(dsn)
    await database.connect()
    query = "SELECT * FROM pageviews ORDER BY user_id"
    async for record in database.iterate(query=query):
        print(record)
    await database.disconnect()

loop = asyncio.get_event_loop()
loop.run_until_complete(run())

I threw some debug statements into the underlying asyncpg library and this is the output:

starting to init cursor iterator
starting to init base cursor
state None
starting anext
<asyncpg.protocol.protocol.PreparedStatementState object at 0x10b734c18>

After adding the timeout, this is the traceback:

Traceback (most recent call last):
  File "test2.py", line 24, in <module>
    loop.run_until_complete(run())
  File "/Users/johria/.asdf/installs/python/anaconda3-2018.12/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "test2.py", line 12, in run
    async for record in con.cursor('SELECT * FROM pageviews ORDER BY user_id', prefetch=10, timeout=2):
  File "/Users/johria/.asdf/installs/python/anaconda3-2018.12/lib/python3.7/site-packages/asyncpg/cursor.py", line 185, in __anext__
    buffer = await self._bind_exec(self._prefetch, self._timeout)
  File "/Users/johria/.asdf/installs/python/anaconda3-2018.12/lib/python3.7/site-packages/asyncpg/cursor.py", line 100, in _bind_exec
    self._state, self._args, self._portal_name, n, True, timeout)
  File "asyncpg/protocol/protocol.pyx", line 196, in bind_execute
concurrent.futures._base.TimeoutError

Some issue in _bind_exec?

AlJohri commented 5 years ago

I also observed that when using asyncpg the CPU usage skyrocketed, and once I even got a DiskFullError. Do you think it has something to do with the creation of temporary files on the server? This SO answer has some details on when cursors create temporary files: https://stackoverflow.com/questions/42292341/correct-use-of-cursors-for-very-large-result-sets-in-postgres

Any chance a FETCH ALL is getting run anywhere after creating the cursor?
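
One way to test the temporary-file theory from the client is to sample the counters in pg_stat_database before and after the query; a minimal sketch (the statistics collector updates these with a short delay, and the DSN is the same placeholder as above):

import asyncio
import asyncpg

dsn = 'postgresql://...:...@....us-east-1.rds.amazonaws.com/...'
STATS = ("SELECT temp_files, temp_bytes FROM pg_stat_database "
         "WHERE datname = current_database()")

async def run():
    con = await asyncpg.connect(dsn)
    before = await con.fetchrow(STATS)
    # ... run the misbehaving cursor query here ...
    after = await con.fetchrow(STATS)
    print('temp files:', after['temp_files'] - before['temp_files'],
          'temp bytes:', after['temp_bytes'] - before['temp_bytes'])
    await con.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(run())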

AlJohri commented 5 years ago

I tried the same thing using node-postgres with pg-cursor and ran into the same hang. However, manually declaring the cursor worked:

import asyncio
import asyncpg

dsn = 'postgresql://...:...@....us-east-1.rds.amazonaws.com/...'

# Monkey patch asyncpg with a cursor that declares and drains an explicit
# server-side cursor via plain SQL instead of the protocol-level portal
from asyncpg.connection import Connection

async def cursor2(self, query, itersize=100):
    await self.execute(f'DECLARE c NO SCROLL CURSOR WITHOUT HOLD FOR {query}')
    while True:
        rows = await self.fetch(f'FETCH {itersize} FROM c')
        if not rows:
            await self.execute('CLOSE c')
            break
        for row in rows:
            yield row

Connection.cursor2 = cursor2

# ---------------------------------------------------------- #

async def run():
    con = await asyncpg.connect(dsn)
    async with con.transaction():
        async for row in con.cursor2('SELECT * FROM pageviews ORDER BY user_id'):
            print(row)
    await con.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(run())

Any ideas why this might be the case? cc @elprans
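
A note for anyone reusing that workaround: the cursor name c is hard-coded, so two concurrent iterators on one connection would collide, and breaking out of the loop early leaks the cursor until the transaction ends. A variant with a unique name and guaranteed cleanup might look like this (a sketch; cursor3 is just an illustrative name):

import uuid
from asyncpg.connection import Connection

async def cursor3(self, query, itersize=100):
    # Unique cursor name, so concurrent iterators on one connection don't collide
    name = f'cur_{uuid.uuid4().hex}'
    await self.execute(f'DECLARE {name} NO SCROLL CURSOR WITHOUT HOLD FOR {query}')
    try:
        while rows := await self.fetch(f'FETCH {itersize} FROM {name}'):
            for row in rows:
                yield row
    finally:
        # Runs even if the consumer breaks out of the async for early
        await self.execute(f'CLOSE {name}')

Connection.cursor3 = cursor3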

elprans commented 5 years ago

A full scan, and the use of temporary files, are triggered by the ORDER BY clause in your query. The server has to perform a very expensive operation before it returns the first rows from the cursor.

That said, I've been unable to reproduce the hanging locally. This might be an issue with your particular setup. Server-side logs might shed some more light.
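
One quick check is to ask the server for the plan it intends to use: if it shows a Sort over a Seq Scan rather than an Index Scan on user_id, the temp-file explanation fits. A sketch, with con an open asyncpg connection as in the earlier snippets:

plan = await con.fetch('EXPLAIN SELECT * FROM pageviews ORDER BY user_id')
for line in plan:
    print(line[0])  # each row of EXPLAIN output is a single text column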

AlJohri commented 5 years ago

@elprans forgive my ignorance, but at a high level, what's the difference between the way I'm instantiating the cursor in the example that worked and the way asyncpg does it? I believe they are both NO SCROLL, WITHOUT HOLD server-side cursors.

> A full scan, and the use of temporary files, are triggered by the ORDER BY clause in your query. The server has to perform a very expensive operation before it returns the first rows from the cursor.

I read the same while googling, but running the following in a psql shell returns immediately:

BEGIN;
DECLARE c NO SCROLL CURSOR WITHOUT HOLD FOR SELECT * FROM pageviews ORDER BY user_id;
FETCH 10 FROM c;
FETCH 10 FROM c;
FETCH 10 FROM c;
COMMIT;

FWIW: user_id is an indexed column. I also tried running SET cursor_tuple_fraction TO 1.0; in the asyncpg transaction to hint that I just want the entire table streamed, but it had no effect.
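
For anyone trying the same: the hint has to be issued on the same connection, inside the transaction, before the cursor is opened. A minimal sketch of that ordering:

async with con.transaction():
    # Tell the planner to optimize for retrieving the full result set,
    # not just the first rows; SET LOCAL scopes it to this transaction
    await con.execute('SET LOCAL cursor_tuple_fraction = 1.0')
    async for record in con.cursor('SELECT * FROM pageviews ORDER BY user_id'):
        print(record)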

Happy to share some server side logs to get to the bottom of this. Will try to pull those up soon.

elprans commented 5 years ago

In theory, there should be no difference between a NO SCROLL CURSOR WITHOUT HOLD and the protocol-level Portal asyncpg uses, but in your case there seems to be a difference. Are you able to replicate the issue on a non-RDS setup?

leftys commented 4 years ago

I can confirm we encountered a similar problem. Simple filtered SELECT cursor queries on large tables sometimes hang before returning any data. What's worse, regular queries without a cursor behaved the same way, so I think the issue is not specific to cursors. When we ran the same queries through psycopg2, they succeeded quickly.

Tested with version 0.20.1.

s-ilyas commented 4 years ago

I have a potentially related issue with really bad performance when using cursor(). Postgres auto_explain revealed a weird query-plan pattern that was causing the slowness. Any updates on this thread?
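
For reference, auto_explain can be enabled per session to capture the plans that asyncpg's extended-protocol queries actually get; a minimal sketch (LOAD needs superuser unless the module is preloaded server-side; the DSN is a placeholder and the pageviews query is from the earlier comments):

import asyncio
import asyncpg

dsn = 'postgresql://...:...@....us-east-1.rds.amazonaws.com/...'

async def run():
    con = await asyncpg.connect(dsn)
    await con.execute("LOAD 'auto_explain'")
    await con.execute('SET auto_explain.log_min_duration = 0')
    await con.execute('SET auto_explain.log_analyze = on')
    async with con.transaction():
        # The plan for this query is written to the server log
        async for record in con.cursor('SELECT * FROM pageviews ORDER BY user_id'):
            break
    await con.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(run())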

AlJohri commented 4 years ago

I'm no longer working on the project that ran into this error, but for the sake of debugging: @leftys and @s-ilyas, were you also on RDS versions of Postgres?

leftys commented 4 years ago

Yes, our Postgres runs on EC2.

s-ilyas commented 4 years ago

@AlJohri no, I'm using a local Postgres deployment in a Docker container. It's running the TimescaleDB extension, which probably only complicates things with respect to the issue :/

leftys commented 4 years ago

Interesting, we use TimescaleDB as well.

wrobell commented 3 years ago

@AlJohri @leftys Have you tried something like this?

cursor = await conn.cursor(sql)
while (data := await cursor.fetch(10 ** 5)):
    ...  # process data

With the above I am able to stream data from PostgreSQL/TimescaleDB databases with billions of rows (ordering is involved as well).
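
A self-contained version of that pattern might look like this (a sketch; asyncpg cursors have to be created inside a transaction, and the DSN and query are the placeholders from the earlier comments):

import asyncio
import asyncpg

dsn = 'postgresql://...:...@....us-east-1.rds.amazonaws.com/...'

async def run():
    con = await asyncpg.connect(dsn)
    async with con.transaction():
        cursor = await con.cursor('SELECT * FROM pageviews ORDER BY user_id')
        while data := await cursor.fetch(10 ** 5):
            ...  # process the batch of up to 100k records
    await con.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(run())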

lucasgadams commented 9 months ago

I think I am experiencing this issue. I have a large, but not that large, table (though it may hold a lot of data, since it includes embeddings), and I am trying to use stream_scalars from SQLAlchemy with asyncpg against an RDS Postgres instance. All I am doing is streaming a single column and printing it out. While the SQLAlchemy statement result = await session.stream_scalars(statement) returns quickly, when I iterate over the results with async for, not a single row ever comes back. Instead it hangs, and the metrics on my client machine show CPU at 100% and RAM climbing until the process dies. Not sure what is up, but the server-side cursor does not seem to work correctly.
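
For the SQLAlchemy route, the buffering can be pinned down with the yield_per execution option, which bounds how many rows are fetched per round trip over the streamed result; a minimal sketch (the items table, embedding column, and DSN are hypothetical):

import asyncio
from sqlalchemy import text
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

engine = create_async_engine('postgresql+asyncpg://...')  # hypothetical DSN
Session = async_sessionmaker(engine)

async def run():
    async with Session() as session:
        # yield_per bounds the client-side buffer for the streamed result
        stmt = text('SELECT embedding FROM items').execution_options(yield_per=100)
        result = await session.stream_scalars(stmt)
        async for value in result:
            print(value)
    await engine.dispose()

asyncio.run(run())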