kvesteri / postgresql-audit

Audit trigger for PostgreSQL
BSD 2-Clause "Simplified" License

sqlalchemy.exc.InvalidRequestError when calling versioning_manager.init(Base) in minimal FastAPI app #61

Open michaeltoohig opened 2 years ago

michaeltoohig commented 2 years ago

I'm looking for a solution that may already be documented somewhere I haven't found. Otherwise, I may have hit a bug when initializing the versioning_manager, possibly related to #39.

My simplified example app is shown below. It is mostly a copy of the FastAPI documentation's SQL integration guide, with some modifications for async database access via SQLAlchemy 1.4.

What I notice is that the error is not raised by the .init(Base) call itself but later, when Base.metadata.create_all is called. If I move init_versioning_manager after create_db_and_tables, the error does not occur, but I also do not see an Activity table or any schemas created in the database.
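The traceback below passes through `table.dispatch.after_create` before reaching `postgresql_audit`, which suggests that `.init(Base)` only registers listeners and that the real work happens during `create_all`. Here is a minimal pure-Python sketch of that ordering. `ToyMetadata`, `listen_after_create`, and `toy_init` are hypothetical stand-ins, not SQLAlchemy or postgresql-audit APIs.

```python
from typing import Callable, List


class ToyMetadata:
    """Hypothetical stand-in for a metadata object with after-create listeners."""

    def __init__(self) -> None:
        self.tables: List[str] = []
        self._after_create: List[Callable[[str], None]] = []

    def listen_after_create(self, fn: Callable[[str], None]) -> None:
        # Registration only stores the callback; nothing runs yet.
        self._after_create.append(fn)

    def create_all(self) -> None:
        # Listeners fire here, once per table.
        for table in self.tables:
            for fn in self._after_create:
                fn(table)


calls: List[str] = []


def toy_init(metadata: ToyMetadata) -> None:
    # Assumed analogue of versioning_manager.init(Base): it only registers.
    metadata.listen_after_create(lambda table: calls.append(f"audit:{table}"))


metadata = ToyMetadata()
metadata.tables.append("items")

toy_init(metadata)
calls_after_init = list(calls)    # still empty: init did no work

metadata.create_all()
calls_after_create = list(calls)  # the listener ran during create_all
```

Under this model, registering the listener after `create_all` has already run means it never fires, which would match the missing Activity table described above.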

I would greatly appreciate help with this, as I'm no longer sure what is wrong, whether this functionality is not yet supported with async SQLAlchemy 1.4, or whether it is something else entirely. I would also love to use this project: I can no longer use SQLAlchemy-Continuum with SQLAlchemy 1.4 because I'm now using the Core API rather than the ORM, so the events used by Continuum do not fire, and this project seemed like my solution.

sqlalchemy.exc.InvalidRequestError

INFO:     Started server process [24408]
INFO:     Waiting for application startup.
ERROR:    Traceback (most recent call last):
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/starlette/routing.py", line 621, in lifespan
    async with self.lifespan_context(app):
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/starlette/routing.py", line 518, in __aenter__
    await self._router.startup()
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/starlette/routing.py", line 598, in startup
    await handler()
  File "/home/michael/personal/test-audit/app/main.py", line 15, in on_startup
    await create_db_and_tables()
  File "/home/michael/personal/test-audit/app/database.py", line 21, in create_db_and_tables
    await conn.run_sync(Base.metadata.create_all)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/ext/asyncio/engine.py", line 559, in run_sync
    return await greenlet_spawn(fn, conn, *arg, **kw)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 136, in greenlet_spawn
    result = context.switch(value)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/sql/schema.py", line 4785, in create_all
    bind._run_ddl_visitor(
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2113, in _run_ddl_visitor
    visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/sql/visitors.py", line 524, in traverse_single
    return meth(obj, **kw)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/sql/ddl.py", line 849, in visit_metadata
    self.traverse_single(
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/sql/visitors.py", line 524, in traverse_single
    return meth(obj, **kw)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/sql/ddl.py", line 915, in visit_table
    table.dispatch.after_create(
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/event/attr.py", line 343, in __call__
    fn(*args, **kw)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/postgresql_audit/base.py", line 262, in create_audit_table
    StatementExecutor(sql)(target, bind, **kwargs)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/postgresql_audit/base.py", line 38, in __call__
    tx = bind.begin()
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/future/engine.py", line 144, in begin
    return super(Connection, self).begin()
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 766, in begin
    raise exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: This connection has already initialized a SQLAlchemy Transaction() object via begin() or autobegin; can't call begin() here unless rollback() or commit() is called first.

ERROR:    Application startup failed. Exiting.
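The last frames above show `tx = bind.begin()` being called on a connection that `engine.begin()` had already put into a transaction. The toy model below reproduces that conflict and shows one possible guard. `ToyConnection` and `ToyInvalidRequestError` are hypothetical stand-ins. Whether such a guard is the right fix for postgresql-audit itself is an assumption, not something this thread confirms.

```python
class ToyInvalidRequestError(Exception):
    """Stand-in for sqlalchemy.exc.InvalidRequestError in this toy model."""


class ToyConnection:
    """Hypothetical connection that allows only one open transaction."""

    def __init__(self) -> None:
        self._in_transaction = False

    def in_transaction(self) -> bool:
        return self._in_transaction

    def begin(self) -> None:
        if self._in_transaction:
            raise ToyInvalidRequestError("a transaction is already begun")
        self._in_transaction = True

    def commit(self) -> None:
        self._in_transaction = False


def run_ddl_unconditional(conn: ToyConnection) -> None:
    # Mirrors the `tx = bind.begin()` frame in the traceback: always begins.
    conn.begin()
    conn.commit()


def run_ddl_guarded(conn: ToyConnection) -> None:
    # Begin (and commit) only when the caller has not opened a transaction.
    owns_transaction = not conn.in_transaction()
    if owns_transaction:
        conn.begin()
    # ... the DDL would be executed here ...
    if owns_transaction:
        conn.commit()


# Simulate `async with engine.begin() as conn`: a transaction is already open.
conn = ToyConnection()
conn.begin()

try:
    run_ddl_unconditional(conn)
    unconditional_failed = False
except ToyInvalidRequestError:
    unconditional_failed = True

run_ddl_guarded(conn)  # leaves the caller's transaction untouched
```

SQLAlchemy's own `Connection` also exposes an `in_transaction()` method, so a similar check may be possible against the real object.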
.
├── docker-compose.yml
├── main.py
├── app
│   ├── crud.py
│   ├── database.py
│   ├── main.py
│   ├── models.py
│   └── schemas.py
└── scripts
    └── init.sh

[tool.poetry.dependencies]
python = "^3.8"
PostgreSQL-Audit = "^0.13.0"
fastapi = "^0.74.1"
uvicorn = "^0.17.5"
asyncpg = "^0.25.0"

main.py

import uvicorn

if __name__ == "__main__":
    uvicorn.run("app.main:app", host="0.0.0.0", port=5001, log_level="debug")

docker-compose.yml

version: "3.9"

services:
  db:
    image: postgres:13
    environment:
      - POSTGRES_USER=db_user
      - POSTGRES_PASSWORD=db_pass
      - PGDATA=/var/lib/postgresql/data/pgdata
    ports:
      - "5432:5432"
    volumes:
      - ./scripts:/docker-entrypoint-initdb.d

  admin:
    image: adminer
    restart: always
    ports:
      - 8081:8080
    depends_on: 
      - db

scripts/init.sh

psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" <<EOF
CREATE EXTENSION IF NOT EXISTS btree_gist;
EOF

app/crud.py

import abc
from typing import Generic, List, Type, TypeVar
from sqlalchemy import select
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from . import models, schemas

IN_SCHEMA = TypeVar("IN_SCHEMA", bound=BaseModel)
SCHEMA = TypeVar("SCHEMA", bound=BaseModel)
TABLE = TypeVar("TABLE")

class CRUDBase(Generic[TABLE, IN_SCHEMA, SCHEMA], metaclass=abc.ABCMeta):
    def __init__(self, db_session: AsyncSession, *args, **kwargs) -> None:
        self._db_session: AsyncSession = db_session

    @property
    @abc.abstractmethod
    def _table(self) -> Type[TABLE]:
        ...

    @property
    @abc.abstractmethod
    def _schema(self) -> Type[SCHEMA]:
        ...

    async def create(self, in_schema: IN_SCHEMA) -> SCHEMA:
        item = self._table(**in_schema.dict())
        self._db_session.add(item)
        await self._db_session.commit()
        return self._schema.from_orm(item)

    async def get_by_id(self, item_id: int) -> SCHEMA:
        query = (
            select(self._table)
            .where(self._table.id == item_id)
        )
        (item,) = (await self._db_session.execute(query)).one()
        return self._schema.from_orm(item)

    async def get_multi(self) -> List[SCHEMA]:
        query = select(self._table)
        results = await self._db_session.execute(query)
        # Build a real list so the return value matches the List[SCHEMA] annotation
        return [self._schema.from_orm(item) for item in results.scalars()]

    async def remove(self, item_id: int) -> SCHEMA:
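        # `_get_one` is not defined on this class; look the row up by primary key instead
        item = await self._db_session.get(self._table, item_id)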
        await self._db_session.delete(item)
        await self._db_session.commit()
        return self._schema.from_orm(item)

class CRUDItem(CRUDBase[models.Item, schemas.ItemCreate, schemas.Item]):
    @property
    def _in_schema(self) -> Type[schemas.ItemCreate]:
        return schemas.ItemCreate

    @property
    def _schema(self) -> Type[schemas.Item]:
        return schemas.Item

    @property
    def _table(self) -> Type[models.Item]:
        return models.Item

app/database.py

from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "postgresql+asyncpg://db_user:db_pass@localhost:5432/db_user"

engine = create_async_engine(SQLALCHEMY_DATABASE_URL)
async_session_maker = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
Base = declarative_base()

# please don't mind the exact implementation here... I've tried many different ways to call `versioning_manager.init(Base)` and this is just the last one I attempted.
async def init_versioning_manager():
    from postgresql_audit import versioning_manager
    async with async_session_maker() as session:
        versioning_manager.init(Base)
        await session.commit()

async def create_db_and_tables():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session_maker() as session:
        yield session
        await session.commit()

app/main.py

from typing import List

from fastapi import Depends, FastAPI
from sqlalchemy.ext.asyncio.session import AsyncSession

from . import crud, schemas
from .database import create_db_and_tables, get_async_session, init_versioning_manager

app = FastAPI()

@app.on_event("startup")
async def on_startup():
    # Not needed if you set up a migration system like Alembic
    await init_versioning_manager()
    await create_db_and_tables()

@app.post("/items/", response_model=schemas.Item)
async def create_item(
    db: AsyncSession = Depends(get_async_session),
    *,
    item: schemas.ItemCreate,
):
    item_crud = crud.CRUDItem(db)
    item = await item_crud.create(in_schema=item)
    return item

@app.get("/items/", response_model=List[schemas.Item])
async def read_items(db: AsyncSession = Depends(get_async_session)):
    item_crud = crud.CRUDItem(db)
    items = await item_crud.get_multi()
    return items

app/models.py

from sqlalchemy import Column, Integer, String
from .database import Base

class Item(Base):
    __tablename__ = "items"
    __versioned__ = {}
    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String)
    description = Column(String)

app/schemas.py

from typing import List, Optional
from pydantic import BaseModel

class ItemBase(BaseModel):
    title: str
    description: Optional[str] = None

class ItemCreate(ItemBase):
    pass

class Item(ItemBase):
    id: int

    class Config:
        orm_mode = True

michaeltoohig commented 2 years ago

Not sure if this helps or is unrelated, but I initialized my own versioning manager and set schema_name to an arbitrary value, "test". I also tested with PostgreSQL 10 to be sure the following error was not specific to newer versions of PostgreSQL.

I thought my issue was strictly an async problem, but this error appears to point to something else, and I'm no longer sure why it is occurring.

DB log

db_1     | 2022-02-25 02:44:12.360 UTC [584] ERROR:  cannot insert multiple commands into a prepared statement
db_1     | 2022-02-25 02:44:12.360 UTC [584] STATEMENT:  CREATE SCHEMA test;
db_1     |      REVOKE ALL ON SCHEMA test FROM public;

Python

INFO:     Started server process [26032]
INFO:     Waiting for application startup.
ERROR:    Traceback (most recent call last):
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 399, in _prepare_and_execute
    prepared_stmt, attributes = await adapt_connection._prepare(
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 641, in _prepare
    prepared_stmt = await self._connection.prepare(operation)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/asyncpg/connection.py", line 566, in prepare
    return await self._prepare(
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/asyncpg/connection.py", line 584, in _prepare
    stmt = await self._get_statement(
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/asyncpg/connection.py", line 398, in _get_statement
    statement = await self._protocol.prepare(
  File "asyncpg/protocol/protocol.pyx", line 168, in prepare
asyncpg.exceptions.PostgresSyntaxError: cannot insert multiple commands into a prepared statement

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 460, in execute
    self._adapt_connection.await_(
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 76, in await_only
    return current.driver.switch(awaitable)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 129, in greenlet_spawn
    value = await result
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 435, in _prepare_and_execute
    self._handle_exception(error)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 370, in _handle_exception
    self._adapt_connection._handle_exception(error)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 663, in _handle_exception
    raise translated_error from error
sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_dbapi.ProgrammingError: <class 'asyncpg.exceptions.PostgresSyntaxError'>: cannot insert multiple commands into a prepared statement

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/starlette/routing.py", line 621, in lifespan
    async with self.lifespan_context(app):
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/starlette/routing.py", line 518, in __aenter__
    await self._router.startup()
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/starlette/routing.py", line 598, in startup
    await handler()
  File "/home/michael/personal/test-audit/app/main.py", line 15, in on_startup
    await create_db_and_tables()
  File "/home/michael/personal/test-audit/app/database.py", line 21, in create_db_and_tables
    await conn.run_sync(Base.metadata.create_all)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/ext/asyncio/engine.py", line 559, in run_sync
    return await greenlet_spawn(fn, conn, *arg, **kw)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 134, in greenlet_spawn
    result = context.throw(*sys.exc_info())
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/sql/schema.py", line 4785, in create_all
    bind._run_ddl_visitor(
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2113, in _run_ddl_visitor
    visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/sql/visitors.py", line 524, in traverse_single
    return meth(obj, **kw)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/sql/ddl.py", line 849, in visit_metadata
    self.traverse_single(
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/sql/visitors.py", line 524, in traverse_single
    return meth(obj, **kw)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/sql/ddl.py", line 877, in visit_table
    table.dispatch.before_create(
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/event/attr.py", line 343, in __call__
    fn(*args, **kw)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/sql/ddl.py", line 248, in __call__
    return bind.execute(self.against(target))
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/future/engine.py", line 280, in execute
    return self._execute_20(
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/sql/ddl.py", line 80, in _execute_on_connection
    return connection._execute_ddl(
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1381, in _execute_ddl
    ret = self._execute_context(
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
    self._handle_dbapi_exception(
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
    util.raise_(
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 460, in execute
    self._adapt_connection.await_(
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 76, in await_only
    return current.driver.switch(awaitable)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 129, in greenlet_spawn
    value = await result
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 435, in _prepare_and_execute
    self._handle_exception(error)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 370, in _handle_exception
    self._adapt_connection._handle_exception(error)
  File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 663, in _handle_exception
    raise translated_error from error
sqlalchemy.exc.ProgrammingError: (sqlalchemy.dialects.postgresql.asyncpg.ProgrammingError) <class 'asyncpg.exceptions.PostgresSyntaxError'>: cannot insert multiple commands into a prepared statement
[SQL: CREATE SCHEMA test;
REVOKE ALL ON SCHEMA test FROM public;
]
(Background on this error at: https://sqlalche.me/e/14/f405)

ERROR:    Application startup failed. Exiting.
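The DB log above shows two commands (`CREATE SCHEMA` and `REVOKE`) sent as one statement, which PostgreSQL rejects for prepared statements. One possible workaround, sketched here as an assumption rather than as the library's behavior, is to split the script and execute each command separately. `split_sql_statements` is a hypothetical helper.

```python
from typing import List


def split_sql_statements(sql: str) -> List[str]:
    """Naively split a SQL script on semicolons, dropping empty pieces.

    This does not handle semicolons inside string literals or
    dollar-quoted function bodies, so it only suits simple DDL scripts.
    """
    return [part.strip() for part in sql.split(";") if part.strip()]


script = "CREATE SCHEMA test;\nREVOKE ALL ON SCHEMA test FROM public;\n"
statements = split_sql_statements(script)
```

Each element of `statements` could then be passed to its own execute call. Trigger or function definitions containing semicolons would need a real SQL parser instead of this naive split.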