psqlpy-python / psqlpy

Asynchronous Python PostgreSQL driver written in Rust
https://psqlpy-python.github.io/
MIT License

Add support to copy to table #68

Closed xiaoxianma closed 3 months ago

xiaoxianma commented 4 months ago

I've found this library quite useful. In order to fully migrate my projects to use this, I'd like to have an equivalent copy_to_table of asyncpg.

chandr-andr commented 4 months ago

Hello! Thank you for your kind words about the library. We'll implement this feature request as soon as possible.

chandr-andr commented 3 months ago

@xiaoxianma Hello again! Could you please describe how you use copy_to_table in asyncpg? As I understand it, it supports different types of source: a file, an async iterator, etc.

Due to Rust's strong typing, it will take a while to support everything, so I'd rather split this task into several than do it all at once. I'll start with the functionality you are using.
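
For example, something like this, if I read the asyncpg docs correctly (the connection string, table, and data below are placeholders):

import asyncio
import asyncpg


async def main():
    conn = await asyncpg.connect("postgres://user:pass@localhost:5432/db")

    # source as a path to a file on disk
    await conn.copy_to_table("test_table", source="data.csv", format="csv")

    # source as an async iterable yielding bytes in COPY text format
    async def chunks():
        yield b"1\tfoo\n"
        yield b"2\tbar\n"

    await conn.copy_to_table("test_table", source=chunks())
    await conn.close()


asyncio.run(main())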

xiaoxianma commented 3 months ago

Sure. I use pgpq to serialize a pandas dataframe into PostgreSQL binary data and copy it into a table. An example below:

import pyarrow.parquet as pq
from pgpq import ArrowToPostgresBinaryEncoder
from io import BytesIO

# load data from a parquet file ("file" is a path to your data)
arrow_table = pq.read_table(file)
encoder = ArrowToPostgresBinaryEncoder(arrow_table.schema)

# encode the table into PostgreSQL binary COPY format
buf = BytesIO()
buf.write(encoder.write_header())
for batch in arrow_table.to_batches():
    buf.write(encoder.write_batch(batch))
buf.write(encoder.finish())
buf.seek(0)

# save to table (column_names is the list of column name strings)
await connection.copy_to_table(
    "test_table",
    source=buf,
    columns=arrow_table.column_names,
    format="binary",
)
chandr-andr commented 3 months ago

@xiaoxianma Thank you very much! I didn't know about the pgpq library. I'll try to integrate it fully since it's written in Rust too; I think that would be more convenient for everyone with your use case.

xiaoxianma commented 3 months ago

Thanks. I think the pgpq library is unnecessary for supporting a copy_to_table function, as it is simply a converter from an Arrow table to the PostgreSQL COPY binary format. Since my data is a pandas dataframe, it's convenient to leverage an existing library for that, and the encoding is very fast. I had previously tried writing the conversion myself, but the performance was very bad.
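
For reference, the COPY binary layout that pgpq emits looks roughly like this (a single row with one int4 column, per the PostgreSQL COPY docs); hand-rolling the per-type encoding like this for every column is the slow path I mentioned:

import struct
from io import BytesIO

buf = BytesIO()
buf.write(b"PGCOPY\n\xff\r\n\x00")  # 11-byte signature
buf.write(struct.pack("!I", 0))      # flags field (no OIDs)
buf.write(struct.pack("!I", 0))      # header extension area length
buf.write(struct.pack("!h", 1))      # tuple: field count of 1
buf.write(struct.pack("!i", 4))      # field length in bytes
buf.write(struct.pack("!i", 42))     # big-endian int4 value
buf.write(struct.pack("!h", -1))     # trailer: field count of -1
buf.seek(0)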

chandr-andr commented 3 months ago

Yeap, I understand. The main reason I want to fully integrate pgpq is that it's a Rust library; with the integration you won't need to create a BytesIO instance, you could do everything with an instance from psqlpy, which, in my opinion, will make performance even better.

chandr-andr commented 3 months ago

@xiaoxianma I've made a new release https://github.com/qaspen-python/psqlpy/releases/tag/0.7.3 with the new method binary_copy_to_table. It isn't a generic solution that supports every possible way to execute the COPY command, but it covers your case.

I've checked it with test data and it works well. Could you please test it with your code and data?

import pyarrow.parquet as pq
from io import BytesIO

from pgpq import ArrowToPostgresBinaryEncoder
from psqlpy import ConnectionPool


async def main():
    # encode the parquet file into PostgreSQL binary COPY format
    arrow_table = pq.read_table("./MTcars.parquet")
    encoder = ArrowToPostgresBinaryEncoder(arrow_table.schema)
    buf = BytesIO()
    buf.write(encoder.write_header())
    for batch in arrow_table.to_batches():
        buf.write(encoder.write_batch(batch))
    buf.write(encoder.finish())
    buf.seek(0)

    psql_pool = ConnectionPool()

    conn = await psql_pool.connection()
    res = await conn.binary_copy_to_table(
        table_name="cars",
        py_buffer=buf,
    )
    print(res)
xiaoxianma commented 3 months ago

Thanks for the quick action. I'll give it a shot and let you know.

xiaoxianma commented 3 months ago

I may have hit a separate issue. My Postgres server requires SSL, and I got this error when trying to open a connection: psqlpy.exceptions.BaseConnectionPoolError: Database engine pool exception: Error occurred while creating a new object: db error: FATAL: SSL required

Then I had to add sslmode=require to my connection string, and ran into another error: psqlpy.exceptions.BaseConnectionPoolError: Database engine pool exception: Error occurred while creating a new object: error performing TLS handshake: no TLS implementation configured

I don't get these errors when using psycopg or asyncpg with the same connection string (I also don't need to specify SSL anywhere). Do you have any idea?
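
For reference, my pool setup now looks roughly like this (the credentials are placeholders, and I'm assuming the dsn keyword here):

from psqlpy import ConnectionPool

# sslmode=require appended to the connection string, as described above
pool = ConnectionPool(
    dsn="postgres://user:password@db.example.com:5432/mydb?sslmode=require",
)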

chandr-andr commented 3 months ago

@xiaoxianma I've checked, and there really is a problem: users weren't allowed to use self-signed certificates. I've made a new release https://github.com/qaspen-python/psqlpy/releases/tag/0.7.4 with the fix. I tested locally with SSL mode enabled on PostgreSQL and everything works great. Waiting for your word that all is good)

xiaoxianma commented 3 months ago

Thanks. Opening a new connection is fixed now. I've found two more problems.

1) My table has some special column names, e.g. a-b, and psqlpy complains about them: psqlpy.exceptions.DriverError: Database engine exception: db error: ERROR: syntax error at or near "-".

2) The connection seems to be closed automatically after binary_copy_to_table. However, I want to keep and reuse the same connection if possible, to avoid acquiring a new one:

await conn.binary_copy_to_table(buf1, table_name='cars')
await conn.binary_copy_to_table(buf2, table_name='cars')
chandr-andr commented 3 months ago

@xiaoxianma 1) About the first problem, I need to see the columns. db error in the error text means that the exception was raised on the database side, not in psqlpy. There is no custom formatting applied to the columns input, so the final query contains exactly the columns you passed.

2) About the second: I've just checked, and it doesn't close the connection. The connection will be closed if you use it in an async context manager, like this:

async with pool.acquire() as connection:
    # connection is valid here
    await connection.execute(...)
    # connection is valid here
# connection is INvalid here

But if you retrieve the connection this way:

connection = await pool.connection()

It'll stay valid until the end of the scope (of the function, for example).

In general, I'd like to see the full code snippet so I can help you.

chandr-andr commented 3 months ago

@xiaoxianma I think I've understood the first problem. Since your column name looks like a-b, you need to pass it this way:

async def main():
    res = await conn.binary_copy_to_table(
        columns=[
            '"a-b"',
        ],
        ...
    )
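
Until I change it, a small helper can pre-quote every column name for you (quote_ident here is just a sketch, not part of psqlpy):

def quote_ident(name: str) -> str:
    # wrap the identifier in double quotes, escaping any embedded quotes,
    # so names like a-b survive the generated COPY query
    return '"' + name.replace('"', '""') + '"'

columns = [quote_ident(c) for c in ["a-b", "normal_col"]]
# -> ['"a-b"', '"normal_col"']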

I'll change this behavior, but not as fast as the last bug with sslmode.

If you have some Rust experience, it'd be great to have one more contributor!

xiaoxianma commented 3 months ago

Thanks.

1) I think I can use the workaround you suggested above for now. We can open a separate issue to track this, and I'll try to fix it myself if I get the chance.

2) I tried the async with pool.acquire() context manager as well, and it still doesn't work for me. Within the context manager, I can run execute and fetch commands before binary_copy_to_table without any problem, but after binary_copy_to_table the connection seems to be closed automatically, and I can't run anything, even inside the context manager.

ok version:

async with pool.acquire() as connection:
    await connection.fetch(...)
    await connection.execute(...)
    await connection.binary_copy_to_table(...)

Problem version:

async with pool.acquire() as connection:
    await connection.binary_copy_to_table(...)
    await connection.fetch(...) # not working due to connection closed.
chandr-andr commented 3 months ago

@xiaoxianma 1) Yeap, please open a new issue for this feature. 2) It seems strange, because the same thing works well on my side. I need to dive deeper into the full configuration you are using (PostgreSQL version and configuration if possible, OS, the queries, and whether you have an external connection pooler like PgBouncer).

You can open an issue for this one too, because the problem might not be on the psqlpy side, and I want to understand it))

My full test case is below. I use the cars file from https://www.tablab.app/datasets/sample/parquet:

import pyarrow.parquet as pq
from io import BytesIO

from pgpq import ArrowToPostgresBinaryEncoder
from psqlpy import ConnectionPool


async def main():
    # encode the parquet file into PostgreSQL binary COPY format
    arrow_table = pq.read_table("./python/tests/test_data/MTcars.parquet")
    encoder = ArrowToPostgresBinaryEncoder(arrow_table.schema)
    buf = BytesIO()
    buf.write(encoder.write_header())
    for batch in arrow_table.to_batches():
        buf.write(encoder.write_batch(batch))
    buf.write(encoder.finish())
    buf.seek(0)

    psql_pool = ConnectionPool()

    async with psql_pool.acquire() as connection:
        await connection.binary_copy_to_table(
            table_name="cars",
            source=buf,
        )
        # the connection is still usable after the copy
        res = await connection.fetch("SELECT COUNT(*) FROM cars")
        print(res.result())  # It returns the result
xiaoxianma commented 3 months ago

Thanks. I'll mark this one as DONE and open separate issues. Thanks again for the help.

chandr-andr commented 3 months ago

Thank you for improving the project by finding these bugs!