sqlalchemy / sqlalchemy

The Database Toolkit for Python
https://www.sqlalchemy.org
MIT License
9.05k stars 1.36k forks source link

Design a Repository pattern with sqlalchemy #11353

Closed brunolnetto closed 1 month ago

brunolnetto commented 1 month ago

Describe the use case

Hi guys. I have a use case which is oftentimes used along SQLAlchemy: the repository pattern. On this pattern, we separate concerns, storage and domain in such a way to allow acess along interface with the repository.

Databases / Backends / Drivers targeted

sqlalchemy 2.0.29 and Postgres 13

Example Use

from abc import ABC, abstractmethod
from typing import Union, TypeVar, List, Any
from uuid import UUID
from sqlalchemy.exc import SQLAlchemyError

from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Optional
from sqlalchemy import select

from sqlalchemy.orm import declarative_base

Base = declarative_base()

# Repository types
PrimaryKeyType = Union[int, str, UUID]
Model = TypeVar("Model", bound=Base)

class BaseRepository(ABC):
    @abstractmethod
    def find_all(self) -> List[Model]:
        pass

    @abstractmethod
    def find_by_id(self, pk: PrimaryKeyType) -> Model:
        pass

    @abstractmethod
    def save(self, entity: Model) -> Model:
        pass

    @abstractmethod
    def exists_by_id(self, id: PrimaryKeyType) -> bool:
        pass

    @abstractmethod
    def delete_by_id(self, id: PrimaryKeyType) -> None:
        pass

class RepositoryException(Exception):
    pass

def fail_message(action, e):
    return f"Failed to {action}: {e}"

# NOTE: Refactor this:
# https://medium.com/@lawsontaylor/the-factory-and-repository-pattern-with-sqlalchemy-and-pydantic-33cea9ae14e0
class SQLRepository(BaseRepository):
    def __init__(self, model: Model) -> None:
        self.model = model
        self.model_name = model.__class__.__name__

    async def __raise_repository_exception(self, action: str, e: Exception) -> None:
        fail_message = f"Failed to {action}: {e}"
        raise RepositoryException(fail_message)

    async def find_all(self, session: AsyncSession) -> List[Model]:
        try:
            # Await the coroutine to get the data list
            query_result = await session.execute(select(self.model))
            data = query_result.scalars().all()
            return data
        except SQLAlchemyError as e:
            action = f'find all {self.model_name}'
            await self.__raise_repository_exception(action, e)

    async def find_by_id(
        self, pk: PrimaryKeyType, session: AsyncSession
    ) -> Optional[Model]:

        try:
            data = await session.get(self.model, pk)
            return data
        except SQLAlchemyError as e:
            action = f'find {self.model_name} by id {pk}'
            await self.__raise_repository_exception(action, e)

    async def exists_by_id(self, pk: PrimaryKeyType, session: AsyncSession) -> bool:
        try:
            entity = await session.get(self.model, pk)
            return entity is not None
        except SQLAlchemyError as e:
            action = f'check existence of {self.model_name} by id {pk}'
            await self.__raise_repository_exception(action, e)

    async def delete_by_id(self, pk: PrimaryKeyType, session: AsyncSession) -> None:
        try:
            entity = await session.get(self.model, pk)
            if entity is not None:
                session.delete(entity)
                await session.commit()
        except SQLAlchemyError as e:
            action = f'delete {self.model_name} by id {pk}'
            await self.__raise_repository_exception(action, e)

    async def save(self, entity: Model, session: AsyncSession) -> Model:
        try:
            await session.add(entity)
            await session.commit()
            await session.refresh(entity)
            return entity

        except SQLAlchemyError as e:
            action = f'save {self.model_name}'
            await self.__raise_repository_exception(action, e)

    async def filter_by_field(
        self, field: str, value: Any, session: AsyncSession
    ) -> Optional[List[Model]]:
        try:
            query = select(self.model).filter(getattr(self.model, field) == value)
            data = await query.fetch_all()
            return data
        except SQLAlchemyError as e:
            action = f"filter {self.model_name} by field {field}={value}"
            await self.__raise_repository_exception(action, e)

Additional context

The above implementation is not fully correct. However, due my partial knowledge on the library, it is still hard for me to fix on agile pace. I found this library redbird, which does not work with Pydantic V2, which is sad.

CaselIT commented 1 month ago

Hi,

This seems like something that should be in an external library.

Looking at the implementation it seems ok for the function part, but you should pass a class to the init, not an instance.

I'm also not sure how pydantic figures in this