Open elektracodes opened 3 months ago
Allow me to ask you a couple of questions first.
I don't think Asyncsession works with query api(correct me if I'm wrong)
you should have faced an error like AttributeError: 'async_sessionmaker' object has no attribute 'query'
but your code was able to call page_sorter
it doesn't support AsyncSession natively for now but there is a way around
you could use select
statement at dao level and in page_builder you could pass async_session as extra_context
where you can do session.execute(stmt)
and call all
or fetchall
method on your result.
Do you want me to add a working example for this workaround?
Or I think we can work on adding a support layer for AsyncSession
.
Hello. async queries in sqlalchemy work in a different way. I can suggest you to make a session dependency, use repository pattern for queries instead of combining DAO and queries to db.
from types import TracebackType
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from src.somewhere.settings import settings # my pydantic-settings, use yours instead.
class AsyncSessionMaker:
_engine = create_async_engine(
url=settings.POSTGRES_URL,
pool_pre_ping=True,
future=True,
)
_sessionmaker = async_sessionmaker(
bind=_engine,
autoflush=False,
autocommit=False,
)
def __init__(self) -> None:
self._session = self._sessionmaker()
@property
def session(self) -> AsyncSession:
return self._session
async def __aenter__(self) -> AsyncSession:
return await self._session.__aenter__()
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
return await self._session.__aexit__(exc_type, exc_value, traceback)
from collections.abc import AsyncGenerator
from typing import Annotated
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from src.somewhere.db import AsyncSessionMaker # import previously made session maker
__all__ = ("AsyncSessionDep", "get_async_session")
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionMaker() as session:
try:
yield session
await session.commit()
finally:
await session.close()
AsyncSessionDep = Annotated[AsyncSession, Depends(get_async_session)]
from collections.abc import Sequence
from typing import Generic, TypeVar
from uuid import UUID # id for those examples would be as UUID
from sqlalchemy import ColumnExpressionArgument, Subquery, func, inspect, select
from sqlalchemy.ext.asyncio import AsyncSession
from src.somewhere.db import SQLAlchemyBaseModel # your base-class for sqlalchemy models
_MT = TypeVar("_MT", bound=SQLAlchemyBaseModel)
class BaseRepository(Generic[_MT]):
_model: type[_MT]
_session: AsyncSession
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def get_by_id(self, id_: UUID) -> _MT:
"""
Get model instance by id
"""
query = select(self._model).where(self._model.id == id_)
result = await self._session.execute(query)
return result.scalar_one()
async def get_all(self) -> Sequence[_MT]:
"""
Get all model instances
"""
query = select(self._model)
result = await self._session.execute(query)
return result.scalars().all()
async def save(self, instance: _MT) -> _MT:
"""
Save a new model instance or update if exists
"""
inspr = inspect(instance)
if not inspr.modified and inspr.has_identity:
return instance
self._session.add(instance)
await self._session.flush()
await self._session.refresh(instance)
return instance
async def delete(self, instance: _MT) -> None:
"""
Delete a model instance from database
"""
await self._session.delete(instance)
await self._session.flush()
async def count(
self, from_query: ColumnExpressionArgument[bool] | None = None, sub_query: Subquery | None = None
) -> int:
if from_query is not None:
query = select(func.count()).select_from(select(self._model).where(from_query).subquery())
elif sub_query is not None:
query = select(func.count()).select_from(sub_query)
else:
query = select(func.count()).select_from(self._model)
result = await self._session.execute(query)
return result.scalar_one()
from src.somewhere.videos.models import Video
from src.somewhere.db.repositories import BaseRepository
class VideoRepository(BaseRepository[Video]):
_model = Video
import uuid
from fastapi import APIRouter, status
from sqlalchemy.exc import NoResultFound
from src.somewhere.dependencies.db import AsyncSessionDep # made above for you
from src.somewhere.videos.dao import VideoDao
from src.somewhere.videos.models import Video
from src.somewhere.videos.repositories import VideoRepository
router = APIRouter(prefix="/{video_id}")
@router.get("/", response_model=VideoDao)
async def get_video_by_id(
video_id: uuid.UUID,
session: AsyncSessionDep,
) -> Video:
try:
return await VideoRepository(session).get_by_id(id_=video_id)
except NoResultFound as err:
raise YourOwnException("or do whatever you want") from err
Hello!
Can this work with AsyncSession? I am trying to work with the examples but I cannot seem to be able to overwrite the sorter.
I get this error
I have overwrite the
get_default_read
to work with async, but I cannot see how to do that in the page_sorter.