adriangb / pgpq

Stream Arrow data into Postgres
MIT License

Recipe: Copy Parquet file into Postgres #28

Closed: jonashaag closed this issue 1 year ago

jonashaag commented 1 year ago

Maybe this is useful for some other people.

import sys

import psycopg
import pyarrow.parquet as pq
import tqdm
import pgpq

USE_AUTOCOMMIT = True
BATCH_SIZE = 2**20  # target number of rows to accumulate per COPY

def write_batches(conn, table_name, batches):
    # Each COPY needs its own header and trailer, so build a fresh encoder
    # from the schema of the batches being written.
    encoder = pgpq.ArrowToPostgresBinaryEncoder(batches[0].schema)
    with conn.cursor() as cur, cur.copy(
        f"COPY {table_name} FROM STDIN WITH (FORMAT BINARY)"
    ) as pg_copy:
        pg_copy.write(encoder.write_header())
        for b in batches:
            pg_copy.write(encoder.write_batch(b))
        pg_copy.write(encoder.finish())

def main(parquet_file_path, table_name):
    with pq.ParquetFile(parquet_file_path) as pq_file:
        # Build the target table's DDL from the Arrow schema using pgpq's type mapping.
        encoder = pgpq.ArrowToPostgresBinaryEncoder(pq_file.schema.to_arrow_schema())
        cols = [
            f'"{col_name}" {col.data_type.ddl()}'
            for col_name, col in encoder.schema().columns
        ]
        ddl = f"CREATE TABLE {table_name} ({', '.join(cols)})"
        with psycopg.connect(
            "postgres://localhost:5432/postgres", autocommit=USE_AUTOCOMMIT
        ) as conn:
            conn.execute(ddl)
            # Accumulate Arrow record batches until they reach roughly BATCH_SIZE rows,
            # then flush them in a single COPY.
            batches = []
            with tqdm.tqdm(total=pq_file.metadata.num_rows) as t:
                for batch in pq_file.iter_batches():
                    batches.append(batch)
                    num_rows = sum(b.num_rows for b in batches)
                    if num_rows >= BATCH_SIZE:
                        write_batches(conn, table_name, batches)
                        t.update(num_rows)
                        batches = []
                if batches:
                    write_batches(conn, table_name, batches)
                    t.update(sum(b.num_rows for b in batches))

if __name__ == "__main__":
    parquet_file_path, table_name = sys.argv[1:]
    main(parquet_file_path, table_name)
adriangb commented 1 year ago

I'm curious why you're re-batching in Python instead of having pyarrow do the batching for you?

jonashaag commented 1 year ago

Good point. I observed a small speedup on a columnar backend (Citus/Hydra) from sending larger re-batched COPYs versus a single COPY per Parquet batch. But honestly I haven't rigorously benchmarked this, so it could be useless.

Another thing I wasn't sure about is what happens if you submit a huge Parquet file in a single COPY. I think it will lead to a huge WAL. Not sure what the side effects of this are.

adriangb commented 1 year ago

The example in the README uses pyarrow to stream batches, so you never hold the entire file in memory, and you can have pyarrow rebatch to whatever number of rows best fits your database.
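
Roughly something like this (a condensed sketch along the lines of the README example; the file path, connection URL, and table name are placeholders, and batch_size controls the rebatching):

import pyarrow.dataset as ds
import psycopg
import pgpq

dataset = ds.dataset("data.parquet", format="parquet")  # placeholder path
encoder = pgpq.ArrowToPostgresBinaryEncoder(dataset.schema)
cols = [
    f'"{col_name}" {col.data_type.ddl()}'
    for col_name, col in encoder.schema().columns
]

with psycopg.connect("postgres://localhost:5432/postgres") as conn:
    conn.execute(f"CREATE TABLE data ({', '.join(cols)})")
    with conn.cursor() as cur, cur.copy(
        "COPY data FROM STDIN WITH (FORMAT BINARY)"
    ) as copy:
        copy.write(encoder.write_header())
        # pyarrow streams the file and caps each batch at batch_size rows
        for batch in dataset.to_batches(batch_size=65_536):
            copy.write(encoder.write_batch(batch))
        copy.write(encoder.finish())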

jonashaag commented 1 year ago

Oh, I actually didn't realize I could just use that even though I'm not using Hive-style Parquet datasets 🙈

How about the WAL growing too large though? Could that become a problem?

In any case, there's probably nothing useful my recipe adds over your example. It would be nice to make a CLI tool from this at some point -- I imagine that getting Parquet into Postgres quickly is one of the primary uses of this library.

jonashaag commented 1 year ago

My recipe fails to recognize VARCHAR columns as such and always uses TEXT. Is that the case with your example as well?

adriangb commented 1 year ago

How about the WAL growing too large though? Could that become a problem?

I don't think so. I've used binary copies to load some pretty large datasets.

My recipe fails to recognize VARCHAR columns as such and always uses TEXT. Is that the case with your example as well?

I'm not sure what you mean by this, but pgpq uses TEXT for String columns because TEXT is always the right choice in Postgres.
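
You can see the mapping by inspecting the encoder's Postgres schema, the same way the recipe above builds its DDL (a quick sketch; the column names here are made up):

import pyarrow as pa
import pgpq

schema = pa.schema([("name", pa.string()), ("n", pa.int64())])
encoder = pgpq.ArrowToPostgresBinaryEncoder(schema)
for col_name, col in encoder.schema().columns:
    print(col_name, col.data_type.ddl())
# the string column should come out as TEXT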

adriangb commented 1 year ago

Would be nice to make a CLI tool from this at some point -- I imagine that getting Parquet into Postgres quickly is one of the primary uses of this library.

Agreed! I tried this at one point and ran out of time. Ideally it would be written in Rust so it can ship as a single binary without Python.

jonashaag commented 1 year ago

TEXT is always the right choice in Postgres

TIL! That's not the case in most other DBMSs.

How about the WAL growing too large though? Could that become a problem?

I don't think so. I've used binary copies to load some pretty large datasets.

In my case the WAL grew to a couple of tens of GB, but as long as you have enough disk space maybe it's not a problem.
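
For anyone who wants to check this on their own load, one way to estimate the WAL generated is to diff pg_current_wal_lsn() before and after the COPY (a sketch, assuming Postgres 10+; the connection URL is a placeholder):

import psycopg

with psycopg.connect("postgres://localhost:5432/postgres", autocommit=True) as conn:
    start_lsn = conn.execute("SELECT pg_current_wal_lsn()").fetchone()[0]
    # ... run the COPY here ...
    # Note: the LSN diff counts all WAL written in between, so run this
    # on an otherwise idle instance for a meaningful number.
    wal_used = conn.execute(
        "SELECT pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), %s::pg_lsn))",
        [start_lsn],
    ).fetchone()[0]
    print("WAL generated:", wal_used)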

Closing this because my recipe is not any better than what's already provided.