thearchitector / sqlalchemy-triopg

A Trio-based PostgreSQL SQLAlchemy driver using asyncpg
BSD 3-Clause Clear License

Concurrency issues #1

Closed iiLaurens closed 2 weeks ago

iiLaurens commented 1 month ago

This library is great. It is only a small adjustment, yet it somehow makes a complex library like SQLAlchemy compatible with Trio and PostgreSQL! I had difficulty getting anything else to work.

I did notice, however, that when multiple coroutines open a connection at the same time, I get errors originating from trio_asyncio about asyncio futures being active. I share a singleton engine and a singleton async_sessionmaker across all of those coroutines. For now, I have to wrap every use of the async_sessionmaker in a lock to prevent concurrent access to the database.

Am I doing something wrong here?

thearchitector commented 1 month ago

do you mean that you're having difficulty running concurrent tasks using the same session object? if so, that pattern is not supported by SQLAlchemy - every task requires its own database session.

if that's not what you mean, could you share a code snippet?

iiLaurens commented 1 month ago

I exclusively use the async sessionmaker to create new sessions and I'm not passing any session objects around. As far as I understand, the sessionmaker always creates a new session, so I don't think I'm using the same session concurrently in different tasks.

To be sure, I wrapped the async_sessionmaker to print the id of each session instance as it is created, and the ids are unique. That means I am not using one session across tasks. I also never pass session objects around after creating them; I use them exclusively within the scope they were created in.

For your reference, this is how I now set up my sessionmaker to make it work:

db_lock = trio.Lock()

engine = create_async_engine(db_uri)

_async_session = async_sessionmaker(engine, expire_on_commit=False)

@contextlib.asynccontextmanager
async def async_session():
    async with db_lock:  # This is needed to avoid the error
        async with _async_session() as session:
            print(id(session))
            yield session

iiLaurens commented 1 month ago

The error & stacktrace I am getting when I disable the lock around the call to sessionmaker:

    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py", line 574, in scalars
    |     result = await self.execute(
    |              ^^^^^^^^^^^^^^^^^^^
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py", line 461, in execute
    |     result = await greenlet_spawn(
    |              ^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 201, in greenlet_spawn
    |     result = context.throw(*sys.exc_info())
    |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2351, in execute
    |     return self._execute_internal(
    |            ^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2226, in _execute_internal
    |     conn = self._connection_for_bind(bind)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2095, in _connection_for_bind
    |     return trans._connection_for_bind(engine, execution_options)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "<string>", line 2, in _connection_for_bind
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/orm/state_changes.py", line 139, in _go
    |     ret_value = fn(self, *arg, **kw)
    |                 ^^^^^^^^^^^^^^^^^^^^
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1189, in _connection_for_bind
    |     conn = bind.connect()
    |            ^^^^^^^^^^^^^^
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3276, in connect
    |     return self._connection_cls(self)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 146, in __init__
    |     self._dbapi_connection = engine.raw_connection()
    |                              ^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3300, in raw_connection
    |     return self.pool.connect()
    |            ^^^^^^^^^^^^^^^^^^^
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 449, in connect
    |     return _ConnectionFairy._checkout(self)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 1263, in _checkout
    |     fairy = _ConnectionRecord.checkout(pool)
    |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 712, in checkout
    |     rec = pool._do_get()
    |           ^^^^^^^^^^^^^^
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/pool/impl.py", line 179, in _do_get
    |     with util.safe_reraise():
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 146, in __exit__
    |     raise exc_value.with_traceback(exc_tb)
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/pool/impl.py", line 177, in _do_get
    |     return self._create_connection()
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 390, in _create_connection
    |     return _ConnectionRecord(self)
    |            ^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 674, in __init__
    |     self.__connect()
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 914, in __connect
    |     )._exec_w_sync_on_first_run(self.dbapi_connection, self)
    |       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/event/attr.py", line 481, in _exec_w_sync_on_first_run
    |     with self._exec_once_mutex:
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 224, in __enter__
    |     return await_fallback(self.mutex.acquire())
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 164, in await_fallback
    |     return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/laurens/projects/news-archive/venv/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
    |     value = await result
    |             ^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.11/asyncio/locks.py", line 114, in acquire
    |     await fut
    | TypeError: trio.run received unrecognized yield message <Future pending>. Are you trying to use a library written for some other framework like asyncio? That won't work without some kind of compatibility shim.
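One reading of the last frames, offered as an assumption rather than a confirmed diagnosis: an asyncio.Lock only awaits a Future when the lock is already held, and a pending Future suspends by yielding itself to whatever is driving the coroutine. When that driver is trio.run, it reports an unrecognized yield message. The standard-library sketch below drives coroutines by hand to show both paths; `wait_for` is a hypothetical helper, and the bare Future only approximates what a contended lock does.

```python
import asyncio


async def wait_for(fut):
    # Hypothetical helper: awaiting a pending Future suspends the coroutine.
    return await fut


# Uncontended path: acquire() finishes on its first step without suspending,
# so nothing is ever handed to the driver.
lock = asyncio.Lock()
step = lock.acquire()
try:
    step.send(None)
    acquired_without_suspending = False
except StopIteration as done:
    acquired_without_suspending = done.value

# Contended path (approximated with a bare Future): the pending Future
# yields itself to the driver. An asyncio loop knows how to resume it;
# a foreign runner does not.
loop = asyncio.new_event_loop()
fut = loop.create_future()
waiter = wait_for(fut)
yielded = waiter.send(None)

print(acquired_without_suspending)
print(yielded is fut)

waiter.close()
loop.close()
```

If this reading is right, it would also explain why serializing access with a lock hides the error: the internal lock is never contended, so it never reaches the `await fut` line shown in the traceback.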

thearchitector commented 1 month ago

if you're using session maker everywhere, then things should be working fine

would you mind providing a small example to reproduce the behavior? the snippet you provided, without the lock, should be functional...

iiLaurens commented 1 month ago

I haven't gotten around to it yet, but I plan to produce a minimal working example soon.

iiLaurens commented 1 month ago

I was able to reduce the code to a minimal working example in which the error still occurs.

from __future__ import annotations

import datetime

import trio
from sqlalchemy import DateTime, select
from sqlalchemy.dialects import registry
from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from trio_asyncio import run

registry.register("triopg", "sqlalchemy_triopg.triopg", "TrioPGDialect")

engine = create_async_engine("triopg://my-server", echo=False)

async_session = async_sessionmaker(engine, expire_on_commit=False)

class Base(AsyncAttrs, DeclarativeBase):
    __abstract__ = True

class Article(Base):
    __tablename__ = "articles"

    id: Mapped[int] = mapped_column(primary_key=True)
    url: Mapped[str]
    title: Mapped[str]
    on_archive: Mapped[int]
    on_disk: Mapped[int]
    lastmod: Mapped[datetime.datetime] = mapped_column(DateTime(timezone=True))
    site: Mapped[int]
    publish_date: Mapped[datetime.datetime] = mapped_column(DateTime(timezone=True))
    record_date: Mapped[datetime.datetime] = mapped_column(DateTime(timezone=True))
    skip: Mapped[int] = mapped_column(default=0)
    attempts: Mapped[int]
    last_attempt_msg: Mapped[str]
    last_attempt_ts: Mapped[datetime.datetime] = mapped_column(DateTime(timezone=True))

async def load_from_db() -> list[Article]:
    async with async_session() as session:
        stmt = select(Article)

        return (await session.scalars(stmt)).all()

async def main():
    async with trio.open_nursery() as nrs:
        nrs.start_soon(load_from_db)
        nrs.start_soon(load_from_db)

if __name__ == "__main__":
    run(main)

This is my pip environment (linux, ubuntu server, python 3.11.4):

ago==0.0.95
aio-databases==0.15.0
aiobotocore==2.5.4
aiohttp==3.8.5
aioitertools==0.11.0
aiopg==1.4.0
aiosignal==1.3.1
annotated-types==0.6.0
anyio==4.0.0
async-generator==1.10
async-timeout==4.0.3
asyncpg==0.29.0
attrs==23.1.0
Automat==22.10.0
beautifulsoup4==4.12.2
boto3==1.28.52
botocore==1.31.17
bs4==0.0.1
cachetools==5.3.3
certifi==2024.6.2
cffi==1.15.1
chardet==5.2.0
charset-normalizer==3.2.0
click==8.1.7
cloudpickle==3.0.0
connectorx==0.3.3
constantly==15.1.0
courlan==0.9.4
cryptography==41.0.4
cssselect==1.2.0
curl_cffi==0.7.0b4
dateparser==1.1.8
deltalake==0.18.2
dotmap==1.3.30
duckdb==1.0.0
elastic-transport==8.4.0
elasticsearch==8.9.0
expiring-dict==1.1.0
faust-cchardet==2.1.19
feedfinder2==0.0.4
feedparser==6.0.10
filelock==3.12.4
frozenlist==1.4.0
fsspec==2023.9.2
greenlet==3.0.3
h11==0.14.0
hjson==3.1.0
htmldate==1.5.1
httpcore==0.17.3
httpx==0.24.1
hurry.filesize==0.9
hyperlink==21.0.0
idna==3.4
incremental==22.10.0
iniconfig==2.0.0
itemadapter==0.8.0
itemloaders==1.1.0
jieba3k==0.35.1
jmespath==1.0.1
joblib==1.3.2
jusText==3.0.0
langcodes==3.3.0
langdetect==1.0.9
loguru==0.7.1
lxml==4.9.3
multidict==6.0.4
mypy==1.8.0
mypy-extensions==1.0.0
nltk==3.8.1
numpy==1.25.2
outcome==1.2.0
packaging==23.1
pandas==2.1.4
parsel==1.8.1
peewee==3.17.6
peewee-aio==1.7.4
Pillow==10.0.1
plac==1.4.0
pluggy==1.3.0
polars==1.2.1
Protego==0.3.0
psycopg2-binary==2.9.7
pyarrow==17.0.0
pyarrow-hotfix==0.6
pyasn1==0.5.0
pyasn1-modules==0.3.0
pycparser==2.21
pydantic==2.6.1
pydantic_core==2.16.2
PyDispatcher==2.0.7
pyee==11.1.0
PyMySQL==1.1.0rc2
pyOpenSSL==23.2.0
PySocks==1.7.1
pytest==7.4.2
pytest-pycharm==0.7.0
pytest-trio==0.8.0
pythainlp==4.0.2
python-dateutil==2.8.2
pytz==2023.3.post1
PyYAML==6.0.1
queuelib==1.6.2
readability-lxml==0.8.1
regex==2023.8.8
requests==2.31.0
requests-file==1.5.1
s3fs==2023.9.2
s3transfer==0.6.2
Scrapy==2.11.0
selenium==4.22.0
service-identity==23.1.0
sgmllib3k==1.0.0
six==1.16.0
sniffio==1.3.0
socksio==1.0.0
sortedcontainers==2.4.0
soupsieve==2.5
SQLAlchemy==2.0.32
sqlalchemy-triopg==1.0.0
tblib==3.0.0
tinysegmenter==0.3
tld==0.13
tldextract==3.6.0
tqdm==4.66.1
trafilatura==1.6.2
trio==0.22.2
trio-asyncio==0.12.0
trio-mysql==1.0.3
trio-parallel==1.2.1
trio-util==0.7.0
trio-websocket==0.11.1
triopg==0.6.0
Twisted==22.10.0
typing_extensions==4.12.2
tzdata==2023.3
tzlocal==5.0.1
urllib3==1.26.16
w3lib==2.1.2
warcio==1.7.4
websocket-client==1.8.0
wrapt==1.15.0
wsproto==1.2.0
yarl==1.9.2
zope.interface==6.0

iiLaurens commented 1 month ago

Upon further investigation, I found that an asyncio Lock object is the issue. I was able to solve the error by monkey patching the lock used by SQLAlchemy:

from sqlalchemy.util.langhelpers import memoized_property
from sqlalchemy.util._concurrency_py3k import await_fallback
from typing import Any
import sqlalchemy

class AsyncAdaptedLock:
    @memoized_property
    def mutex(self) -> trio.Lock:
        # there should not be a race here for coroutines creating the
        # new lock as we are not using await, so therefore no concurrency
        return trio.Lock()

    def __enter__(self) -> bool:
        # await is used to acquire the lock only after the first calling
        # coroutine has created the mutex.
        return await_fallback(self.mutex.acquire())

    def __exit__(self, *arg: Any, **kw: Any) -> None:
        self.mutex.release()

if __name__ == "__main__":
    # Swap in the Trio-based lock for the name that sqlalchemy.event.attr
    # looks up, then start the program.
    sqlalchemy.event.attr.AsyncAdaptedLock = AsyncAdaptedLock
    run(main)

The easiest fix would probably be to monkey patch the Lock class in the asyncio library itself with trio's Lock, but I do not know what unintended consequences that might have for other asyncio-enabled packages running under Trio.
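A narrower alternative to replacing asyncio's Lock globally is to patch only the one name the consumer looks up, and only for a bounded scope. The sketch below uses stand-in objects (`consumer`, `OriginalLock`, `ReplacementLock` and `make_lock` are hypothetical, not SQLAlchemy or Trio names) with the standard library's `unittest.mock.patch.object`, which restores the original attribute on exit.

```python
import types
from unittest.mock import patch


class OriginalLock:
    """Stand-in for the lock class a library imports by default."""


class ReplacementLock:
    """Stand-in for a framework-compatible lock class."""


# Stand-in for a module that holds its own reference to the lock class,
# the way `from x import Lock` binds a name in the importing module.
consumer = types.SimpleNamespace(Lock=OriginalLock)


def make_lock():
    # The name is looked up on the consumer at call time, so a patch applied
    # before the call takes effect here.
    return consumer.Lock()


with patch.object(consumer, "Lock", ReplacementLock):
    inside = make_lock()

outside = make_lock()

print(type(inside).__name__)   # ReplacementLock
print(type(outside).__name__)  # OriginalLock
```

Whether a scoped patch like this is enough in practice depends on when the library actually instantiates its lock, which this sketch does not model.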

thearchitector commented 1 month ago

hi @iiLaurens: well, this is a fascinating problem! i can reproduce this error, but it seems to come up only in certain circumstances. my existing unit tests always drop and recreate the database tables before every test. if i do that, this error does not appear. if i let the tables persist between tests, though, I run into this error.

strange indeed

however, your proposal for patching sqlalchemy's lock is seemingly the best option. i'll play around with it a little more, and release a new version that includes whatever changes are necessary. thanks!

thearchitector commented 1 month ago

this should be resolved in v1.0.1 if you could try it out and lmk

thearchitector commented 3 weeks ago

@iiLaurens does the new version work for you?