MagicStack / asyncpg

A fast PostgreSQL Database Client Library for Python/asyncio.
Apache License 2.0
6.96k stars 402 forks source link

Using async iterator in copy_records_to_table #689

Closed alex-eri closed 3 years ago

alex-eri commented 3 years ago

I want to stream data to base with COPY, but i have async code, that produce data. I can pass regular iterator to records, but it ends then Queue empty. So reconnect needed.

queue = asyncio.Queue(2<<10)
loop.create_task(feed(queue))

async def get_records(queue):
    while True:
        r = await queue.get()
        yield r
        self.q.task_done()

async with pg.acquire() as connection:
    await connection.copy_records_to_table(
                     'radarlog2',
                     records = get_records(queue)
                 )
robhaswell commented 3 years ago

I too was surprised that this wasn't supported.

It seems that copy_to_table supports an async iterator but it must generate the Postgres binary protocol? I couldn't find a protocol formatter anywhere even though I assume that asyncpg must contain one somewhere in order to support this function.

If anyone has some guidance on how to use copy_to_table then that would be a good alternative, thanks.

elprans commented 3 years ago

713 adds support for async iterables in copy_records_to_table.

robhaswell commented 3 years ago

Thanks!