BeanieODM / beanie

Asynchronous Python ODM for MongoDB
http://beanie-odm.dev/
Apache License 2.0
2.04k stars, 215 forks

Using transactions in order to ensure data consistency #172

Closed mr-blue-sky closed 1 year ago

mr-blue-sky commented 2 years ago

Hey, first of all, I wanted to say thank you very much for all of your hard work on this library! It's great! :blush:

Problem

I want to implement a system like this:

Domain Models

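from typing import List, Optional

from pydantic import BaseModel, Field

class PostDetails(BaseModel):
    slug: str
    title: str
    description: Optional[str] = None
    body: str
    tags: List[str] = []

class Post(PostDetails):
    # pydantic does not treat names starting with "_" as fields, so store the
    # id as "id" and accept "_id" as its alias (Post(_id=...) still works)
    id: str = Field(alias="_id")

class CategoryDetails(BaseModel):
    slug: str
    title: str
    description: Optional[str] = None

class Category(CategoryDetails):
    id: str = Field(alias="_id")  # same aliasing as Post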

Beanie Implementation

from typing import AsyncIterable, List, Optional

from beanie import Document, Indexed
from bson import ObjectId
from pymongo.client_session import ClientSession

IndexedStrField = Indexed(str, unique=True)

class PostDocument(Document):
    slug: IndexedStrField  # Question 1
    title: str
    description: Optional[str]
    body: str
    tags: List[str]
    categories: List[ObjectId]

    def to_post_model(self) -> Post:
        return Post(
            _id=str(self.id),
            slug=self.slug,
            title=self.title,
            description=self.description,
            body=self.body,
            tags=self.tags,
        )

class CategoryDocument(Document):
    slug: IndexedStrField
    title: str
    description: Optional[str]
    parent_category: Optional[ObjectId]

    def to_category_model(self) -> Category:
        return Category(
            _id=str(self.id),
            slug=self.slug,
            title=self.title,
            description=self.description,
        )

    async def get_subcategories_recursively(
        self, session: Optional[ClientSession] = None
    ) -> AsyncIterable["CategoryDocument"]:
        async for subcategory in CategoryDocument.find(
            CategoryDocument.parent_category == self.id, session=session
        ):
            yield subcategory
            async for subsubcategory in subcategory.get_subcategories_recursively(
                session
            ):
                yield subsubcategory

    async def get_posts(
        self, session: Optional[ClientSession] = None
    ) -> AsyncIterable[PostDocument]:
        async for post in PostDocument.find(
            PostDocument.categories == self.id, session=session
        ):  # Question 2
            yield post

    async def delete_all_posts(self, session: Optional[ClientSession] = None) -> None:
        async for post in self.get_posts(session):
            await post.delete(session=session)

    async def delete_recursively(self, session: Optional[ClientSession] = None) -> None:
        await self.delete_all_posts(session)

        async for subcategory in self.get_subcategories_recursively(session):
            await subcategory.delete_all_posts(session)
            await subcategory.delete(session=session)

        # finally remove this category itself
        await self.delete(session=session)
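Here is a stdlib-only sketch of how get_subcategories_recursively walks the category tree. It assumes a hypothetical in-memory PARENTS mapping in place of the categories collection, and find_children stands in for CategoryDocument.find. Each child is yielded before its own descendants, so the traversal is depth-first. Every category visited costs one more find call.

```python
import asyncio
from typing import AsyncIterator, Dict, List, Optional

# Hypothetical stand-in for the categories collection: category id -> parent id
PARENTS: Dict[str, Optional[str]] = {
    "root": None,
    "a": "root",
    "b": "root",
    "a1": "a",
    "a2": "a",
}


async def find_children(parent_id: str) -> AsyncIterator[str]:
    # Mimics CategoryDocument.find(CategoryDocument.parent_category == parent_id)
    for cat_id, parent in PARENTS.items():
        if parent == parent_id:
            yield cat_id


async def subcategories_recursively(cat_id: str) -> AsyncIterator[str]:
    # Same shape as get_subcategories_recursively: a child, then its descendants
    async for child in find_children(cat_id):
        yield child
        async for descendant in subcategories_recursively(child):
            yield descendant


async def collect(cat_id: str) -> List[str]:
    return [c async for c in subcategories_recursively(cat_id)]


print(asyncio.run(collect("root")))
```

Because of this one-query-per-category shape, a deep or wide tree means many round trips to the server.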

Naive Usage

category: CategoryDocument = ...
await category.delete_recursively()

Questions

  1. About Question 1: mypy is not satisfied with this statement. Do you know how I can solve this?
  2. About Question 2: will it correctly query all of the posts whose categories field contains category.id?
  3. My main question: I want to make await category.delete_recursively() atomic, to ensure data consistency. I thought about using transactions. What should I do?
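Question 2 depends on how MongoDB's equality filter {"categories": value} treats arrays. The filter matches a document when the field equals the value, or when the field is an array that contains the value. Beanie's PostDocument.categories == self.id should compile to that kind of equality filter. The function below is a simplified model of the rule for top-level fields only. It ignores null handling, nested paths and BSON type ordering, and it is not MongoDB's code.

```python
from typing import Any, Dict, List

_MISSING = object()


def matches_eq(document: Dict[str, Any], field: str, value: Any) -> bool:
    # Simplified model of a MongoDB equality filter {field: value} on a top-level field
    actual = document.get(field, _MISSING)
    if actual is _MISSING:
        return False  # (real MongoDB also matches missing fields when value is None)
    if actual == value:
        return True  # plain equality, including an exact whole-array match
    return isinstance(actual, list) and value in actual  # array contains the value


posts: List[Dict[str, Any]] = [
    {"slug": "p1", "categories": ["c1", "c2"]},
    {"slug": "p2", "categories": ["c2"]},
    {"slug": "p3", "categories": []},
]

print([p["slug"] for p in posts if matches_eq(p, "categories", "c2")])
```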

Possible Solution

Will it work for me?

from motor.motor_asyncio import AsyncIOMotorClient

client: AsyncIOMotorClient = ...  # the same client whose database was passed to init_beanie
async with await client.start_session() as s:
    async with s.start_transaction():
        await category.delete_recursively(s)
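This is a toy model of the all-or-nothing behaviour the nested async with blocks are meant to provide. FakeSession is a hypothetical stand-in that buffers deletes and applies them only when the transaction block exits cleanly. It is not Motor or MongoDB internals. In real code, each operation only joins the transaction if it receives session=s. MongoDB multi-document transactions also require a replica set or sharded cluster deployment.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List, Set


class FakeSession:
    """Hypothetical stand-in for a Motor session: deletes are buffered until commit."""

    def __init__(self, store: Set[str]) -> None:
        self.store = store
        self._pending: List[str] = []

    @asynccontextmanager
    async def start_transaction(self) -> AsyncIterator[None]:
        self._pending = []
        try:
            yield
        except BaseException:
            self._pending = []  # abort: drop every buffered write
            raise
        else:
            for doc_id in self._pending:  # commit: apply all writes together
                self.store.discard(doc_id)
            self._pending = []

    async def delete(self, doc_id: str) -> None:
        self._pending.append(doc_id)


async def delete_many(session: FakeSession, ids: List[str], fail_at: int = -1) -> None:
    for i, doc_id in enumerate(ids):
        if i == fail_at:
            raise RuntimeError(f"simulated failure before deleting {doc_id}")
        await session.delete(doc_id)


async def run(fail_at: int) -> Set[str]:
    store = {"cat", "sub", "post1", "post2"}
    session = FakeSession(store)
    try:
        async with session.start_transaction():
            await delete_many(session, ["post1", "post2", "sub", "cat"], fail_at)
    except RuntimeError:
        pass
    return store


print(asyncio.run(run(-1)))  # no failure: everything deleted
print(asyncio.run(run(2)))   # failure midway: nothing deleted
```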

And another question: is it necessary to pass the session to all of these methods?

Thank you very very much! :smile:

mr-blue-sky commented 2 years ago

Hi, I would love to know if the method I presented is correct, because if so, I think it can be used by a lot of people. Thank you very much :)

roman-right commented 2 years ago

Hey, sorry, I missed your issue.

  1. Unfortunately I don't know. MyPy is tricky: I spent 50% of the whole development time fixing issues with it, and I still have no idea about the nature of some of them :)
  2. It looks correct.
  3. Yup, it should work as you wrote. I plan to add a transaction context manager to the document class so this can be written less verbosely, but there are some tricks with it. And yup, for now you have to pass the session to each method. I'm not sure whether I should add an auto-detector for sessions: with it, it would be impossible to run queries inside the session context manager without using that session. But it would be comfier, I do agree.

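One way the session auto-detection Roman mentions could work is a contextvars.ContextVar that holds the "current" session. This is a hypothetical sketch and not Beanie's API. use_session and find_posts are made-up names, and plain strings stand in for sessions. It also shows the trade-off he describes: inside the block, a query called without a session silently picks up the ambient one, and passing session=None cannot opt out.

```python
import asyncio
import contextvars
from contextlib import contextmanager
from typing import Iterator, List, Optional

# Hypothetical ambient "current session" (a string stands in for a real session)
_current_session: "contextvars.ContextVar[Optional[str]]" = contextvars.ContextVar(
    "current_session", default=None
)


@contextmanager
def use_session(session: str) -> Iterator[str]:
    # Make `session` the ambient session until the block exits
    token = _current_session.set(session)
    try:
        yield session
    finally:
        _current_session.reset(token)


async def find_posts(session: Optional[str] = None) -> str:
    # An explicit session wins; None falls back to the ambient one
    effective = session if session is not None else _current_session.get()
    return f"find(session={effective})"


async def demo() -> List[str]:
    results = [await find_posts()]  # outside any block: no session
    with use_session("s1"):
        results.append(await find_posts())  # picks up s1 implicitly
        results.append(await find_posts(session=None))  # None cannot opt out
    results.append(await find_posts())  # ambient session restored afterwards
    return results


print(asyncio.run(demo()))
```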
mr-blue-sky commented 2 years ago

Thank you, @roman-right! I implemented a context manager like this:

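from contextlib import asynccontextmanager
from typing import AsyncIterator

from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient

class MongoClient:

    _client: AsyncIOMotorClient
    _database: str

    def __init__(self, connection_string: str, database: str) -> None:
        self._client = AsyncIOMotorClient(connection_string)
        self._database = database

    async def initialize(self) -> None:
        await init_beanie(
            database=self._client[self._database],
            document_models=[PostDocument, CategoryDocument],
        )

    @asynccontextmanager
    async def start_transaction(self) -> AsyncIterator["MongoRepository"]:  # defined below
        # the transaction commits when the caller's "async with" block exits
        # normally and is aborted if an exception escapes it
        async with await self._client.start_session() as session:
            async with session.start_transaction():
                yield MongoRepository(session)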

where MongoRepository is just a class that uses beanie:

class MongoRepository(Repository):

    _session: ClientSession

    def __init__(self, session: ClientSession):
        self._session = session

    async def create_post(self, details: PostDetails) -> Post:
        doc = PostDocument(
            slug=details.slug,
            title=details.title,
            description=details.description,
            body=details.body,
            tags=details.tags,
            categories=[],
        )
        await doc.insert(session=self._session)

        return doc.to_post_model()

    async def get_all_posts(self) -> AsyncIterable[Post]:
        async for doc in PostDocument.find_all(session=self._session):
            yield doc.to_post_model()
    # more methods...

and usage via:

client = MongoClient("...", "...")
await client.initialize()
async with client.start_transaction() as repo:
    async for post in repo.get_all_posts():  # an async generator: iterate it, don't await it
        print(post)
    # more operations...

What do you say?
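get_all_posts contains yield, so calling it returns an async generator. The result has to be consumed with async for, or an async comprehension, rather than awaited. Here is a stdlib-only check with a stand-in generator (the slugs are made up):

```python
import asyncio
from typing import AsyncIterator, List


async def get_all_posts() -> AsyncIterator[str]:
    # Stand-in for MongoRepository.get_all_posts: `yield` makes this an async generator
    for slug in ("first-post", "second-post"):
        yield slug


async def main() -> List[str]:
    try:
        await get_all_posts()  # an async generator object is not awaitable
    except TypeError as exc:
        print(f"await fails: {exc}")
    # Consume it with `async for` instead (here as an async comprehension)
    return [slug async for slug in get_all_posts()]


print(asyncio.run(main()))
```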

roman-right commented 2 years ago

It looks neat. How do you control when the transaction ends? Is it when the endpoint returns its result, or does the transaction live for the lifetime of some other object?

mozTheFuzz commented 1 year ago

@mr-blue-sky Your solution looks very neat. I am looking for ways to 'dynamically' append operations to a session/transaction in MongoDB.

I wonder: how do you close the transaction when all operations are complete?
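On the question of when the transaction ends: with a context manager shaped like start_transaction above, it ends exactly when the caller's async with block exits. PyMongo's and Motor's transaction context commits on a clean exit and aborts if an exception escapes. The sketch below reproduces that control flow with a hypothetical FakeTxn that only logs events, so it runs without MongoDB.

```python
import asyncio
from contextlib import asynccontextmanager
from types import TracebackType
from typing import AsyncIterator, List, Optional, Type

events: List[str] = []


class FakeTxn:
    """Hypothetical stand-in for session.start_transaction(); logs instead of contacting MongoDB."""

    async def __aenter__(self) -> "FakeTxn":
        events.append("start")
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> bool:
        # Mirrors the commit-on-success / abort-on-error rule
        events.append("commit" if exc_type is None else "abort")
        return False  # never swallow the exception


@asynccontextmanager
async def start_transaction() -> AsyncIterator[str]:
    # Same shape as MongoClient.start_transaction: the transaction stays open while suspended at yield
    async with FakeTxn():
        yield "repo"


async def main() -> None:
    async with start_transaction() as repo:
        events.append(f"work with {repo}")
    events.append("after block")

    try:
        async with start_transaction():
            raise ValueError("boom")
    except ValueError:
        events.append("error handled")


asyncio.run(main())
print(events)
```

So the transaction cannot outlive the async with block. Holding one open across a whole request is also limited on the server side by transactionLifetimeLimitSeconds, which defaults to 60 seconds.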

github-actions[bot] commented 1 year ago

This issue is stale because it has been open 30 days with no activity.

github-actions[bot] commented 1 year ago

This issue was closed because it has been stalled for 14 days with no activity.

jtdiscovery commented 1 year ago

I'd like to ping this back up - @mr-blue-sky I really dig this approach - any downsides you've encountered?