collerek / ormar

python async orm with fastapi in mind and pydantic validation
https://collerek.github.io/ormar/
MIT License

Specify connection to operation #1360

Open zevisert opened 2 weeks ago

zevisert commented 2 weeks ago

Is your feature request related to a problem? Please describe.

I can't work with ormar very well in concurrent contexts since databases^0.8, because databases changed (my doing, in https://github.com/encode/databases/pull/546) to no longer share connections across asyncio tasks. Databases now creates a new connection for each asyncio.Task. This is a problem for me when I want to do concurrent work within a transaction.

Describe the solution you'd like

I'd like to be able to specify a databases connection to use for a given operation. Something like this would be great:

Describe alternatives you've considered

I don't have any alternative options right now at all.

Additional context

The MVCE below shows this.

To run it:

  1. Run docker run -it --rm -e POSTGRES_PASSWORD=demo-1360 -p 5432:5432 postgres in one terminal
  2. Run python -m venv .demo-1360 && source .demo-1360/bin/activate
  3. Run pip install ormar==0.12.2 asyncpg && pip install databases==0.8.0 (see the note at the end of this comment)
  4. Save the file below as ormar-connection.py
  5. Then run python ormar-connection.py --concurrent (should error)
    • Running python ormar-connection.py without the flag won't error
Source for ormar-connection.py

```py
import asyncio
import sys

import databases
import ormar
import sqlalchemy
from sqlalchemy.ext.asyncio import create_async_engine

DATABASE_URL = "postgresql+asyncpg://postgres:demo-1360@localhost:5432"

db = databases.Database(DATABASE_URL)
meta = sqlalchemy.MetaData()


class BaseMeta(ormar.ModelMeta):
    database = db
    metadata = meta


class Department(ormar.Model):
    class Meta(BaseMeta):
        tablename = "departments"

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100)


class Course(ormar.Model):
    class Meta(BaseMeta):
        tablename = "courses"

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100)
    department: Department | None = ormar.ForeignKey(Department)


async def main(parallel: bool):
    async with create_async_engine(DATABASE_URL).begin() as conn:
        await conn.run_sync(meta.drop_all)
        await conn.run_sync(meta.create_all)

    async with db:
        async with db.transaction():
            csse = await Department(
                id=1337,
                name="Computer Science & Software Engineering",
            ).save()

            courses = [
                ("Introduction to Computer Science", 101),
                ("Computer Architecture", 255),
                ("Algorithms and Data Structures:I", 225),
                ("Algorithms and Data Structures:II", 226),
                ("Operating Systems", 360),
                ("Database Systems", 370),
                ("Concurrent Programming and Distributed Systems", 461),
                ("Analysis of Algorithms", 425),
                ("Data Analysis and Pattern Recognition", 535),
            ]

            if parallel:
                async with asyncio.TaskGroup() as tasks:
                    for name, id in courses:
                        tasks.create_task(
                            Course(id=id, name=name, department=csse).save()
                        )
            else:
                for name, id in courses:
                    await Course(id=id, name=name, department=csse).save()

        print(await Course.objects.all())


asyncio.run(
    main(
        parallel=bool(
            {"-c", "--concurrent", "--concurrent=true"}
            & {arg.lower() for arg in sys.argv}
        )
    )
)
```

The gist is that when this work runs in separate asyncio tasks, databases creates a new connection for each task. That new connection has no active transaction, and since the csse department is not yet committed, we get a foreign key error.

# excerpt from the complete example in the file above
async with db:
    async with db.transaction():
        csse = await Department(id=1337, name="Computer Science & Software Engineering").save()
        courses = [ ... ]
        async with asyncio.TaskGroup() as tasks:
            for name, id in courses:
                tasks.create_task(Course(id=id, name=name, department=csse).save())

Here I'm just turning the coroutine from .save() into a task as a placeholder for a real-world example where there's IO-bound work to be done for each "course", and it'd be significantly faster to do that concurrently.
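To make the mechanism concrete, here is a minimal standard-library sketch of the behavior described above. It is a toy model, not the real databases implementation: `ToyDatabase`, `ToyConnection`, and `save` are hypothetical names invented for illustration, and the only assumption carried over from the issue is that a connection is looked up per asyncio.Task.

```py
import asyncio


class ToyConnection:
    """Stand-in for a database connection that only tracks transaction state."""

    def __init__(self, owner: str) -> None:
        self.owner = owner
        self.in_transaction = False


class ToyDatabase:
    """Hands out one connection per asyncio.Task (toy model, not the real library)."""

    def __init__(self) -> None:
        self._connections: dict[asyncio.Task, ToyConnection] = {}

    def connection(self) -> ToyConnection:
        task = asyncio.current_task()
        assert task is not None, "must be called from inside a running task"
        if task not in self._connections:
            self._connections[task] = ToyConnection(owner=task.get_name())
        return self._connections[task]


async def save(db: ToyDatabase) -> bool:
    """Pretend to write a row; report whether a transaction was active."""
    await asyncio.sleep(0)
    return db.connection().in_transaction


async def demo() -> tuple[bool, list[bool]]:
    db = ToyDatabase()
    db.connection().in_transaction = True  # the parent task "opens a transaction"

    # Awaited directly: still running inside the parent task.
    same_task = await save(db)
    # gather() wraps each coroutine in its own task, so each gets a fresh connection.
    child_tasks = await asyncio.gather(*(save(db) for _ in range(3)))
    return same_task, list(child_tasks)


same_task, child_tasks = asyncio.run(demo())
print(same_task)    # True
print(child_tasks)  # [False, False, False]
```

The directly awaited call sees the parent's transaction, while every spawned task gets a connection with no transaction open, which is the situation that leads to the foreign key error in the MVCE.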

If I could just tell ormar which connection to use, this wouldn't be a problem.

Note: I'm filing this as an enhancement rather than a bug, since ormar@0.12.2 requires databases<0.6.3 and ormar@0.20.1 requires databases^0.7.0, and connection use across tasks is fixed in databases@0.8.0. So ormar doesn't currently experience this, but it will once it supports databases@0.8.0.

zevisert commented 2 weeks ago

@tarsil Out of curiosity, does edgy allow connections to be specified like this? You might already have this issue

tarsil commented 2 weeks ago

@zevisert Good question. I need to have a look and get back to you. Thank you for tagging me on it. @devkral can we run some tests and check this, please?

tarsil commented 2 weeks ago

@zevisert I can see in your example that you want to use different connections? We have something similar with the "using" and "using_with_db" operations on a queryset. We don't share connections per se, and we have it working properly, but this might be another underlying problem that requires some addressing.

zevisert commented 2 weeks ago

@tarsil I haven't tested, but I think this may be an issue for you based on what I see in Edgy's docs. As of databases>=0.8.0, you have to provide a way to share the current connection in order to reuse the state of a transaction across asyncio.Tasks. Edgy's using seems to only provide the ability to refer to another registered database or schema by its registered name; it doesn't seem like a databases.core.Connection instance can be specified to Edgy.
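As a rough illustration of what "sharing the current connection" could look like, here is a standard-library sketch in which one connection object is passed explicitly to each task. Everything in it is hypothetical: `ToyConnection`, `fetch_details`, `save_course`, and the `connection=` keyword are made up for this example and are not part of ormar, Edgy, or databases. The lock reflects the assumption that a single connection runs one statement at a time, so only the IO-bound work overlaps.

```py
import asyncio


class ToyConnection:
    """Stand-in for a single connection holding an open transaction."""

    def __init__(self) -> None:
        self.in_transaction = False
        self.rows: list[tuple[int, str]] = []
        self._lock = asyncio.Lock()  # one statement at a time on this connection

    async def execute(self, row: tuple[int, str]) -> None:
        async with self._lock:
            await asyncio.sleep(0)  # pretend to talk to the database
            self.rows.append(row)


async def fetch_details(name: str) -> str:
    """Placeholder for the IO-bound work done per course."""
    await asyncio.sleep(0.01)
    return name.upper()


async def save_course(course_id: int, name: str, *, connection: ToyConnection) -> None:
    details = await fetch_details(name)  # overlaps with the other tasks
    assert connection.in_transaction, "expected the caller's transaction"
    await connection.execute((course_id, details))  # serialized by the lock


async def demo() -> list[tuple[int, str]]:
    conn = ToyConnection()
    conn.in_transaction = True  # the parent task "opens a transaction"
    courses = [("Operating Systems", 360), ("Database Systems", 370)]
    # Every task receives the same connection instead of looking one up itself.
    await asyncio.gather(
        *(save_course(course_id, name, connection=conn) for name, course_id in courses)
    )
    conn.in_transaction = False  # "commit"
    return sorted(conn.rows)


rows = asyncio.run(demo())
print(rows)  # [(360, 'OPERATING SYSTEMS'), (370, 'DATABASE SYSTEMS')]
```

Passing the handle explicitly keeps the transaction state visible to every task, at the cost of serializing the actual writes on that one connection.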