apache / arrow-adbc

Database connectivity API standard and libraries for Apache Arrow
https://arrow.apache.org/adbc/
Apache License 2.0

Performance questions: What is the best way to upsert and in general? (Postgres) #2046

Open avm19 opened 1 month ago

avm19 commented 1 month ago

What would you like help with?


I want to insert and update records in a table using the Python API of adbc_driver_postgresql. Let's say I have 10k rows:

import pyarrow as pa

a = pa.array(range(10000))
table = pa.Table.from_arrays(arrays=[a, a, a], names=['col1', 'col2', 'col3'])

I noticed that executemany() is much slower than adbc_ingest() for ingesting this data:

import adbc_driver_postgresql.dbapi

# 0.1 s
with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    with conn.cursor() as cursor:
        cursor.execute("TRUNCATE TABLE test_table;")
        cursor.adbc_ingest('test_table', table, mode="replace")
        cursor.execute('ALTER TABLE test_table ADD PRIMARY KEY ("col1");')
    conn.commit()

as compared to

# ~7.5 sec
with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    with conn.cursor() as cursor:
        cursor.execute("TRUNCATE TABLE test_table;")
        query = 'INSERT INTO test_table ("col1", "col2", "col3") VALUES ($1, $2, $3);'
        cursor.executemany(query, table)       
    conn.commit()

I don't mind using adbc_ingest() to populate my database, but later in its lifecycle I need to upsert records and more. For example, I need to do something like:

# ~7.5 sec
query = (
    'INSERT INTO test_table ("col1", "col2", "col3") VALUES ($1, $2, $3) '
    'ON CONFLICT ("col1") DO UPDATE SET "col2" = EXCLUDED."col2", "col3" = 0;'
)
with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    with conn.cursor() as cursor:
        cursor.executemany(query, table)
    conn.commit()

which is too slow. Apparently executemany() is extremely inefficient for this task. What is the cause of such poor performance? Where is the bottleneck?

The same outcome can be achieved much faster by first ingesting the data into a temporary table and then having Postgres run the more complex operation from that table rather than from the input:

# 0.2s
with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    with conn.cursor() as cursor:
        cursor.adbc_ingest('test_table2', table, mode="replace")        
        query = (
            'INSERT INTO test_table ("col1", "col2", "col3")\n'
            'SELECT "col1", "col2", "col3" FROM test_table2\n'
            'ON CONFLICT ("col1") DO UPDATE SET "col2" = EXCLUDED."col2", "col3" = 0;'
        )
        cursor.execute(query)
    conn.commit()

This approach gives reasonable performance, but is this how one is supposed to do it? Is there anything that can easily be improved? I do not know much about Postgres's backend operations and what optimisations it applies during ingestion, but I suspect it is not best practice to create temporary tables (which are not even TEMPORARY) when we just want to stream data.

lidavidm commented 1 month ago

The difference is that a bulk ingest runs via COPY, which can't do a fancy upsert but is instead optimized to just throw data into the database as fast as possible. A regular insert, on the other hand, uses bind parameters, which require a database round trip for every single row. I would say a temporary table is reasonable for your use case.
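
For reference, here is a rough sketch of that pattern with a session-local staging table, so nothing permanent is left behind. It assumes the test_table layout from the examples above and a driver version whose adbc_ingest accepts the temporary= keyword; the staging table name is arbitrary, so treat this as an untested sketch rather than a drop-in recipe:

import adbc_driver_postgresql.dbapi

with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    with conn.cursor() as cursor:
        # Bulk-load the batch into a temporary table via COPY; the table
        # only lives for the duration of this connection.
        cursor.adbc_ingest('staging', table, mode="create", temporary=True)
        # Run the upsert server-side from the staged rows.
        cursor.execute(
            'INSERT INTO test_table ("col1", "col2", "col3")\n'
            'SELECT "col1", "col2", "col3" FROM staging\n'
            'ON CONFLICT ("col1") DO UPDATE SET "col2" = EXCLUDED."col2", "col3" = 0;'
        )
    conn.commit()

If the installed driver version does not support temporary=, a regular staging table followed by an explicit DROP TABLE achieves the same effect.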

avm19 commented 1 month ago

The difference is that a bulk ingest runs via COPY

Just to clarify: the "COPY" you are referring to, is it the COPY command of the Postgres wire protocol, or is it something else? And is it invoked/implemented in adbc_driver_postgresql via the libpq library?

lidavidm commented 1 month ago

Yes and yes: we use the COPY statement with binary format, and we use libpq as the underlying client library.
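
To make the contrast concrete, here is a rough, non-normative picture of what each call from the examples above turns into on the wire, going by the explanation in this thread; the exact statements the driver issues are an implementation detail and may differ between versions:

# cursor.adbc_ingest('test_table', table)
#   -> a single statement along the lines of
#      COPY test_table FROM STDIN WITH (FORMAT binary)
#      with all rows streamed through libpq as one binary COPY payload
#
# cursor.executemany(query, table)
#   -> the INSERT is executed once per row with bound parameters,
#      i.e. one client-server round trip per row (10,000 for the example above)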