sqlalchemy / sqlalchemy

The Database Toolkit for Python
https://www.sqlalchemy.org
MIT License
9.67k stars 1.44k forks source link

Joining a Session into an External Transaction (async API) #5811

Closed PhillCli closed 3 years ago

PhillCli commented 3 years ago

Describe your question

Trying to set up the "Joining a Session into an External Transaction (such as for test suite)" recipe for pytest and the asyncio API.

Example sync pytest recipe works fine

import pytest
from sqlalchemy.orm import Session
from sqlalchemy import event, Column, Integer, create_engine
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

# a model
class Thing(Base):
    """Minimal mapped class used as test data; its only column is the primary key."""

    __tablename__ = "thing"

    id = Column(Integer, primary_key=True)

@pytest.fixture(scope="session")
def engine_fixture():
    """Provide one Engine for the whole test session with a freshly built schema.

    All tables known to ``Base.metadata`` are dropped and recreated before the
    engine is handed out, and dropped again once the session is over.
    """
    db_engine = create_engine(
        "postgresql://postgres:changethis@db/app_test",
        echo=True,
    )
    metadata = Base.metadata
    metadata.drop_all(db_engine)
    metadata.create_all(db_engine)

    yield db_engine

    metadata.drop_all(db_engine)

@pytest.fixture
def session(engine_fixture):
    """Yield a Session bound to a connection whose transaction is rolled back on teardown.

    A transaction is begun on the connection before the Session is created,
    three ``Thing`` rows are loaded as fixture data, and the session is then
    placed inside a SAVEPOINT that is reopened every time it ends.
    """
    conn = engine_fixture.connect()
    trans = conn.begin()
    session = Session(bind=conn)

    def _fixture(session):
        session.add_all([Thing(), Thing(), Thing()])
        session.commit()

    # load fixture data within the scope of the transaction
    _fixture(session)

    # start the session in a SAVEPOINT...
    session.begin_nested()

    # then each time that SAVEPOINT ends, reopen it
    @event.listens_for(session, "after_transaction_end")
    def restart_savepoint(session, transaction):
        # only react to a nested transaction whose parent is not itself
        # nested, i.e. the first-level SAVEPOINT started above
        if transaction.nested and not transaction._parent.nested:
            session.begin_nested()

    yield session

    # same teardown from the docs
    session.close()
    trans.rollback()
    conn.close()

def _test_thing(session, extra_rollback=0):
    """Exercise commit and rollback on ``session`` and check the visible row count.

    The session is expected to start out with three ``Thing`` rows.
    ``extra_rollback`` is the number of add-then-rollback rounds that run both
    before and after the commit in the middle of the scenario.
    """

    def load_things():
        return session.query(Thing).all()

    things = load_things()
    assert len(things) == 3

    for _ in range(extra_rollback):
        # three pending rows show up in the query, then get discarded
        session.add_all([Thing(), Thing(), Thing()])
        things = load_things()
        assert len(things) == 6

        session.rollback()

    # the rollbacks must have left the three starting rows alone
    things = load_things()
    assert len(things) == 3

    session.add_all([Thing(), Thing()])
    session.commit()

    things = load_things()
    assert len(things) == 5

    session.add(Thing())
    things = load_things()
    assert len(things) == 6

    for round_no in range(extra_rollback):
        session.add_all([Thing(), Thing(), Thing()])
        things = load_things()
        # the first rollback also discards the single row added just before
        # this loop, so every later round sees one row fewer
        expected = 9 if round_no == 0 else 8
        assert len(things) == expected
        session.rollback()

    things = load_things()
    expected_final = 5 if extra_rollback else 6
    assert len(things) == expected_final

def test_thing_one_pytest(session):
    """Run the scenario without any extra rollback rounds."""
    _test_thing(session, extra_rollback=0)

def test_thing_two_pytest(session):
    """Run the scenario with two extra rollback rounds."""
    _test_thing(session, extra_rollback=2)

The port to the asyncio API, together with pytest-asyncio (0.14.0), fails due to a wrong/malfunctioning teardown of the first test case:

import pytest
from sqlalchemy import Column, Integer, create_engine, event
from sqlalchemy.future import select
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine

Base = declarative_base()

# a model
class Thing(Base):
    """Minimal mapped class used as test data; its only column is the primary key."""

    __tablename__ = "thing"

    id = Column(Integer, primary_key=True)

@pytest.fixture(scope="session", autouse=True)
def meta_migration():
    """Rebuild the schema once per test session through a synchronous engine.

    All tables known to ``Base.metadata`` are dropped and recreated before the
    engine is yielded, and dropped again when the session finishes.
    """
    db_url = "postgresql://postgres:changethis@db/app_test"
    engine = create_engine(db_url, echo=True)
    metadata = Base.metadata

    # start from a clean schema
    metadata.drop_all(engine)
    metadata.create_all(engine)

    yield engine

    # remove the schema again at the end of the session
    metadata.drop_all(engine)

@pytest.fixture(scope="session")
async def async_engine() -> AsyncEngine:
    """Yield an AsyncEngine for the asyncpg test database and dispose of it afterwards.

    NOTE: pytest-asyncio's default ``event_loop`` fixture is function scoped,
    so this session-scoped async fixture needs a session-scoped ``event_loop``
    override; otherwise pytest reports a ScopeMismatch.
    """
    # setup
    engine = create_async_engine(
        "postgresql+asyncpg://postgres:changethis@db/app_test", echo=True
    )

    yield engine

    # teardown: close the pooled connections instead of leaving them open
    await engine.dispose()

@pytest.fixture(scope="function")
async def session(async_engine):
    """Yield an AsyncSession bound to a connection whose transaction is rolled back on teardown.

    A transaction is begun on the connection before the AsyncSession is
    created, three ``Thing`` rows are loaded as fixture data, and the session
    is then placed inside a SAVEPOINT that is reopened every time it ends.
    """
    conn = await async_engine.connect()
    trans = await conn.begin()
    session = AsyncSession(bind=conn)

    async def _fixture(session: AsyncSession):
        session.add_all([Thing(), Thing(), Thing()])
        await session.commit()

    # load fixture data within the scope of the transaction
    await _fixture(session)

    # start the session in a SAVEPOINT...
    await session.begin_nested()

    # then each time that SAVEPOINT ends, reopen it
    # NOTE: no async listeners yet
    # The listener is attached to ``session.sync_session``; the ``session``
    # argument below shadows the AsyncSession and is presumably the sync
    # Session, which is why begin_nested() is called without ``await``.
    @event.listens_for(session.sync_session, "after_transaction_end")
    def restart_savepoint(session, transaction):
        # only react to a nested transaction whose parent is not itself nested
        if transaction.nested and not transaction._parent.nested:
            session.begin_nested()

    yield session

    # same teardown from the docs
    await session.close()
    await trans.rollback()
    await conn.close()

async def _test_thing(session: AsyncSession, extra_rollback=0):
    """Exercise commit and rollback on ``session`` and check the visible row count.

    The session is expected to start out with three ``Thing`` rows.
    ``extra_rollback`` is the number of add-then-rollback rounds that run both
    before and after the commit in the middle of the scenario.
    """

    async def load_things():
        result = await session.execute(select(Thing))
        return result.all()

    things = await load_things()
    assert len(things) == 3

    for _ in range(extra_rollback):
        # three pending rows show up in the query, then get discarded
        session.add_all([Thing(), Thing(), Thing()])
        things = await load_things()
        assert len(things) == 6

        await session.rollback()

    # the rollbacks must have left the three starting rows alone
    things = await load_things()
    assert len(things) == 3

    session.add_all([Thing(), Thing()])
    await session.commit()

    things = await load_things()
    assert len(things) == 5

    session.add(Thing())
    things = await load_things()
    assert len(things) == 6

    for round_no in range(extra_rollback):
        session.add_all([Thing(), Thing(), Thing()])
        things = await load_things()
        # the first rollback also discards the single row added just before
        # this loop, so every later round sees one row fewer
        expected = 9 if round_no == 0 else 8
        assert len(things) == expected
        await session.rollback()

    things = await load_things()
    expected_final = 5 if extra_rollback else 6
    assert len(things) == expected_final

@pytest.mark.asyncio
async def test_thing_one_pytest(session):
    """Run the scenario without any extra rollback rounds."""
    await _test_thing(session, extra_rollback=0)

@pytest.mark.asyncio
async def test_thing_two_pytest(session):
    """Run the scenario with two extra rollback rounds."""
    await _test_thing(session, extra_rollback=2)

Complete stack trace, if applicable

test_async.py::test_thing_two_pytest 
---------------------------------------------------------------------------------------------------------- live log setup ----------------------------------------------------------------------------------------------------------
INFO     sqlalchemy.engine.Engine:log.py:117 BEGIN (implicit)
INFO     sqlalchemy.engine.Engine:log.py:117 BEGIN (implicit)
INFO     sqlalchemy.engine.Engine:log.py:117 INSERT INTO thing DEFAULT VALUES RETURNING thing.id
INFO     sqlalchemy.engine.Engine:log.py:117 [cached since 0.05257s ago] ()
INFO     sqlalchemy.engine.Engine:log.py:117 INSERT INTO thing DEFAULT VALUES RETURNING thing.id
INFO     sqlalchemy.engine.Engine:log.py:117 [cached since 0.05624s ago] ()
INFO     sqlalchemy.engine.Engine:log.py:117 INSERT INTO thing DEFAULT VALUES RETURNING thing.id
INFO     sqlalchemy.engine.Engine:log.py:117 [cached since 0.05979s ago] ()
INFO     sqlalchemy.engine.Engine:log.py:117 COMMIT
---------------------------------------------------------------------------------------------------------- live log call -----------------------------------------------------------------------------------------------------------
INFO     sqlalchemy.engine.Engine:log.py:117 BEGIN (implicit)
INFO     sqlalchemy.engine.Engine:log.py:117 SAVEPOINT sa_savepoint_1
INFO     sqlalchemy.engine.Engine:log.py:117 [no key 0.00043s] ()
INFO     sqlalchemy.engine.Engine:log.py:117 SELECT thing.id 
FROM thing
INFO     sqlalchemy.engine.Engine:log.py:117 [cached since 0.05067s ago] ()
FAILED                                                                                                                                                                                                                       [100%]
-------------------------------------------------------------------------------------------------------- live log teardown ---------------------------------------------------------------------------------------------------------
INFO     sqlalchemy.engine.Engine:log.py:117 ROLLBACK
INFO     sqlalchemy.engine.Engine:log.py:117 ROLLBACK
INFO     sqlalchemy.engine.Engine:log.py:117 BEGIN (implicit)
INFO     sqlalchemy.engine.Engine:log.py:117 select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
INFO     sqlalchemy.engine.Engine:log.py:117 [cached since 0.3944s ago] {'name': 'thing'}
INFO     sqlalchemy.engine.Engine:log.py:117 
DROP TABLE thing
INFO     sqlalchemy.engine.Engine:log.py:117 [no key 0.00040s] {}
INFO     sqlalchemy.engine.Engine:log.py:117 COMMIT

============================================================================================================= FAILURES =============================================================================================================
______________________________________________________________________________________________________ test_thing_two_pytest _______________________________________________________________________________________________________

session = <sqlalchemy.ext.asyncio.session.AsyncSession object at 0x7fea00ccea90>

    @pytest.mark.asyncio
    async def test_thing_two_pytest(session):
        # run two extra rollbacks
>       await _test_thing(session, 2)

test_async.py:126: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

session = <sqlalchemy.ext.asyncio.session.AsyncSession object at 0x7fea00ccea90>, extra_rollback = 2

    async def _test_thing(session: AsyncSession, extra_rollback=0):

        rows = (await session.execute(select(Thing))).all()
>       assert len(rows) == 3
E       assert 8 == 3
E         +8
E         -3

test_async.py:75: AssertionError

Any clues on what I could be doing wrong? Worth mentioning is that restart_savepoint is forced to use AsyncSession.sync_session for listeners, as async listeners are not yet implemented, could it be a reason?

Versions

Thanks!

gordthompson commented 3 years ago

Also being discussed on Stack Overflow here.

zzzeek commented 3 years ago

hey there. there's a lot happening here so we would have to find some time to dig into it, I would make the immediate suggestion to see if the recipe without the SAVEPOINT is working correctly. that is, remove the begin_nested() block and don't call rollback() within a session test. I've no experience with pytest's asyncio support. cc @CaselIT

CaselIT commented 3 years ago

will try to have a look tomorrow.

Regarding pytest-asyncio, iirc it just runs the function in the loop with asyncio.run

CaselIT commented 3 years ago

@PhillCli running your example I get the following errors:

ScopeMismatch: You tried to access the 'function' scoped fixture 'event_loop' with a 'session' scoped request object, involved factories

Removing the scope from the fixtures I get two passes.

I'm using pytest 6.0.1 and 6.2.1 and pytest-asyncio 0.14.0

PhillCli commented 3 years ago

Thanks for looking at it!

@CaselIT I see, I've contaminated my test dir with an external conftest.py, where I redefined the event_loop fixture of pytest-asyncio. My goal was to run the db schema migration once per test session, thus using scope=session was intended, but I forgot to include it in the posted snippet, pytest-asyncio issue. If one makes everything default scoped (scope="function"), then schema drops are run after each test case, which prevents the db state "leak" from the first test to the second, making it effectively the "ultimate" transaction rollback :)

In the original recipe I was looking at, zzzeek's gist, it's done explicitly outside of the unittest.TestCase

...
# a database w a schema
engine = create_engine("postgresql://scott:tiger@localhost/test", echo=True)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
...

To sum up, I believe that the following should work, but it does not:

  1. I'm running schema migration with non-async fixture once per session (no conflicting scopes for pytest-asyncio),
  2. I've removed scope="session" from async fixtures (alternatively one has to redefine the event_loop fixture),
import pytest
from sqlalchemy import Column, Integer, create_engine, event
from sqlalchemy.future import select
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine

Base = declarative_base()

# a model
class Thing(Base):
    """Minimal mapped class used as test data; its only column is the primary key."""

    __tablename__ = "thing"

    id = Column(Integer, primary_key=True)

@pytest.fixture(scope="session", autouse=True)
def meta_migration():
    """Rebuild the schema once per test session through a synchronous engine.

    All tables known to ``Base.metadata`` are dropped and recreated before the
    engine is yielded, and dropped again when the session finishes.
    """
    db_url = "postgresql://postgres:changethis@db/app_test"
    engine = create_engine(db_url, echo=True)
    metadata = Base.metadata

    # start from a clean schema
    metadata.drop_all(engine)
    metadata.create_all(engine)

    yield engine

    # remove the schema again at the end of the session
    metadata.drop_all(engine)

@pytest.fixture
async def async_engine() -> AsyncEngine:
    """Yield a fresh AsyncEngine for the asyncpg test database and dispose of it afterwards.

    The fixture is function scoped, so a new engine (and connection pool) is
    created for every test; disposing of it on teardown keeps those pools from
    piling up over the test run.
    """
    # setup
    engine = create_async_engine(
        "postgresql+asyncpg://postgres:changethis@db/app_test", echo=True
    )

    yield engine

    # teardown: close the pooled connections instead of leaving them open
    await engine.dispose()

@pytest.fixture
async def session(async_engine):
    """Yield an AsyncSession bound to a connection whose transaction is rolled back on teardown.

    A transaction is begun on the connection before the AsyncSession is
    created, three ``Thing`` rows are loaded as fixture data, and the session
    is then placed inside a SAVEPOINT that is reopened every time it ends.
    """
    conn = await async_engine.connect()
    trans = await conn.begin()
    session = AsyncSession(bind=conn)

    async def _fixture(session: AsyncSession):
        session.add_all([Thing(), Thing(), Thing()])
        await session.commit()

    # load fixture data within the scope of the transaction
    await _fixture(session)

    # start the session in a SAVEPOINT...
    await session.begin_nested()

    # then each time that SAVEPOINT ends, reopen it
    # NOTE: no async listeners yet
    # The listener is attached to ``session.sync_session``; the ``session``
    # argument below shadows the AsyncSession and is presumably the sync
    # Session, which is why begin_nested() is called without ``await``.
    @event.listens_for(session.sync_session, "after_transaction_end")
    def restart_savepoint(session, transaction):
        # only react to a nested transaction whose parent is not itself nested
        if transaction.nested and not transaction._parent.nested:
            session.begin_nested()

    yield session

    # same teardown from the docs
    await session.close()
    await trans.rollback()
    await conn.close()

async def _test_thing(session: AsyncSession, extra_rollback=0):
    """Exercise commit and rollback on ``session`` and check the visible row count.

    The session is expected to start out with three ``Thing`` rows.
    ``extra_rollback`` is the number of add-then-rollback rounds that run both
    before and after the commit in the middle of the scenario.
    """

    async def load_things():
        result = await session.execute(select(Thing))
        return result.all()

    things = await load_things()
    assert len(things) == 3

    for _ in range(extra_rollback):
        # three pending rows show up in the query, then get discarded
        session.add_all([Thing(), Thing(), Thing()])
        things = await load_things()
        assert len(things) == 6

        await session.rollback()

    # the rollbacks must have left the three starting rows alone
    things = await load_things()
    assert len(things) == 3

    session.add_all([Thing(), Thing()])
    await session.commit()

    things = await load_things()
    assert len(things) == 5

    session.add(Thing())
    things = await load_things()
    assert len(things) == 6

    for round_no in range(extra_rollback):
        session.add_all([Thing(), Thing(), Thing()])
        things = await load_things()
        # the first rollback also discards the single row added just before
        # this loop, so every later round sees one row fewer
        expected = 9 if round_no == 0 else 8
        assert len(things) == expected
        await session.rollback()

    things = await load_things()
    expected_final = 5 if extra_rollback else 6
    assert len(things) == expected_final

@pytest.mark.asyncio
async def test_thing_one_pytest(session):
    """Run the scenario without any extra rollback rounds."""
    await _test_thing(session, extra_rollback=0)

@pytest.mark.asyncio
async def test_thing_two_pytest(session):
    """Run the scenario with two extra rollback rounds."""
    await _test_thing(session, extra_rollback=2)

fails with the same error message (assert 8 == 3).

@zzzeek Thanks for the suggestion, I'll try it and report if it helped, although ideally I would like to have the possibility to test rollbacks() as well.

PhillCli commented 3 years ago

Stripped begin_nested(), the issue persists; it seems like the outermost transaction rollback (in teardown) has no effect:

import pytest
from sqlalchemy import Column, Integer, create_engine, event
from sqlalchemy.future import select
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine

Base = declarative_base()

# a model
class Thing(Base):
    """Minimal mapped class used as test data; its only column is the primary key."""

    __tablename__ = "thing"

    id = Column(Integer, primary_key=True)

@pytest.fixture(scope="session", autouse=True)
def meta_migration():
    """Rebuild the schema once per test session through a synchronous engine.

    All tables known to ``Base.metadata`` are dropped and recreated before the
    engine is yielded, and dropped again when the session finishes.
    """
    db_url = "postgresql://postgres:changethis@db/app_test"
    engine = create_engine(db_url, echo=True)
    metadata = Base.metadata

    # start from a clean schema
    metadata.drop_all(engine)
    metadata.create_all(engine)

    yield engine

    # remove the schema again at the end of the session
    metadata.drop_all(engine)

@pytest.fixture
async def async_engine() -> AsyncEngine:
    """Yield a fresh AsyncEngine for the asyncpg test database and dispose of it afterwards.

    The fixture is function scoped, so a new engine (and connection pool) is
    created for every test; disposing of it on teardown keeps those pools from
    piling up over the test run.
    """
    # setup
    engine = create_async_engine(
        "postgresql+asyncpg://postgres:changethis@db/app_test", echo=True
    )

    yield engine

    # teardown: close the pooled connections instead of leaving them open
    await engine.dispose()

@pytest.fixture
async def session(async_engine):
    """Yield an AsyncSession bound to a connection on which a transaction was begun first.

    Three ``Thing`` rows are added and committed through the session before it
    is yielded; teardown closes the session, rolls back the connection-level
    transaction and closes the connection. No SAVEPOINT is used here.
    """
    conn = await async_engine.connect()
    trans = await conn.begin()
    session = AsyncSession(bind=conn)

    async def _fixture(session: AsyncSession):
        session.add_all([Thing(), Thing(), Thing()])
        await session.commit()

    # load fixture data within the scope of the transaction
    await _fixture(session)

    yield session

    # same teardown from the docs
    await session.close()
    await trans.rollback()
    await conn.close()

async def _test_thing(session: AsyncSession):
    """Check that exactly three ``Thing`` rows are visible through ``session``."""
    result = await session.execute(select(Thing))
    things = result.all()
    assert len(things) == 3

@pytest.mark.asyncio
async def test_thing_one_pytest(session):
    """First of two identical checks on the row count seen through ``session``."""
    await _test_thing(session)

@pytest.mark.asyncio
async def test_thing_two_pytest(session):
    """Second of two identical checks on the row count seen through ``session``."""
    await _test_thing(session)

full verbose traceback, with echo=True is:

root@c521a2003aec:/# pytest test_async.py -s -vv
======================================================================================================= test session starts ========================================================================================================
platform linux -- Python 3.8.5, pytest-6.1.2, py-1.10.0, pluggy-0.13.1 -- /usr/local/bin/python
cachedir: .pytest_cache
rootdir: /
plugins: faker-2.0.0, mock-3.3.1, asyncio-0.14.0, Faker-5.0.2, recording-0.11.0, cov-2.10.1
collected 2 items                                                                                                                                                                                                                  

test_async.py::test_thing_one_pytest 2021-01-02 12:48:46,044 INFO sqlalchemy.engine.Engine select version()
2021-01-02 12:48:46,045 INFO sqlalchemy.engine.Engine [raw sql] {}
2021-01-02 12:48:46,048 INFO sqlalchemy.engine.Engine select current_schema()
2021-01-02 12:48:46,048 INFO sqlalchemy.engine.Engine [raw sql] {}
2021-01-02 12:48:46,050 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2021-01-02 12:48:46,050 INFO sqlalchemy.engine.Engine [raw sql] {}
2021-01-02 12:48:46,052 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-01-02 12:48:46,054 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
2021-01-02 12:48:46,054 INFO sqlalchemy.engine.Engine [generated in 0.00044s] {'name': 'thing'}
2021-01-02 12:48:46,056 INFO sqlalchemy.engine.Engine COMMIT
2021-01-02 12:48:46,057 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-01-02 12:48:46,057 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
2021-01-02 12:48:46,058 INFO sqlalchemy.engine.Engine [cached since 0.003733s ago] {'name': 'thing'}
2021-01-02 12:48:46,059 INFO sqlalchemy.engine.Engine 
CREATE TABLE thing (
        id SERIAL NOT NULL, 
        PRIMARY KEY (id)
)

2021-01-02 12:48:46,059 INFO sqlalchemy.engine.Engine [no key 0.00033s] {}
2021-01-02 12:48:46,076 INFO sqlalchemy.engine.Engine COMMIT
2021-01-02 12:48:46,135 INFO sqlalchemy.engine.Engine select version()
2021-01-02 12:48:46,135 INFO sqlalchemy.engine.Engine [raw sql] ()
2021-01-02 12:48:46,137 INFO sqlalchemy.engine.Engine select current_schema()
2021-01-02 12:48:46,138 INFO sqlalchemy.engine.Engine [raw sql] ()
2021-01-02 12:48:46,147 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2021-01-02 12:48:46,148 INFO sqlalchemy.engine.Engine [raw sql] ()
2021-01-02 12:48:46,151 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-01-02 12:48:46,170 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-01-02 12:48:46,173 INFO sqlalchemy.engine.Engine INSERT INTO thing DEFAULT VALUES RETURNING thing.id
2021-01-02 12:48:46,173 INFO sqlalchemy.engine.Engine [generated in 0.00037s] ()
2021-01-02 12:48:46,176 INFO sqlalchemy.engine.Engine INSERT INTO thing DEFAULT VALUES RETURNING thing.id
2021-01-02 12:48:46,177 INFO sqlalchemy.engine.Engine [cached since 0.004132s ago] ()
2021-01-02 12:48:46,179 INFO sqlalchemy.engine.Engine INSERT INTO thing DEFAULT VALUES RETURNING thing.id
2021-01-02 12:48:46,180 INFO sqlalchemy.engine.Engine [cached since 0.007074s ago] ()
2021-01-02 12:48:46,184 INFO sqlalchemy.engine.Engine COMMIT
2021-01-02 12:48:46,188 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-01-02 12:48:46,192 INFO sqlalchemy.engine.Engine SELECT thing.id 
FROM thing
2021-01-02 12:48:46,192 INFO sqlalchemy.engine.Engine [generated in 0.00074s] ()
PASSED2021-01-02 12:48:46,199 INFO sqlalchemy.engine.Engine ROLLBACK
2021-01-02 12:48:46,200 INFO sqlalchemy.engine.Engine ROLLBACK

test_async.py::test_thing_two_pytest 2021-01-02 12:48:46,215 INFO sqlalchemy.engine.Engine select version()
2021-01-02 12:48:46,215 INFO sqlalchemy.engine.Engine [raw sql] ()
2021-01-02 12:48:46,218 INFO sqlalchemy.engine.Engine select current_schema()
2021-01-02 12:48:46,218 INFO sqlalchemy.engine.Engine [raw sql] ()
2021-01-02 12:48:46,224 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2021-01-02 12:48:46,225 INFO sqlalchemy.engine.Engine [raw sql] ()
2021-01-02 12:48:46,226 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-01-02 12:48:46,235 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-01-02 12:48:46,236 INFO sqlalchemy.engine.Engine INSERT INTO thing DEFAULT VALUES RETURNING thing.id
2021-01-02 12:48:46,236 INFO sqlalchemy.engine.Engine [generated in 0.00034s] ()
2021-01-02 12:48:46,240 INFO sqlalchemy.engine.Engine INSERT INTO thing DEFAULT VALUES RETURNING thing.id
2021-01-02 12:48:46,240 INFO sqlalchemy.engine.Engine [cached since 0.004235s ago] ()
2021-01-02 12:48:46,242 INFO sqlalchemy.engine.Engine INSERT INTO thing DEFAULT VALUES RETURNING thing.id
2021-01-02 12:48:46,242 INFO sqlalchemy.engine.Engine [cached since 0.006574s ago] ()
2021-01-02 12:48:46,245 INFO sqlalchemy.engine.Engine COMMIT
2021-01-02 12:48:46,247 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-01-02 12:48:46,248 INFO sqlalchemy.engine.Engine SELECT thing.id 
FROM thing
2021-01-02 12:48:46,248 INFO sqlalchemy.engine.Engine [generated in 0.00030s] ()
FAILED2021-01-02 12:48:46,327 INFO sqlalchemy.engine.Engine ROLLBACK
2021-01-02 12:48:46,328 INFO sqlalchemy.engine.Engine ROLLBACK
2021-01-02 12:48:46,329 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-01-02 12:48:46,329 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
2021-01-02 12:48:46,330 INFO sqlalchemy.engine.Engine [cached since 0.2757s ago] {'name': 'thing'}
2021-01-02 12:48:46,331 INFO sqlalchemy.engine.Engine 
DROP TABLE thing
2021-01-02 12:48:46,331 INFO sqlalchemy.engine.Engine [no key 0.00026s] {}
2021-01-02 12:48:46,333 INFO sqlalchemy.engine.Engine COMMIT

============================================================================================================= FAILURES =============================================================================================================
______________________________________________________________________________________________________ test_thing_two_pytest _______________________________________________________________________________________________________

session = <sqlalchemy.ext.asyncio.session.AsyncSession object at 0x7f1f4f187bd0>

    @pytest.mark.asyncio
    async def test_thing_two_pytest(session):
        # run two extra rollbacks
>       await _test_thing(session)

test_async.py:77: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

session = <sqlalchemy.ext.asyncio.session.AsyncSession object at 0x7f1f4f187bd0>

    async def _test_thing(session: AsyncSession):

        rows = (await session.execute(select(Thing))).all()
>       assert len(rows) == 3
E       assert 6 == 3
E         +6
E         -3

test_async.py:65: AssertionError
-------------------------------------------------------------------------------------------------------- Captured log setup --------------------------------------------------------------------------------------------------------
INFO     sqlalchemy.engine.Engine:log.py:117 select version()
INFO     sqlalchemy.engine.Engine:log.py:117 [raw sql] ()
INFO     sqlalchemy.engine.Engine:log.py:117 select current_schema()
INFO     sqlalchemy.engine.Engine:log.py:117 [raw sql] ()
INFO     sqlalchemy.engine.Engine:log.py:117 show standard_conforming_strings
INFO     sqlalchemy.engine.Engine:log.py:117 [raw sql] ()
INFO     sqlalchemy.engine.Engine:log.py:117 BEGIN (implicit)
INFO     sqlalchemy.engine.Engine:log.py:117 BEGIN (implicit)
INFO     sqlalchemy.engine.Engine:log.py:117 INSERT INTO thing DEFAULT VALUES RETURNING thing.id
INFO     sqlalchemy.engine.Engine:log.py:117 [generated in 0.00034s] ()
INFO     sqlalchemy.engine.Engine:log.py:117 INSERT INTO thing DEFAULT VALUES RETURNING thing.id
INFO     sqlalchemy.engine.Engine:log.py:117 [cached since 0.004235s ago] ()
INFO     sqlalchemy.engine.Engine:log.py:117 INSERT INTO thing DEFAULT VALUES RETURNING thing.id
INFO     sqlalchemy.engine.Engine:log.py:117 [cached since 0.006574s ago] ()
INFO     sqlalchemy.engine.Engine:log.py:117 COMMIT
-------------------------------------------------------------------------------------------------------- Captured log call ---------------------------------------------------------------------------------------------------------
INFO     sqlalchemy.engine.Engine:log.py:117 BEGIN (implicit)
INFO     sqlalchemy.engine.Engine:log.py:117 SELECT thing.id 
FROM thing
INFO     sqlalchemy.engine.Engine:log.py:117 [generated in 0.00030s] ()
------------------------------------------------------------------------------------------------------ Captured log teardown -------------------------------------------------------------------------------------------------------
INFO     sqlalchemy.engine.Engine:log.py:117 ROLLBACK
INFO     sqlalchemy.engine.Engine:log.py:117 ROLLBACK
INFO     sqlalchemy.engine.Engine:log.py:117 BEGIN (implicit)
INFO     sqlalchemy.engine.Engine:log.py:117 select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
INFO     sqlalchemy.engine.Engine:log.py:117 [cached since 0.2757s ago] {'name': 'thing'}
INFO     sqlalchemy.engine.Engine:log.py:117 
DROP TABLE thing
INFO     sqlalchemy.engine.Engine:log.py:117 [no key 0.00026s] {}
INFO     sqlalchemy.engine.Engine:log.py:117 COMMIT
===================================================================================================== short test summary info ======================================================================================================
FAILED test_async.py::test_thing_two_pytest - assert 6 == 3
=================================================================================================== 1 failed, 1 passed in 0.62s ====================================================================================================
CaselIT commented 3 years ago

Stripped begin_nested(), the issue persists; it seems like the outermost transaction rollback (in teardown) has no effect:

Once you commit with the session.commit() there is nothing to rollback.

Same in your other complete case:

you do in the middle of the _test_thing, line 94: await session.commit(). This means that everything before that point has been committed, since you are acting on the session, not on the nested transaction. See the output sql with pytest -s, I've added the lines of each operation:

2021-01-02 14:32:54,976 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-01-02 14:32:54,976 INFO sqlalchemy.engine.Engine SAVEPOINT sa_savepoint_1
2021-01-02 14:32:54,976 INFO sqlalchemy.engine.Engine [no key 0.00024s] ()
# _test_thing line 78
2021-01-02 14:32:54,981 INFO sqlalchemy.engine.Engine SELECT thing.id
FROM thing
2021-01-02 14:32:54,982 INFO sqlalchemy.engine.Engine [generated in 0.00033s] ()
# _test_thing line 90
2021-01-02 14:32:54,986 INFO sqlalchemy.engine.Engine SELECT thing.id
FROM thing
2021-01-02 14:32:54,986 INFO sqlalchemy.engine.Engine [cached since 0.00446s ago] ()
# _test_thing line 93
2021-01-02 14:32:54,990 INFO sqlalchemy.engine.Engine INSERT INTO thing DEFAULT VALUES RETURNING thing.id
2021-01-02 14:32:54,991 INFO sqlalchemy.engine.Engine [cached since 0.04653s ago] ()
2021-01-02 14:32:54,995 INFO sqlalchemy.engine.Engine INSERT INTO thing DEFAULT VALUES RETURNING thing.id
2021-01-02 14:32:54,995 INFO sqlalchemy.engine.Engine [cached since 0.0508s ago] ()
# _test_thing line 94
2021-01-02 14:32:54,999 INFO sqlalchemy.engine.Engine RELEASE SAVEPOINT sa_savepoint_1
2021-01-02 14:32:54,999 INFO sqlalchemy.engine.Engine [no key 0.00028s] ()
2021-01-02 14:32:55,002 INFO sqlalchemy.engine.Engine COMMIT

See https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#using-savepoint

CaselIT commented 3 years ago

@PhillCli Maybe the documentation is not overly clear on this. Could you point to where you think it could be improved?

PhillCli commented 3 years ago

@PhillCli Maybe the documentation is not overly clear on this. Could you point to where you think it could be improved?

@CaselIT

So, it is intended behavior? I was basing my assumption on link, second paragraph, but might have misread it.

"The Session detects that the given Connection is already in a transaction and will not run COMMIT on it if the transaction is in fact an outermost transaction. Then when the test tears down, the transaction is rolled back so that any data changes throughout the test are reverted"

This means it won't run COMMIT on non-orm transaction (the one started before the session was created), but it will run the commit on the orm one, which results in the change to the database state, right?

If so, my question would be: how can I achieve "so that any data changes throughout the test are reverted"? Can it be done using savepoints?

My thinking and desired use-case are:

  1. I create database schema once per test-suite session,
  2. Data fixtures populate given tables, and before the given test-case is entered a "rollbackable" savepoint is created,
  3. In the test case I test code that explicitly calls session.commit() and session.rollback(), and after it exits, the database state is reverted to the savepoint created in 2.

If it can be done, maybe extending documentation with such a snippet will be beneficial for the community, if not I will have to re-consider the testing approach, and sorry for the trouble.

CaselIT commented 3 years ago

I was basing my assumption on link, second paragraph, but might have misread it.

I forgot about that part. This actually sounds like a bug, @zzzeek

@PhillCli does the same test work as expected using a sync connection? If that's the case, then I guess it's a bug in the detection of the connection in transaction.

PhillCli commented 3 years ago

@CaselIT I'm checking it, it seems it does pass.

sync snippet:

import pytest
from sqlalchemy.orm import Session
from sqlalchemy import event, Column, Integer, create_engine
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

# a model
class Thing(Base):
    """Minimal mapped class used as test data by the fixtures below.

    Maps to the ``thing`` table, whose only column is the primary key.
    """

    __tablename__ = "thing"

    # integer primary key; the only column on the table
    id = Column(Integer, primary_key=True)

@pytest.fixture(scope="session")
def engine_fixture():
    """Yield one Engine shared by the whole test session.

    The schema is dropped and re-created before the first test runs and
    dropped again once the last test has finished.  The engine's connection
    pool is always disposed afterwards, including when schema setup fails,
    so no pooled connections are left open.
    """
    engine = create_engine("postgresql://postgres:changethis@db/app_test", echo=True)

    try:
        # start from a clean schema even if a previous run left tables behind
        Base.metadata.drop_all(engine)
        Base.metadata.create_all(engine)

        yield engine

        Base.metadata.drop_all(engine)
    finally:
        # release pooled connections no matter how setup/teardown went
        engine.dispose()

@pytest.fixture
def session(engine_fixture):
    """Yield a Session joined into an external transaction.

    A connection is checked out and a transaction is begun on it *before*
    the Session is created, so everything the Session does (including the
    fixture data loaded here) happens inside that outer transaction.  On
    teardown the outer transaction is rolled back, discarding the changes.

    Teardown runs in ``finally`` blocks so that the connection and the open
    transaction are released even if loading the fixture data fails before
    the test body is reached.
    """
    conn = engine_fixture.connect()
    try:
        trans = conn.begin()
        session = Session(bind=conn)
        try:
            # load fixture data within the scope of the transaction
            session.add_all([Thing(), Thing(), Thing()])
            session.commit()

            yield session
        finally:
            # same teardown order as the docs: close the Session first,
            # then roll back the outer transaction
            session.close()
            trans.rollback()
    finally:
        conn.close()

def _test_thing(session, extra_rollback=0):
    """Assert that exactly three ``Thing`` rows are visible to *session*.

    ``extra_rollback`` is accepted but not used by this version of the
    helper.
    """
    things = session.query(Thing).all()
    row_count = len(things)
    assert row_count == 3

def test_thing_one_pytest(session):
    """Check that the three fixture rows are visible through the session."""
    # run zero rollbacks
    _test_thing(session)

def test_thing_two_pytest(session):
    """Repeat the row-count check with a fresh ``session`` fixture.

    The ``session`` fixture inserts three rows per test, so this only
    passes if the rows added for the previous test were rolled back.
    """
    # NOTE: no extra rollbacks are run here; extra_rollback is left at its
    # default of 0, so this is the same check as test_thing_one_pytest
    _test_thing(session)

logs:

root@c521a2003aec:/# pytest test_sync.py  -s -vv
======================================================================================================= test session starts ========================================================================================================
platform linux -- Python 3.8.5, pytest-6.1.2, py-1.10.0, pluggy-0.13.1 -- /usr/local/bin/python
cachedir: .pytest_cache
rootdir: /
plugins: faker-2.0.0, mock-3.3.1, asyncio-0.14.0, Faker-5.0.2, recording-0.11.0, cov-2.10.1
collected 2 items                                                                                                                                                                                                                  

test_sync.py::test_thing_one_pytest 2021-01-02 14:57:27,744 INFO sqlalchemy.engine.Engine select version()
2021-01-02 14:57:27,745 INFO sqlalchemy.engine.Engine [raw sql] {}
2021-01-02 14:57:27,747 INFO sqlalchemy.engine.Engine select current_schema()
2021-01-02 14:57:27,747 INFO sqlalchemy.engine.Engine [raw sql] {}
2021-01-02 14:57:27,748 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2021-01-02 14:57:27,749 INFO sqlalchemy.engine.Engine [raw sql] {}
2021-01-02 14:57:27,750 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-01-02 14:57:27,751 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
2021-01-02 14:57:27,751 INFO sqlalchemy.engine.Engine [generated in 0.00032s] {'name': 'thing'}
2021-01-02 14:57:27,753 INFO sqlalchemy.engine.Engine COMMIT
2021-01-02 14:57:27,754 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-01-02 14:57:27,754 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
2021-01-02 14:57:27,754 INFO sqlalchemy.engine.Engine [cached since 0.00346s ago] {'name': 'thing'}
2021-01-02 14:57:27,757 INFO sqlalchemy.engine.Engine 
CREATE TABLE thing (
        id SERIAL NOT NULL, 
        PRIMARY KEY (id)
)

2021-01-02 14:57:27,757 INFO sqlalchemy.engine.Engine [no key 0.00039s] {}
2021-01-02 14:57:27,767 INFO sqlalchemy.engine.Engine COMMIT
2021-01-02 14:57:27,772 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-01-02 14:57:27,776 INFO sqlalchemy.engine.Engine INSERT INTO thing (id) VALUES (DEFAULT) RETURNING thing.id
2021-01-02 14:57:27,777 INFO sqlalchemy.engine.Engine [generated in 0.00044s] ({}, {}, {})
2021-01-02 14:57:27,783 INFO sqlalchemy.engine.Engine SELECT thing.id AS thing_id 
FROM thing
2021-01-02 14:57:27,783 INFO sqlalchemy.engine.Engine [generated in 0.00033s] {}
PASSED2021-01-02 14:57:27,785 INFO sqlalchemy.engine.Engine ROLLBACK

test_sync.py::test_thing_two_pytest 2021-01-02 14:57:27,787 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-01-02 14:57:27,788 INFO sqlalchemy.engine.Engine INSERT INTO thing (id) VALUES (DEFAULT) RETURNING thing.id
2021-01-02 14:57:27,788 INFO sqlalchemy.engine.Engine [cached since 0.01193s ago] ({}, {}, {})
2021-01-02 14:57:27,791 INFO sqlalchemy.engine.Engine SELECT thing.id AS thing_id 
FROM thing
2021-01-02 14:57:27,791 INFO sqlalchemy.engine.Engine [cached since 0.007803s ago] {}
PASSED2021-01-02 14:57:27,792 INFO sqlalchemy.engine.Engine ROLLBACK
2021-01-02 14:57:27,793 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-01-02 14:57:27,793 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
2021-01-02 14:57:27,794 INFO sqlalchemy.engine.Engine [cached since 0.04272s ago] {'name': 'thing'}
2021-01-02 14:57:27,795 INFO sqlalchemy.engine.Engine 
DROP TABLE thing
2021-01-02 14:57:27,796 INFO sqlalchemy.engine.Engine [no key 0.00035s] {}
2021-01-02 14:57:27,798 INFO sqlalchemy.engine.Engine COMMIT

======================================================================================================== 2 passed in 0.36s =========================================================================================================
CaselIT commented 3 years ago

@CaselIT I'm checking it, it seems it does pass.

Thanks for testing it. I think that since the sync version does work and the async version does not, there is a problem in SQLAlchemy's detection. Thanks for the test cases!

zzzeek commented 3 years ago

Still haven't run through the code myself here. I would first want to try porting the recipe that I've actually tested verbatim at https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#joining-a-session-into-an-external-transaction-such-as-for-test-suites into a WYSIWYG example that simply runs "setup", "test_XYZ", "teardown" in an asyncio format.

There's actually some irony here @CaselIT that if the bug lies in the simple nesting ability of the AsyncSession that your proposal to run suites like test/orm/test_transaction.py against AsyncSession would likely find these issues.

@PhillCli if you turn on SQL logging can you confirm the issue is simply that the AsyncSession that's being tested is emitting COMMIT on the database connection? That's basically what it's not supposed to do. There are no tests for this basic case yet, so this may be something simple.

CaselIT commented 3 years ago

the issue is simply that the AsyncSession that's being tested is emitting COMMIT on the database connection?

this seems to be the case. See the logging I ported here https://github.com/sqlalchemy/sqlalchemy/issues/5811#issuecomment-753475040 This is for the tests in this comment https://github.com/sqlalchemy/sqlalchemy/issues/5811#issuecomment-753467602

As you can see the line await session.commit() in _test_thing does commit

2021-01-02 14:32:54,999 INFO sqlalchemy.engine.Engine RELEASE SAVEPOINT sa_savepoint_1
2021-01-02 14:32:54,999 INFO sqlalchemy.engine.Engine [no key 0.00028s] ()
2021-01-02 14:32:55,002 INFO sqlalchemy.engine.Engine COMMIT

There's actually some irony here @CaselIT that if the bug lies in the simple nesting ability of the AsyncSession that your proposal to run suites like test/orm/test_transaction.py against AsyncSession would likely find these issues.

Well, I guess the thing is that more tests always find more issues :). In all seriousness, some e2e tests on this stuff are always useful.

PhillCli commented 3 years ago

@CaselIT thanks for the assistance! @zzzeek Yes, see the above post by @CaselIT

sqla-tester commented 3 years ago

Mike Bayer has proposed a fix for this issue in the master branch:

Implement connection binding for AsyncSession https://gerrit.sqlalchemy.org/c/sqlalchemy/sqlalchemy/+/2457

zzzeek commented 3 years ago

the above gerrit implements AsyncSession binding to a specific AsyncConnection, which previously was silently failing to associate with the given AsyncConnection. Tests are added for the basic "join into a transaction" and the new version of the "join into a transaction using an event with a savepoint" patterns which now work as expected.

By "the new version", I mean an adaptation of the one documented at https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#joining-a-session-into-an-external-transaction-such-as-for-test-suites . The "legacy" pattern that you have above, which uses "session.begin_nested()" to create the savepoint, is not supported for the "future" style engine which asyncio uses. The new version uses the connection itself to recreate the savepoint inside the event.

A complete example of the new pattern using asyncio from the above gerrit:


        # check out the connection that the AsyncSession will be bound to
        async with async_engine.connect() as conn:

            # begin the outer ("external") transaction on the connection
            await conn.begin()

            # start the SAVEPOINT the session will work inside of.
            # note this points to the sync version of the Transaction
            self.nested = (await conn.begin_nested()).sync_transaction

            # bind the session to the connection rather than to the engine
            async_session = AsyncSession(conn)

            # the event is registered on the underlying sync Session
            @event.listens_for(
                async_session.sync_session, "after_transaction_end"
            )
            def end_savepoint(session, transaction):
                # inside the event, we deal with the sync version of the
                # connection to make a new nested transaction
                if not self.nested.is_active and not conn.closed:
                    self.nested = conn.sync_connection.begin_nested()

            # work with AsyncSession here, commit and rollback OK
            # ...

            # rollback outer transaction
            await conn.rollback()
PhillCli commented 3 years ago

@zzzeek thanks for working on it!

Case without the savepoints works as expected, and I'm seeing a huge reduction in wall time for my test-suite in SA related tasks! As I used schema drops as a temporary workaround.

Looking at your snippet with savepoints, I'm a bit confused on how to port it to pytest since I won't usually have the access to self.nested transaction object, once it's created.

Ideally, end_savepoint() arguments - (session, transaction), should be sufficient to re-create nested transaction, or shouldn't they?

zzzeek commented 3 years ago

well for the "nested" thing you just have to put that transaction somewhere, in a global context, something like that. you could probably put it into session.info. For that I have to add .info to the connection, moment :)

zzzeek commented 3 years ago

OK with the latest patch to the above gerrit the nested trans can be stored in the connection:

        # check out the connection that the AsyncSession will be bound to
        async with async_engine.connect() as conn:

            # begin the outer ("external") transaction on the connection
            await conn.begin()

            # keep the sync version of the SAVEPOINT transaction in the
            # connection's info dictionary so the event hook can find it
            conn.info["nested"] = (await conn.begin_nested()).sync_transaction

            async_session = AsyncSession(conn)

            @event.listens_for(
                async_session.sync_session, "after_transaction_end"
            )
            def end_savepoint(session, transaction):
                """Start a new SAVEPOINT once the stored one is inactive.

                This is a plain (non-async) event hook, so it uses the
                sync connection and is written in blocking style.

                """

                # nothing to re-open once the connection has been closed
                if conn.closed:
                    return
                nested = conn.info["nested"]

                if not nested.is_active:
                    conn.info["nested"] = conn.sync_connection.begin_nested()
zzzeek commented 3 years ago

OK i'm dumb, there's a get_nested_transaction() method, let me just port that.

zzzeek commented 3 years ago

OK latest and greatest:

        # check out the connection that the AsyncSession will be bound to
        async with async_engine.connect() as conn:

            # begin the outer ("external") transaction on the connection
            await conn.begin()

            # start the SAVEPOINT the session will work inside of
            await conn.begin_nested()

            async_session = AsyncSession(conn)

            @event.listens_for(
                async_session.sync_session, "after_transaction_end"
            )
            def end_savepoint(session, transaction):
                """Start a new SAVEPOINT when the connection has none.

                This is a plain (non-async) event hook, so it uses the
                sync connection and is written in blocking style.

                """

                # nothing to re-open once the connection has been closed
                if conn.closed:
                    return

                # the connection itself reports whether a SAVEPOINT is
                # in progress, so no transaction object needs to be stored
                if not conn.in_nested_transaction():
                    conn.sync_connection.begin_nested()
PhillCli commented 3 years ago

I see, looks much cleaner and less confusing! I guess the docstring is a bit outdated then, hehe :) Thanks again, for picking it up so quickly!

Kludex commented 3 years ago

If you're using it with Pydantic (or FastAPI), you need to set expire_on_commit=False on the AsyncSession creation.