cunybpl / aiodal

async data access layer and related tools for sqlalchemy core

Devise bulk loading API #30

Closed bsnacks000 closed 3 months ago

bsnacks000 commented 3 months ago

@tinnaing347 as discussed... we do a lot of bulk loading via psql and loose scripts, or some combination of these with bash.

While this is totally fine, I think that as our projects grow in scope our ETL is becoming more complicated.

We could use asyncpg directly with its copy commands to achieve the same results, but nest the operations in a reusable API that will be faster for us to develop against...

A simple script using jsonl uploading looks like this:

```python
import asyncio
import os

import asyncpg

url = os.getenv("DATABASE_URL", "")


async def main(url: str, fpath: str) -> None:
    conn = await asyncpg.connect(url)
    try:
        async with conn.transaction():
            # stage the raw jsonl records in a temp table that drops itself on commit
            await conn.execute(
                "create temp table mydata_tmp( record jsonb ) on commit drop;"
            )

            # COPY the file in; each jsonl line lands as one jsonb record
            result = await conn.copy_to_table("mydata_tmp", source=fpath)
            print(result)

            # cast each record's fields out of the jsonb and upsert into the target table
            result = await conn.execute(
                """
                insert into mydata (id, data_key, val)
                select
                    (record->>'id')::int as id,
                    (record->>'data_key')::varchar as data_key,
                    (record->>'val')::numeric as val
                from mydata_tmp on conflict (id) do nothing;
                """
            )
            print(result)
    finally:
        await conn.close()


asyncio.run(main(url, "test-data.jsonl"))
```
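For reference, each line of the jsonl file is a single JSON object whose keys match the casts in the insert-select. A hypothetical sample (the values are made up, assuming the `mydata` schema above):

```
{"id": 1, "data_key": "temp_f", "val": 98.6}
{"id": 2, "data_key": "temp_f", "val": 99.1}
```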

We would encapsulate this sort of pattern into an interface here and then reuse it across projects.
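One possible shape for that interface, as a rough sketch: a loader object that owns the temp-table DDL and the cast/upsert SQL, parameterized by target table and column specs. The names here (`JsonlCopyLoader`, `ColumnSpec`) are purely illustrative and not part of aiodal; the `load` method assumes an open `asyncpg.Connection`.

```python
from dataclasses import dataclass


@dataclass
class ColumnSpec:
    """One target column and the postgres type to cast the jsonb field to."""

    name: str
    pg_type: str  # e.g. "int", "varchar", "numeric"


class JsonlCopyLoader:
    """Hypothetical wrapper: COPY a jsonl file into a temp table, then upsert."""

    def __init__(self, table: str, columns: list[ColumnSpec], conflict_target: str):
        self.table = table
        self.columns = columns
        self.conflict_target = conflict_target
        self.tmp_table = f"{table}_tmp"

    def ddl(self) -> str:
        # one jsonb column holds each raw jsonl record; table drops on commit
        return f"create temp table {self.tmp_table}( record jsonb ) on commit drop;"

    def insert_sql(self) -> str:
        # build the cast/upsert statement from the column specs
        names = ", ".join(c.name for c in self.columns)
        casts = ",\n    ".join(
            f"(record->>'{c.name}')::{c.pg_type} as {c.name}" for c in self.columns
        )
        return (
            f"insert into {self.table} ({names})\n"
            f"select\n    {casts}\n"
            f"from {self.tmp_table} on conflict ({self.conflict_target}) do nothing;"
        )

    async def load(self, conn, fpath: str) -> str:
        """Run the full pattern inside one transaction. `conn` is an asyncpg.Connection."""
        async with conn.transaction():
            await conn.execute(self.ddl())
            await conn.copy_to_table(self.tmp_table, source=fpath)
            return await conn.execute(self.insert_sql())
```

A project would then just declare its tables once, e.g. `JsonlCopyLoader("mydata", [ColumnSpec("id", "int"), ColumnSpec("data_key", "varchar"), ColumnSpec("val", "numeric")], "id")`, and call `load()` from any script.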