Mause / duckdb_engine

SQLAlchemy driver for DuckDB
MIT License
334 stars 35 forks source link

[Feature Proposal]: async support #1031

Open evan0greenup opened 2 months ago

evan0greenup commented 2 months ago

According to https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html,

SQLAlchemy requires an asyncio-compatible driver for a database before it can use that database in asyncio mode.

It would be great to add asyncio support to duckdb_engine.

sherpya commented 1 month ago

Currently testing with this code:

from typing import Type

from duckdb_engine import Dialect
from sqlalchemy import URL, pool
from sqlalchemy.dialects import registry  # noqa

class AsyncDuckDB(Dialect):
    """duckdb_engine dialect registered under the ``duckdb+aioduckdb`` URL scheme.

    It only flags the inherited dialect as async so ``create_async_engine``
    accepts it; it does not wrap the driver in an awaitable adapter.
    NOTE(review): presumably DuckDB calls therefore still block the event loop
    while they run -- confirm before relying on real concurrency.
    """

    driver = 'aioduckdb'
    # Third-party dialect subclasses must opt in explicitly, otherwise SQLAlchemy
    # warns and disables compiled-statement caching. This subclass adds nothing
    # that affects SQL compilation, so opting in is as safe as for the parent.
    supports_statement_cache = True

    is_async = True

    @classmethod
    def get_pool_class(cls, url: URL) -> Type[pool.Pool]:
        """Choose the connection pool class for *url*.

        An in-memory DuckDB database exists only inside the connection that
        created it, so it must be served from one shared connection
        (``StaticPool``); a ``NullPool`` would hand out a new, empty database
        on every checkout. File databases use ``NullPool`` so each session
        opens its own connection instead of sharing one between tasks.
        """
        database = url.database
        # No database, ':memory:' and named ':memory:<name>' are all in-memory.
        if not database or database.startswith(':memory:'):
            return pool.StaticPool
        return pool.NullPool


registry.register('duckdb.aioduckdb', 'engine.aioduckdb', 'AsyncDuckDB')

Is it safe to set `supports_statement_cache` to True with this driver?

Here is the example:

#!/usr/bin/env python3
import asyncio

from sqlalchemy import Column, Integer, Sequence, String, Select
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import DeclarativeBase

from engine import aioduckdb  # noqa

class Base(AsyncAttrs, DeclarativeBase):
    """Declarative base shared by the example's ORM models (mixes in ``AsyncAttrs``)."""

class FakeModel(Base):
    """Minimal ORM model stored in the ``fake`` table."""

    __tablename__ = 'fake'

    # Primary-key values come from an explicit sequence (nextval is emitted on INSERT).
    id = Column(Integer, Sequence('fakemodel_id_sequence'), primary_key=True)
    name = Column(String)

    def __str__(self):
        return 'FakeModel(id={}, name={})'.format(self.id, self.name)

async def main():
    """Create the schema, insert two rows and read them back three ways.

    Uses an in-memory DuckDB database through the ``duckdb+aioduckdb`` dialect
    registered by importing ``engine.aioduckdb``.
    """
    engine = create_async_engine('duckdb+aioduckdb:///:memory:', echo=True)
    # File-backed alternative:
    # engine = create_async_engine('duckdb+aioduckdb:///test.db', echo=True)

    try:
        async_session = async_sessionmaker(engine, expire_on_commit=False)

        async with async_session() as session:
            # session.begin() commits when this block exits normally and rolls
            # back on error, so no explicit commit() is needed afterwards.
            async with session.begin():
                conn = await session.connection()
                await conn.run_sync(Base.metadata.drop_all)
                await conn.run_sync(Base.metadata.create_all)

                session.add(FakeModel(name='Frank'))
                session.add(FakeModel(name='Joe'))

                # Buffered results; the pending INSERT is autoflushed before the SELECT.
                for model in await session.scalars(Select(FakeModel)):
                    print(model)

                for model in await session.scalars(Select(FakeModel)):
                    print(model)

                # Streaming result, consumed with async iteration.
                stream = await session.stream(Select(FakeModel))
                async for model in stream.scalars():
                    print(model)
    finally:
        # Close pooled connections while the event loop is still running.
        await engine.dispose()


if __name__ == '__main__':
    asyncio.run(main())

Output:

2024-07-31 15:27:14,401 INFO sqlalchemy.engine.Engine select current_schema()
2024-07-31 15:27:14,401 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-07-31 15:27:14,402 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-07-31 15:27:14,402 INFO sqlalchemy.engine.Engine 
            SELECT oid, table_name
            FROM (
                SELECT table_oid AS oid, table_name,              database_name, schema_name FROM duckdb_tables()
                UNION ALL BY NAME
                SELECT view_oid AS oid , view_name AS table_name, database_name, schema_name FROM duckdb_views()
            )
            WHERE schema_name NOT LIKE 'pg\_%' ESCAPE '\'
            AND table_name = $1

2024-07-31 15:27:14,403 INFO sqlalchemy.engine.Engine [generated in 0.00011s] ('fake',)
2024-07-31 15:27:14,434 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relkind = $1 AND pg_catalog.pg_class.relname = $2 AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != $3
2024-07-31 15:27:14,434 INFO sqlalchemy.engine.Engine [generated in 0.00016s] ('S', 'fakemodel_id_sequence', 'pg_catalog')
2024-07-31 15:27:14,438 INFO sqlalchemy.engine.Engine 
            SELECT oid, table_name
            FROM (
                SELECT table_oid AS oid, table_name,              database_name, schema_name FROM duckdb_tables()
                UNION ALL BY NAME
                SELECT view_oid AS oid , view_name AS table_name, database_name, schema_name FROM duckdb_views()
            )
            WHERE schema_name NOT LIKE 'pg\_%' ESCAPE '\'
            AND table_name = $1

2024-07-31 15:27:14,438 INFO sqlalchemy.engine.Engine [cached since 0.03565s ago] ('fake',)
2024-07-31 15:27:14,443 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relkind = $1 AND pg_catalog.pg_class.relname = $2 AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != $3
2024-07-31 15:27:14,443 INFO sqlalchemy.engine.Engine [cached since 0.009541s ago] ('S', 'fakemodel_id_sequence', 'pg_catalog')
2024-07-31 15:27:14,447 INFO sqlalchemy.engine.Engine CREATE SEQUENCE fakemodel_id_sequence
2024-07-31 15:27:14,447 INFO sqlalchemy.engine.Engine [no key 0.00007s] ()
2024-07-31 15:27:14,447 INFO sqlalchemy.engine.Engine 
CREATE TABLE fake (
        id INTEGER NOT NULL, 
        name VARCHAR, 
        PRIMARY KEY (id)
)

2024-07-31 15:27:14,447 INFO sqlalchemy.engine.Engine [no key 0.00006s] ()
2024-07-31 15:27:14,449 INFO sqlalchemy.engine.Engine INSERT INTO fake (id, name) SELECT p0::INTEGER, p1::VARCHAR FROM (VALUES (nextval('fakemodel_id_sequence'), $1, 0), (nextval('fakemodel_id_sequence'), $2, 1)) AS imp_sen(p0, p1, sen_counter) ORDER BY sen_counter RETURNING fake.id, fake.id AS id__1
2024-07-31 15:27:14,450 INFO sqlalchemy.engine.Engine [generated in 0.00006s (insertmanyvalues) 1/1 (ordered)] ('Frank', 'Joe')
2024-07-31 15:27:14,452 INFO sqlalchemy.engine.Engine SELECT fake.id, fake.name 
FROM fake
2024-07-31 15:27:14,452 INFO sqlalchemy.engine.Engine [generated in 0.00009s] ()
FakeModel(id=1, name=Frank)
FakeModel(id=2, name=Joe)
2024-07-31 15:27:14,452 INFO sqlalchemy.engine.Engine SELECT fake.id, fake.name 
FROM fake
2024-07-31 15:27:14,453 INFO sqlalchemy.engine.Engine [cached since 0.0008197s ago] ()
FakeModel(id=1, name=Frank)
FakeModel(id=2, name=Joe)
2024-07-31 15:27:14,453 INFO sqlalchemy.engine.Engine SELECT fake.id, fake.name 
FROM fake
2024-07-31 15:27:14,453 INFO sqlalchemy.engine.Engine [cached since 0.001428s ago] ()
FakeModel(id=1, name=Frank)
FakeModel(id=2, name=Joe)
2024-07-31 15:27:14,454 INFO sqlalchemy.engine.Engine COMMIT